import {
  BehaviorSubject,
  defer,
  forkJoin,
  from,
  MonoTypeOperatorFunction,
  Observable,
  of,
  OperatorFunction,
  ReplaySubject,
  SchedulerLike,
  Subject,
} from 'rxjs';
import {
  filter,
  finalize,
  last,
  map,
  mergeMap,
  shareReplay,
  switchMap,
  takeUntil,
  toArray,
} from 'rxjs/operators';

import { LoadingSubject } from './loading-subject';

/* eslint-disable no-redeclare */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */

export {
  prepare,
  indicate,
  forkThrottle,
  shareUntil,
  switchJoin,
  switchInsert,
  waitWhile
};

/**
 * Operator that runs a side effect right before the source is subscribed to and otherwise
 * passes the source through unchanged.
 *
 * The callback is invoked once per subscription (not once per operator creation), because the
 * source is wrapped in `defer`.
 *
 * @param callback function invoked immediately before the source is subscribed
 * @returns operator that mirrors the source
 * @example someWebRequest$.pipe(prepare(() => disabledButtons())).subscribe()
 */
function prepare<T>(
  callback: () => void
): MonoTypeOperatorFunction<T> {
  return function runBeforeSubscribe(source) {
    return defer(function invokeThenMirror() {
      // Executed lazily, at subscription time.
      callback();
      return source;
    });
  };
}

/**
 * Returns an Observable that mirrors the source Observable, but will emit true before source executes and false after
 * source terminates for given indicator subject. If multiple observables indicate the same subject,
 * use a `LoadingSubject`.
 *
 * For a `LoadingSubject`, every subscription gets its own random token which is passed along with
 * both the `true` and the matching `false` notification. Generating the token per subscription
 * (instead of once per operator) keeps concurrent subscriptions of the same piped observable from
 * sharing a token, so one of them finishing does not report `false` on behalf of the others.
 *
 * @param indicator Subject that is used as indicator
 * @returns operator that mirrors the source
 * @example someWebRequest$.pipe(indicate(this.isLoading$)).subscribe()
 */
function indicate<T>(
  indicator: Subject<boolean> | LoadingSubject
): MonoTypeOperatorFunction<T> {
  if (LoadingSubject.isLoadingSubject(indicator)) {
    return source => defer(() => {
      // One token per subscription; shared by the paired true/false notifications below.
      const hash = Math.random().toString();
      return source.pipe(
        prepare(() => indicator.next(true, hash)),
        finalize(() => indicator.next(false, hash))
      );
    });
  } else {
    return source => source.pipe(
      prepare(() => indicator.next(true)),
      finalize(() => indicator.next(false))
    );
  }
}

/**
 * Applies `shareReplay` followed by `takeUntil` to the source.
 *
 * The replay buffer size is fixed at 1 and no replay window time is set.
 *
 * NOTE(review): `takeUntil` sits downstream of `shareReplay`, so the notifier ends each
 * subscriber's stream; whether the shared upstream subscription is released as well depends on
 * the ref-counting behaviour of `shareReplay` in the installed RxJS version — confirm.
 *
 * @param notifier observable whose first emission ends the resulting stream
 * @param scheduler optional scheduler handed to `shareReplay`
 * @returns operator that shares, replays and terminates on `notifier`
 */
function shareUntil<T>(
  notifier: Observable<unknown>,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  return source => {
    const replayLatest = shareReplay<T>(1, undefined, scheduler);
    const stopOnNotify = takeUntil<T>(notifier);
    return source.pipe(replayLatest, stopOnNotify);
  };
}

/**
 * Mimics `forkJoin` while allowing only the specified amount of observables to be active at the same time.
 * Preserves order: the emitted array lists the last value of each source at the position the
 * source has in `sources`, regardless of the order in which the sources complete.
 *
 * Differences from `forkJoin` that follow from the operators used here:
 * - an empty `sources` array yields a single `[]` emission (via `toArray`);
 * - every source is piped through `last()`, so a source that completes without emitting
 *   errors the result instead of completing it silently.
 *
 * @param sources observables to join
 * @param concurrency maximum amount of concurrently active observables
 * @returns observable emitting one array with the last value of every source, then completing
 */
