import { Injectable } from '@angular/core';
import { Store } from '@ngrx/store';
import { InboxReplayResponse, RoomInboxAckType, SendToUsersEventResponseUtil, SocketIoService, ToUsersEventType } from '@portal/wen-backend-api';
import { ChatTracer } from '@portal/wen-chat-client';
import { connectable, first, forkJoin, map, of, ReplaySubject, switchMap, tap } from 'rxjs';
import { DateUtil } from '../../../common/date/date-util';
import { groupBy } from '../../../common/util/group-by';
import { onExchangedSessionsReceived, SessionsByRoom } from '../../../store/chat/key-actions';
import { RootState } from '../../../store/root/public-api';
import { WenStorageService } from '../../storage/wen-storage.service';
import { WenOAuthService } from '../../user-management/wen-oauth.service';
import { SendKeyDecryptor } from '../decryption/send-key-decryptor';

/**
 * Handles replayed chat inbox events: share-key events are decrypted and
 * acknowledged, replayed key-exchange events are acknowledged and dropped,
 * and the sessions obtained from decrypted events are dispatched to the
 * store grouped by room.
 */
@Injectable()
export class InboxProcessor {

  /**
   * Connectable stream of processed inbox replays.
   *
   * Every replay response is run through `processInboxEvent`; because of
   * `switchMap`, a newer response replaces the processing of an older one that
   * has not finished yet. Falsy per-event results are removed from the emitted
   * array, while a `null` result (empty replay) is passed through untouched.
   * The stream is backed by a `ReplaySubject(1)` that is not reset on
   * disconnect, so late subscribers receive the most recent result.
   */
  private readonly inbox$ = connectable(this.socketIoService.chat.inbox.replay.listen.pipe(
    switchMap((replayResponse) => this.processInboxEvent(replayResponse)),
    map((processedEvents) => (
      processedEvents
        ? processedEvents.filter((processedEvent) => Boolean(processedEvent))
        : processedEvents
    ))
  ), { connector: () => new ReplaySubject(1), resetOnDisconnect: false });

  constructor(
    private readonly store: Store<RootState>,
    private readonly oAuthService: WenOAuthService,
    private readonly storageService: WenStorageService,
    private readonly socketIoService: SocketIoService,
    private readonly sendKeyDecryptor: SendKeyDecryptor,
    private readonly chatTracer: ChatTracer,
  ) {
    // Nothing is emitted here until startInboxProcessing() connects the stream.
    this.inbox$.subscribe((decryptedEvents) => {
      // A null result (empty replay) or an empty array means there is nothing to dispatch.
      if (decryptedEvents?.length) {
        const sessionsByRoom: SessionsByRoom = {};
        groupBy(decryptedEvents, ({ decryptedContent }) => decryptedContent.roomId)
          .forEach((roomEvents, roomId) => {
            sessionsByRoom[roomId] = roomEvents.map((roomEvent) => {
              return { sessionId: roomEvent.decryptedContent.sessionId };
            });
          });
        this.store.dispatch(onExchangedSessionsReceived({ sessionsByRoom }));
      }
    });
  }

  /**
   * Connects the inbox stream to the socket replay listener.
   *
   * @returns an observable that emits the first available processed inbox
   * result and then completes.
   */
  startInboxProcessing() {
    this.inbox$.connect();
    const firstInboxResult$ = this.inbox$.pipe(first());
    return firstInboxResult$;
  }

  /**
   * Processes every event of a single inbox replay response.
   *
   * @param response the replayed inbox events
   * @returns an observable emitting `null` for an empty response, otherwise
   * the per-event results combined with `forkJoin` (a `null` entry stands for
   * an event that produced no key result)
   */
  private processInboxEvent(response: InboxReplayResponse) {
    if (!response?.length) {
      return of(null);
    }
    const eventResults$ = response.map((inboxEvent) => {
      const { eventType } = inboxEvent.payload;

      if (eventType === ToUsersEventType.SHARE_KEY) {
        const eventId = inboxEvent.id;
        const insertTimestamp = SendToUsersEventResponseUtil.fixInsertTimestamp(inboxEvent);
        this.chatTracer.addShareKeyIncomingHint({ description: 'Recveived share key from inbox', eventId, insertTimestamp });
        // The event is acknowledged only when decryption yielded a truthy result.
        const acknowledgeOnSuccess = tap((keyResult) => {
          if (keyResult) {
            this.ackEvent(inboxEvent.id);
          }
        });
        return this.sendKeyDecryptor.decryptShareKeyEvent(inboxEvent).pipe(acknowledgeOnSuccess);
      }

      const isExchangeEvent = eventType === ToUsersEventType.EXCHANGE_ROOM_KEY_REQUEST
        || eventType === ToUsersEventType.EXCHANGE_ROOM_KEY_RESPONSE;
      if (isExchangeEvent) {
        // TODO: remove this when backend doesn't replay exchange events anymore
        this.ackEvent(inboxEvent.id);
      }
      // Exchange events and unknown event types contribute no key result.
      return of(null);
    });
    return forkJoin(eventResults$);
  }

  /**
   * Emits a READ acknowledgement for the given inbox event, stamped with the
   * current user id, device id and the current date string.
   *
   * @param eventId id of the inbox event to acknowledge
   */
  private ackEvent(eventId: string) {
    const ackChannel = this.socketIoService.chat.inbox.ack;
    ackChannel.emit({
      userId: this.oAuthService.userId,
      deviceId: this.storageService.getDeviceId(),
      eventId,
      lastAckTimestamp: DateUtil.currentDateString(),
      type: RoomInboxAckType.READ
    });
  }

}
