import { BehaviorSubject, connectable, Observable, of, ReplaySubject, SubjectLike } from 'rxjs';
import { catchError, filter, first, map, mergeMap, shareReplay, switchMap, tap } from 'rxjs/operators';
import { extendPayload } from '../common/extend-payload';
import { AuthValidator, SocketIoConfig } from '../config';
import { toNetworkId } from '../data-types/network/wen-networks';
import { ALL_SOCKET_EVENTS, AuthHelloPayload, RPC_EVENT_PREFIX, RPC_SOCKET_EVENTS, SOCKET_EVENTS, SocketPayload } from '../events/events';
import { WenSocketIoClient } from './socket-io-client';

export type SocketCallback<T = any> = (response: T) => void;

interface SocketEmitData {
  event: ALL_SOCKET_EVENTS;
  payload?: SocketPayload;
  callback?: SocketCallback;
}

enum HandshakeState {
  NOT_DONE = 'NOT_DONE',
  PENDING = 'PENDING',
  DONE = 'DONE'
}

export enum SocketConnectionState {
  CONNECTED = 'CONNECTED',
  RECONNECTING = 'RECONNECTING',
  DISCONNECTED = 'DISCONNECTED'
}

class NoopAuthValidator implements AuthValidator {
  validateAuth(): Observable<boolean> {
    return of(true);
  }
}

export class SocketIoConnector {

  private socketState = new BehaviorSubject<SocketConnectionState>(SocketConnectionState.RECONNECTING);
  public readonly socketState$ = this.socketState.pipe(shareReplay(1));

  private handshakeState = new BehaviorSubject<HandshakeState>(HandshakeState.NOT_DONE);

  private scheduleSocketEmit = new ReplaySubject<SocketEmitData>();

  constructor(
    private config: SocketIoConfig,
    private socketIoClient: WenSocketIoClient,
  ) {
    const { authValidator = new NoopAuthValidator() } = this.config;
    this.socketIoClient.fromEvent('connect').subscribe(() => {
      this.socketState.next(SocketConnectionState.CONNECTED);
    });
    this.socketIoClient.fromEvent('disconnect').subscribe(() => {
      this.socketState.next(this.socketIoClient.isActive() ? SocketConnectionState.RECONNECTING : SocketConnectionState.DISCONNECTED);
      this.handshakeState.next(HandshakeState.NOT_DONE);
    });
    this.socketIoClient.fromEvent('connect_error').subscribe(() => {
      this.socketState.next(this.socketIoClient.isActive() ? SocketConnectionState.RECONNECTING : SocketConnectionState.DISCONNECTED);
    });
    this.scheduleSocketEmit.pipe(
      mergeMap((data) => {
        return this.socketState.pipe(
          filter(state => state === SocketConnectionState.CONNECTED),
          first(),
          mergeMap(() => authValidator.validateAuth()),
          filter((successfulValidation) => successfulValidation),
          mergeMap(() => {
            let result: Observable<any>;
            if (this.handshakeState.getValue() === HandshakeState.DONE) {
              result = of(data);
            } else if (this.handshakeState.getValue() === HandshakeState.PENDING) {
              result = this.handshakeState.pipe(
                filter((state) => state === HandshakeState.DONE),
                first()
              );
            } else {
              result = this.doHandshake();
            }
            return result.pipe(
              map(() => data),
              catchError(() => {
                return of(null);
              })
            );
          })
        );
      }),
      filter(data => Boolean(data)),
      mergeMap((data) => {
        return this.config.upstreamGuard.onEmit(data.event, data.payload).pipe(
          first(),
          map((payloadMeta) => ({ data, payloadMeta }))
        );
      }),
    ).subscribe(({ data, payloadMeta }) => {
      const { event, callback } = data;
      const payload = extendPayload(data.payload, payloadMeta);
      if (callback) {
        this.socketIoClient.emit(event, payload, callback);
      } else {
        this.socketIoClient.emit(event, payload);
      }
    });
  }

