import { encode } from '@msgpack/msgpack';
import pRetry from 'p-retry';

import { logger } from '@/logger';

import { InterviewSessionRepository } from '../interview-session';
import { AsyncLock } from '../partition-queue/async-lock';
import { PartitionQueueProgressObserver } from '../partition-queue/partition-queue.progress.observer';
import { PartitionQueue, PullTimeoutError } from '../partition-queue/partition-queue.types';
import { RecordingSessionService } from '../recording-session/recording-session.types';
import { StateMachine } from '../state-machine';
import { matchState, StateMatcher } from '../state-matcher';
import { SubscriptionGroup } from '../subscription-group';

import { QueueSynchronizerState, QueueSynchronizerStateType } from './queue-synchronizer.types';

/**
 * Minimal shape of a plain (non-binary) client message handled by the QueueSynchronizer.
 *
 * Concrete message types extend this shape through the `ClientMessage` type parameter.
 */
type BaseClientJsonMessage = {
  // Message discriminator. The synchronizer treats the value 'session.end' as the last message of a session.
  type: string;
  // Used as the queue item's `sequenceId` when the message is pushed to the persistent queue.
  sequence: number;
};

/**
 * Minimal shape of a binary client message: a JSON header at index 0 followed by the raw payload at index 1.
 *
 * The synchronizer tells this apart from a JSON message with `Array.isArray`, and encodes the header and the
 * blob as two separate msgpack records.
 */
type BaseClientDataMessage = [
  {
    type: 'media.data';
    // Used as the queue item's `sequenceId` when the message is pushed to the persistent queue.
    sequence: number;
    // NOTE(review): presumably the byte length of the blob below — it is not read anywhere in this file; confirm.
    size: number;
  },
  // Binary payload; read into a Uint8Array right before it is encoded for upload.
  Blob,
];

/** Any message the QueueSynchronizer can queue: either a plain JSON message or a [header, Blob] tuple. */
type BaseClientMessage = BaseClientJsonMessage | BaseClientDataMessage;

/**
 * Arguments of `QueueSynchronizer.initialize`.
 *
 * Every field is copied into the `metadata` record that is prepended to the first uploaded chunk of a session.
 */
interface InitializePayload {
  // Also used to look up / advance the session's next chunk index and as the upload target.
  sessionId: string;
  type: 'audio' | 'video';
  promptId: string;
  imageId: string | null;
  recorderUserRuid: string | null;
  recorderUserId: string | null;
  recorderPersonId: string | null;
}

/**
 * Moves client messages from a persistent partition queue to the recording session service.
 *
 * Messages are pushed with {@link QueueSynchronizer.send}. A background loop (started by
 * {@link QueueSynchronizer.initialize}) pulls them from the queue, msgpack-encodes them, batches them into
 * chunks and uploads each chunk, committing the queue after every successful upload.
 *
 * When the persistent queue rejects a push because the storage quota is exceeded, messages are kept in an
 * in-memory queue and re-pushed (in order) once the queue commits or a pull times out.
 */
export class QueueSynchronizer<
  ClientMessage extends BaseClientMessage,
  Partition extends string,
