import { Observable, defer, filter, first, from, map, mergeAll, race, switchMap, toArray } from 'rxjs';

const MAX_CONCURRENT_TRANSACTIONS = 20;

export class StreamAbortedError { }

export const concurrentlyInterruptable = <T, R>(
  projector: (val: T) => Observable<R>,
  interruptSignal: Observable<boolean>,
  pauseSignal: Observable<boolean>,
  concurrency = MAX_CONCURRENT_TRANSACTIONS
) => (source: Observable<T[]>) => {
  const result$ = source.pipe(
    switchMap(arr => {
      const operations$ = arr.map(item => {
        const createItem = () => interruptSignal.pipe(
          first(),
          switchMap((isAborted) => {
            if (isAborted) {
              throw new StreamAbortedError();
            }
            return pauseSignal.pipe(
              first(),
              switchMap((isPaused) => {
                if (!isPaused) {
                  return projector(item);
                }
                return race([
                  pauseSignal.pipe(
                    filter((v) => !v),
                    switchMap(() => projector(item))
                  ),
                  interruptSignal.pipe(
                    filter(Boolean),
                    map(() => {
                      throw new StreamAbortedError();
                    })
                  )
                ]).pipe(
                  first(),
                );
              })
            );
          })
        );
        return defer(createItem);
      });
      return from(operations$).pipe(
        mergeAll(concurrency),
        toArray()
      );
    })
  );
  return result$;
};
