import { asyncScheduler, defer, Observable, SchedulerLike, Subject } from 'rxjs';
import { finalize, switchMap } from 'rxjs/operators';

/* eslint-disable @typescript-eslint/no-explicit-any */

/**
 * Limits how many wrapped observables (or promises) run at the same time.
 *
 * Each call to {@link ObservablePool.queue} returns a wrapper observable whose
 * underlying source is only subscribed once the pool has a free slot. A slot
 * is released when the wrapper terminates or is unsubscribed, at which point
 * the next queued operation is started.
 */
export class ObservablePool {
  /** Trigger subjects of operations that are waiting for a free slot. */
  private readonly _queue: Subject<void>[] = [];

  /** Trigger subjects of operations that currently occupy a slot. */
  private readonly _active: Subject<void>[] = [];

  /** Scheduler used to emit the start trigger of each operation. */
  private readonly _scheduler: SchedulerLike;

  /** Set by `destroy()`; once true, no further operations are accepted or started. */
  private _closed: boolean = false;

  public constructor(
    /** Maximum concurrent operations. */
    public readonly size: number,
    /** Scheduler for starting operations. Defaults to `asyncScheduler`. */
    scheduler?: SchedulerLike
  ) {
    this._scheduler = scheduler ?? asyncScheduler;
  }

  /**
   * Wraps the given observable or promise and adds it to the pool queue.
   *
   * NOTE(review): the start trigger is a plain `Subject`, so the returned
   * observable appears to need a subscriber before the scheduled trigger
   * fires, otherwise the trigger is missed - confirm against callers.
   *
   * @param source$ Observable or promise-like to run once a slot is free.
   * @returns An observable that mirrors `source$` once the pool starts it.
   * @throws Error if the pool has already been destroyed.
   */
  public queue<T>(source$: Observable<T> | PromiseLike<T>): Observable<T> {
    if (this._closed) {
      throw Error('Queue is destroyed');
    }

    const source = isPromise(source$)
      ? defer(() => source$)
      : source$;

    // Emits once when the pool decides this operation may start.
    const sourceWrapper = new Subject<void>();

    const output$ = sourceWrapper.pipe(
      switchMap(() => source.pipe(
        // Completing the trigger lets the outer stream complete with the source.
        finalize(() => sourceWrapper.complete())
      )),
      finalize(() => {
        // The consumer may unsubscribe while the wrapper is still waiting, so
        // it has to be dropped from the queue as well; otherwise it would
        // later be promoted and hold a slot that is never released.
        ObservablePool.removeFrom(this._queue, sourceWrapper);
        ObservablePool.removeFrom(this._active, sourceWrapper);
        this.refresh();
      })
    );

    this._queue.push(sourceWrapper);
    this.refresh();
    return output$;
  }

  /**
   * Closes the pool and completes the trigger of every active and queued
   * operation. Operations that have not started yet complete without ever
   * subscribing to their source; later `queue()` calls throw.
   */
  public destroy(): void {
    this._closed = true;

    // Detach both lists before completing anything: completing a wrapper can
    // synchronously run its finalize callback, which edits these arrays.
    // Iterating the live arrays would skip wrappers and leave them uncompleted.
    const wrappers = this._active.splice(0).concat(this._queue.splice(0));
    for (const wrapper of wrappers) {
      wrapper.complete();
    }
  }

  /** Triggers queued observables until queue is empty or active count is full. */
  private refresh(): void {
    if (this._closed) {
      return;
    }

    while (this._active.length < this.size) {
      const currentWrapper = this._queue.shift();
      if (currentWrapper === undefined) {
        // Queue is empty.
        break;
      }
      this._active.push(currentWrapper);

      // Start through the scheduler; skip the trigger if the pool was
      // destroyed in the meantime.
      this._scheduler.schedule(() => {
        if (!this._closed) {
          currentWrapper.next();
        }
      });
    }
  }

  /** Removes the first occurrence of `item` from `list`, if present. */
  private static removeFrom(list: Subject<void>[], item: Subject<void>): void {
    const index = list.indexOf(item);
    if (index !== -1) {
      list.splice(index, 1);
    }
  }
}

/**
 * Type guard for promise-like values.
 *
 * @param arg Value to inspect.
 * @returns `true` when `arg` is non-nullish and has a callable `then` member.
 */
function isPromise<T = unknown>(arg: any): arg is PromiseLike<T> {
  // Nullish values have no members to inspect.
  if (arg == null) {
    return false;
  }

  const thenMember: unknown = arg.then;
  return typeof thenMember === 'function';
}