  doInitialHandshake() {
    return this.reconnectIfDisconnected().pipe(
      tap(() => {
        this.scheduleSocketEmit.next(null);
      }),
      switchMap(() => {
        return this.handshakeState.pipe(
          filter(handshakeState => handshakeState === HandshakeState.DONE),
          first(),
          map(() => null)
        );
      })
    );
  }

  invalidateHandshake() {
    this.handshakeState.next(HandshakeState.NOT_DONE);
    this.socketIoClient.disconnect();
  }

  fromEvent<T>(event: SOCKET_EVENTS) {
    return this.socketIoClient.fromEvent<T>(event);
  }

  fromEventWithReplayOne<T>(event: SOCKET_EVENTS, initialValue?: T) {
    let subject: SubjectLike<T>;
    if (initialValue !== undefined) {
      subject = new BehaviorSubject(initialValue);
    } else {
      subject = new ReplaySubject(1);
    }
    const event$ = connectable(this.fromEvent<T>(event), {
      connector: () => subject, resetOnDisconnect: false
    });
    event$.connect();
    return event$.pipe();
  }

  fromEventWithReplay<T>(event: SOCKET_EVENTS) {
    const event$ = connectable(this.fromEvent<T>(event), {
      connector: () => new ReplaySubject(), resetOnDisconnect: false
    });
    event$.connect();
    return event$.pipe();
  }

  emit<TEVENT extends RPC_SOCKET_EVENTS | SOCKET_EVENTS, TPAYLOAD extends SocketPayload = SocketPayload>(
    event: TEVENT, payload?: TPAYLOAD, callback?: SocketCallback
  ) {
    if (!Boolean(payload)) {
      payload = {} as TPAYLOAD;
    }
    let socketEmitData: SocketEmitData = { event, payload };
    if (event.startsWith(RPC_EVENT_PREFIX)) {
      socketEmitData = { ...socketEmitData, callback };
    }
    if (!this.socketIoClient.isConnected()) {
      this.socketIoClient.connect();
    }
    this.scheduleSocketEmit.next(socketEmitData);
  }

  acknowledgement$<C, E extends SOCKET_EVENTS = any, P extends SocketPayload = SocketPayload>(
    event: E, payload?: P
  ) {
    const result$ = new ReplaySubject<C>(1);
    const updatedEventName: RPC_SOCKET_EVENTS = `${RPC_EVENT_PREFIX}${event}`;
    this.emit(updatedEventName, payload, (result: C) => {
      result$.next(result);
      result$.complete();
    });
    return result$.pipe();
  }

  reconnectIfConnected() {
    return this.socketIoClient.reconnectIfConnected();
  }

  reconnectIfDisconnected() {
    return this.socketIoClient.reconnectIfDisconnected();
  }

  private doHandshake() {
    const { connectionDataProvider, network, authHelloOptions = {} } = this.config;
    this.handshakeState.next(HandshakeState.PENDING);
    const welcome$ = this.fromEvent<{ errorType?: string }>(SOCKET_EVENTS.AUTH_WELCOME).pipe(
      shareReplay(1)
    );
    const helloPayload: AuthHelloPayload = {
      content: connectionDataProvider.getIdToken(),
      device: connectionDataProvider.getDeviceId(),
      network: toNetworkId(network),
      options: {
        disableChatRelevantAutoEmit: Boolean(authHelloOptions.disableChatRelevantAutoEmit),
        disableChannelRelevantAutoEmit: Boolean(authHelloOptions.disableChannelRelevantAutoEmit),
        disableChatSummaryAutoEmit: Boolean(authHelloOptions.disableChatSummaryAutoEmit),
        disableChannelSummaryAutoEmit: Boolean(authHelloOptions.disableChannelSummaryAutoEmit),
      }
    };
    this.socketIoClient.emit(SOCKET_EVENTS.AUTH_HELLO, helloPayload);
    return welcome$.pipe(
      first(),
      map((authHelloResponse) => {
        const errorType = authHelloResponse?.errorType;
        if (errorType) {
          this.invalidateHandshake();
          throw new Error('Invalid handshake response!');
        } else {
          this.handshakeState.next(HandshakeState.DONE);
          return authHelloResponse;
        }
      })
    );
  }
}