> extends StateMachine<QueueSynchronizerState> {
  /** Progress observer bound to the underlying queue; started in `initialize`, stopped on terminal states. */
  public progress: PartitionQueueProgressObserver<Record<Partition, ClientMessage>>;

  // In memory queue used to save items when the storage is full.
  // As soon as there's storage available again, this will be pushed to the real, persistent queue.
  private memoryQueue: ClientMessage[] = [];

  // We can only push one item at a time.
  // This is needed to handle the memory queue without race conditions.
  private sendLock = new AsyncLock();

  /**
   * @param sessionRepository Source of truth for the next chunk index of a session.
   * @param recordingSessionService Service used to upload the encoded chunks.
   * @param queue Persistent queue the messages are pushed to and pulled from.
   * @param partition Partition of `queue` this synchronizer reads and writes.
   * @param priority Optional priority forwarded to `queue.push`.
   */
  constructor(
    private sessionRepository: InterviewSessionRepository,
    private recordingSessionService: RecordingSessionService,
    private queue: PartitionQueue<Record<Partition, ClientMessage>>,
    private partition: Exclude<Partition, number | symbol>,
    private priority?: number,
  ) {
    super({ type: QueueSynchronizerStateType.Empty });
    this.progress = new PartitionQueueProgressObserver(queue);
  }

  /** Returns true while the upload loop should keep going (state is Ready or Finishing). */
  private isRunning() {
    return matchState(this.getState())
      .withType(QueueSynchronizerStateType.Ready, () => true)
      .withType(QueueSynchronizerStateType.Finishing, () => true)
      .otherwise(() => false);
  }

  /**
   * Reads a Blob into a Uint8Array using a FileReader.
   *
   * Rejects with the reader's error on failure, and with an Error when the read completes without a result,
   * so the returned promise always settles.
   */
  private blobToUint8Array(blob: Blob): Promise<Uint8Array> {
    return new Promise<Uint8Array>((resolve, reject) => {
      const reader = new FileReader();
      reader.addEventListener('load', (e) => {
        const result = e.target?.result;
        if (result) {
          resolve(new Uint8Array(result as ArrayBuffer));
          return;
        }
        // Without this branch the promise would stay pending forever and stall the upload loop.
        reject(new Error('FileReader finished loading the blob without producing a result'));
      });
      reader.addEventListener('error', (e) => {
        reject(e.target?.error);
      });
      reader.readAsArrayBuffer(blob);
    });
  }

  /** Returns a new Uint8Array holding the bytes of `arrays` back to back, in order. */
  private concatenateUint8Arrays(arrays: Uint8Array[]): Uint8Array {
    const totalLength = arrays.reduce((acc, arr) => acc + arr.length, 0);
    const result = new Uint8Array(totalLength);
    let offset = 0;

    for (const array of arrays) {
      result.set(array, offset);
      offset += array.length;
    }

    return result;
  }

  /**
   * Encodes one client message with msgpack.
   *
   * A JSON message becomes a single `{ type: 'json' }` record. A `[header, Blob]` message becomes a
   * `{ type: 'json' }` record for the header immediately followed by a `{ type: 'binary' }` record for the blob.
   */
  private async encodeMessage(message: ClientMessage): Promise<Uint8Array> {
    if (Array.isArray(message)) {
      return this.concatenateUint8Arrays([
        encode({ type: 'json', data: message[0] }),
        encode({ type: 'binary', data: await this.blobToUint8Array(message[1]) }),
      ]);
    }

    return encode({ type: 'json', data: message });
  }

  /**
   * Upload loop. Runs until the state is no longer Ready/Finishing.
   *
   * Each iteration pulls at most one item from the queue, appends its encoded form to the pending chunk and
   * uploads the chunk when it reaches 4 MiB, when the `session.end` message was pulled, or when the
   * synchronizer is out of storage and there is anything pending. After a successful upload the queue is
   * committed up to the last encoded item and the session's next chunk index is advanced.
   *
   * Rejects when pulling fails for a reason other than a pull timeout, or when the upload keeps failing after
   * p-retry's default retry policy is exhausted.
   */
  private async run(payload: InitializePayload): Promise<void> {
    // Only send the metadata message one time.
    // The message is not saved to the queue so we need to add it manually.
    const isNewSession = (this.sessionRepository.getInterviewSession(payload.sessionId)?.nextChunkIndex ?? 0) === 0;
    let encodedMessages = isNewSession
      ? encode({
          type: 'metadata',
          data: {
            sessionId: payload.sessionId,
            promptId: payload.promptId,
            imageId: payload.imageId,
            type: payload.type,
            recorderUserRuid: payload.recorderUserRuid,
            recorderUserId: payload.recorderUserId,
            recorderPersonId: payload.recorderPersonId,
          },
        })
      : new Uint8Array();
    // NOTE(review): stays -1 until the first item is pulled, so a metadata-only chunk commits with -1 — confirm
    // that `queue.commit` tolerates that value.
    let lastEncodedSequenceId = -1;
    let isLastMessage = false;

    while (this.isRunning()) {
      const nextItem = await this.queue.pull(this.partition, { pullTimeout: 7000 }).catch((error) => {
        if (error instanceof PullTimeoutError) {
          // If there's no message to be sent, try to persist the in memory queue.
          // We only upload data from the persistent queue.
          this.tryToPersistMemoryQueue();
          return null;
        }
        return Promise.reject(error);
      });

      if (nextItem !== null) {
        const message = nextItem.payload;
        const encodedMessage = await this.encodeMessage(message);
        encodedMessages = this.concatenateUint8Arrays([encodedMessages, encodedMessage]);
        lastEncodedSequenceId = nextItem.sequenceId;
        isLastMessage = Array.isArray(message) === false && message.type === 'session.end';
      }

      const outOfStorage = matchState(this.getState())
        .withType(QueueSynchronizerStateType.Ready, (state) => state.outOfStorage)
        .withType(QueueSynchronizerStateType.Finishing, (state) => state.outOfStorage)
        .otherwise(() => false);

      // When out of storage we flush whatever is pending: committing is what allows the memory queue to be
      // persisted again (see the 'commit' listener registered in `initialize`).
      if (encodedMessages.length >= 4 * 1024 * 1024 || isLastMessage || (outOfStorage && encodedMessages.length > 0)) {
        const chunkIndex = this.sessionRepository.getInterviewSession(payload.sessionId)?.nextChunkIndex ?? 0;

        await pRetry(async (attempt) => {
          logger.info('QUEUE_SYNCHRONIZER.UPLOADING_CHUNK', { attempt, size: encodedMessages.length, chunkIndex });

          try {
            await this.recordingSessionService.uploadChunk(payload.sessionId, chunkIndex, encodedMessages);
          } catch (error) {
            logger.warn('QUEUE_SYNCHRONIZER.UPLOAD_CHUNK_FAILED', { error, attempt });
            throw error;
          }
        });

        await this.queue.commit(this.partition, lastEncodedSequenceId);
        this.sessionRepository.setNextChunkIndex(payload.sessionId, chunkIndex + 1);
        encodedMessages = new Uint8Array();

        if (isLastMessage) {
          this.setState({
            type: QueueSynchronizerStateType.Finished,
            sessionId: payload.sessionId,
          });
        }
      } else {
        logger.info('QUEUE_SYNCHRONIZER.WAITING_FOR_MORE_DATA', { size: encodedMessages.length });
      }
    }
  }

  /**
   * Re-sends every message currently held in the memory queue, in order.
   *
   * The memory queue is emptied first, so a message that still does not fit is put back by `doSend` and the
   * ones queued behind it follow it there, preserving order. This method is fire-and-forget: failures are
   * logged instead of being left as unhandled promise rejections.
   */
  private tryToPersistMemoryQueue(): void {
    if (this.memoryQueue.length === 0) {
      return;
    }

    const pendingMessages = this.memoryQueue;
    this.memoryQueue = [];

    for (const msg of pendingMessages) {
      // `doSend` acquires the send lock synchronously in call order, so the messages keep their order.
      this.doSend(msg, true).catch((error: unknown) => {
        logger.warn('QUEUE_SYNCHRONIZER.PERSIST_MEMORY_QUEUE_FAILED', { error });
      });
    }
  }

  /**
   * Runs `callback` at most once, the first time the state machine reaches Finished, Destroyed or
   * UnrecoverableError.
   */
  private onTerminalState(callback: () => void): void {
    let alreadyCalled = false;
    const callOnce = (): void => {
      if (alreadyCalled) {
        return;
      }
      alreadyCalled = true;
      callback();
    };

    // More than one of these states can be reached in sequence (e.g. Destroyed, then UnrecoverableError when
    // the still-running loop fails), hence the guard above.
    this.subscribeType(QueueSynchronizerStateType.Finished, callOnce, { once: true });
    this.subscribeType(QueueSynchronizerStateType.Destroyed, callOnce, { once: true });
    this.subscribeType(QueueSynchronizerStateType.UnrecoverableError, callOnce, { once: true });
  }

  /**
   * Sets the `outOfStorage` flag on the CURRENT state, if that state is Ready or Finishing and the flag differs.
   *
   * The state is re-read on purpose: callers invoke this after awaiting the queue, and spreading a state
   * captured before the await could overwrite a transition (finish/destroy) that happened in the meantime.
   */
  private updateOutOfStorage(outOfStorage: boolean): void {
    const currentState = matchState(this.getState())
      .withType(QueueSynchronizerStateType.Ready, StateMatcher.returnState)
      .withType(QueueSynchronizerStateType.Finishing, StateMatcher.returnState)
      .otherwise(() => null);

    if (currentState === null || currentState.outOfStorage === outOfStorage) {
      return;
    }

    this.setState({
      ...currentState,
      outOfStorage,
    });
  }

  /**
   * Moves the synchronizer from Empty to Ready and starts the upload loop in the background.
   *
   * If the loop rejects, the state becomes UnrecoverableError carrying the error.
   *
   * @throws Error when the synchronizer is not in the Empty state.
   */
  initialize(payload: InitializePayload): void {
    matchState(this.getState())
      .withType(QueueSynchronizerStateType.Empty, StateMatcher.returnState)
      .otherwise(() => {
        throw new Error('The QueueSynchronizer has already been initialized');
      });

    // Move the messages from the in memory queue to the persistent queue
    const subscriptions = new SubscriptionGroup();
    subscriptions.add(
      this.queue.addListener('commit', () => {
        this.tryToPersistMemoryQueue();
      }),
    );

    // Clear the listeners when the synchronizer reaches a terminal state.
    // Not only Finished: after Destroyed/UnrecoverableError the listener would otherwise leak and every later
    // commit would trigger sends that can only fail.
    this.onTerminalState(() => {
      subscriptions.destroy();
    });

    // Set the state
    this.setState({
      type: QueueSynchronizerStateType.Ready,
      sessionId: payload.sessionId,
      outOfStorage: false,
    });

    // Start the progress observer, and stop it on any terminal state
    this.progress.startObserving();
    this.onTerminalState(() => this.progress.stopObserving());

    // Run the synchronizer
    this.run(payload).catch((error) => {
      this.setState({
        type: QueueSynchronizerStateType.UnrecoverableError,
        error,
      });
    });
  }

  /**
   * Moves the synchronizer from Ready to Finishing. New messages are rejected from then on, while messages
   * already held in the memory queue can still be persisted.
   *
   * Calling this while already Finishing or Finished only logs a warning.
   *
   * @throws Error when the synchronizer is in any other state.
   */
  finish(): void {
    const state = matchState(this.getState())
      .withType(QueueSynchronizerStateType.Ready, StateMatcher.returnState)
      .withType(QueueSynchronizerStateType.Finishing, StateMatcher.returnNull)
      .withType(QueueSynchronizerStateType.Finished, StateMatcher.returnNull)
      .otherwise(() => {
        throw new Error('The QueueSynchronizer is not in a valid state to be finished');
      });

    if (state === null) {
      console.warn('The QueueSynchronizer is already finishing or finished');
      return;
    }

    this.setState({
      ...state,
      type: QueueSynchronizerStateType.Finishing,
    });
  }

  /**
   * Pushes one message to the persistent queue, falling back to the memory queue when storage is full.
   *
   * Pushes are serialized with `sendLock`.
   *
   * @param msg Message to queue.
   * @param fromMemoryQueue True when the message is being re-sent from the memory queue; such messages are
   *   still accepted while Finishing.
   * @throws Error when the state is not Ready (or Finishing, for memory-queue messages).
   * @throws Any push error that is not an `AbortError` whose message starts with `QuotaExceededError`.
   */
  private async doSend(msg: ClientMessage, fromMemoryQueue: boolean): Promise<void> {
    await this.sendLock.acquire();

    try {
      // Check if the queue is in a valid state
      const state = matchState(this.getState())
        .withType(QueueSynchronizerStateType.Ready, StateMatcher.returnState)
        .withType(QueueSynchronizerStateType.Finishing, StateMatcher.returnState)
        .otherwise(() => {
          throw new Error('The QueueSynchronizer is not in a valid state to send a message');
        });

      if (state.type === QueueSynchronizerStateType.Finishing && fromMemoryQueue === false) {
        throw new Error('The QueueSynchronizer is not in a valid state to send a message');
      }

      // Before queuing the message, check if there's a message in the memory queue pending.
      // If there is, we should just push to the memory queue directly instead of trying to push to the persistent one.
      // This will avoid out of order messages.
      if (this.memoryQueue.length) {
        this.memoryQueue.push(msg);
        return;
      }

      // Queue the message
      try {
        const sequenceId = Array.isArray(msg) ? msg[0].sequence : msg.sequence;
        await this.queue.push(this.partition, { payload: msg, sequenceId }, this.priority);

        // If the message was successfully pushed, we need to reset the outOfStorage flag.
        // The state may have changed while the push was pending, so the flag is applied to the current state.
        this.updateOutOfStorage(false);
      } catch (error) {
        const typedError = error as { name?: string; message?: string };
        if (typedError?.name !== 'AbortError' || !typedError.message?.startsWith('QuotaExceededError')) {
          throw error;
        }

        // Change the state to out of storage (again on the current state, not the one captured above)
        this.updateOutOfStorage(true);

        // Keep the message in memory until there's available storage again
        this.memoryQueue.push(msg);
      }
    } finally {
      this.sendLock.release();
    }
  }

  /**
   * Queues a new client message for upload.
   *
   * @throws Error when the synchronizer is not Ready, or when the push fails for a reason other than the
   *   storage quota being exceeded.
   */
  async send(msg: ClientMessage): Promise<void> {
    return this.doSend(msg, false);
  }

  /**
   * Moves the synchronizer to Destroyed, which stops the upload loop at its next iteration.
   *
   * Ignored (only logged) when the state is Finishing, Finished, UnrecoverableError or Destroyed.
   */
  destroy(): void {
    logger.info('QUEUE_SYNCHRONIZER.DESTROY');

    const state = this.getState();
    switch (state.type) {
      case QueueSynchronizerStateType.Finishing:
      case QueueSynchronizerStateType.Finished:
      case QueueSynchronizerStateType.UnrecoverableError:
      case QueueSynchronizerStateType.Destroyed:
        logger.info('QUEUE_SYNCHRONIZER.DESTROY_IGNORED', state);
        return;
      default:
        break;
    }

    this.setState({
      ...state,
      type: QueueSynchronizerStateType.Destroyed,
    });
  }
}