function forkThrottle<T>(
  sources: Observable<T>[],
  concurrency: number
): Observable<T[]> {
  return from(sources).pipe(
    // Tag each final value with the position of its source, since results arrive in completion order.
    mergeMap((inner, index) => inner.pipe(
      last(),
      map(value => ({ index, value }))
    ), concurrency),
    toArray(),
    // Restore source order with the standard numeric sort; the array is local, so sorting in place is safe.
    map(tagged => tagged
      .sort((a, b) => a.index - b.index)
      .map(entry => entry.value))
  );
}

function switchJoin<T, U>(innerSelector: (outer: T) => Observable<U>): OperatorFunction<T, [T, U]>;
function switchJoin<T, U, R>(
  innerSelector: (outer: T) => Observable<U>,
  resultSelector: (outer: T, inner: U) => R
): OperatorFunction<T, R>;

/**
 * `switchMap` whose inner observable is a `forkJoin` of the outer value and the observable
 * returned by `innerSelector`.
 *
 * Without a `resultSelector` the output is the pair `[outer, inner]`; with one, the output is
 * whatever `resultSelector(outer, inner)` returns.
 *
 * @param innerSelector selector to return the inner observable using value from outer
 * @param resultSelector value selector to combine outer and inner emitted values
 * @returns operator emitting the combined outer/inner result
 */
function switchJoin(
  innerSelector: (outer: unknown) => Observable<unknown>,
  resultSelector?: (outer: unknown, inner: unknown) => unknown
): OperatorFunction<any, any> {
  // Turns the joined pair into the final output value.
  const combine = (pair: [unknown, unknown]): unknown => {
    const [outer, inner] = pair;
    if (resultSelector) {
      return resultSelector(outer, inner);
    }
    return [outer, inner];
  };

  return switchMap((outer: unknown) => {
    const joined = forkJoin([of(outer), innerSelector(outer)]);
    return joined.pipe(map(combine));
  });
}

/**
 * Holds back source values while the latest value emitted by `selector` is truthy.
 *
 * Each source value switches to a fresh subscription on `selector`; the value is forwarded on
 * every falsy emission of `selector` until the next source value replaces it. A newer source
 * value discards an older one that is still being held back.
 *
 * @param selector must emit false before output `Observable` emits
 * @returns operator that gates source values on `selector`
 */
function waitWhile<T, R extends ReplaySubject<boolean> | BehaviorSubject<boolean>>(
  selector: R
): MonoTypeOperatorFunction<T> {
  return source => {
    // Maps one source value to a stream that yields it whenever the gate reports "not blocked".
    const holdUntilReleased = (value: T) => {
      const released = selector.pipe(filter(blocked => !blocked));
      return released.pipe(map(() => value));
    };

    return source.pipe(switchMap(holdUntilReleased));
  };
}

/**
 * Inserts `T` at the head of tuple type `U`.
 *
 * Works by building a function type whose parameter list is `(a: T, ...b: U)` and inferring that
 * whole parameter list back as a tuple `Q`. Resolves to `[]` if the inference does not match.
 */
type Prepend<T, U extends any[]> = ((a: T, ...b: U) => void) extends ((...args: infer Q) => void) ? Q : [];

/**
 * `switchMap`s every outer tuple to a `forkJoin` of the selected inner observable followed by
 * the outer tuple's elements, so the inner result ends up at the _beginning_ of the emitted tuple.
 *
 * The outer tuple is spread into `selector` as individual arguments.
 *
 * @param selector inner observable selector
 * @returns operator emitting `[inner, ...outer]`
 */
function switchInsert<T extends any[], R>(
  selector: (...values: T) => Observable<R>
): OperatorFunction<T, Prepend<R, T>> {
  return switchMap((outerValues: T) => {
    const inner$ = selector(...outerValues);
    // Re-wrap each outer element so it can take part in the join.
    const outer$ = outerValues.map(item => of(item));
    return forkJoin<any[]>([inner$, ...outer$]);
  }) as OperatorFunction<any, any>;
}
