import { Injectable } from '@angular/core';
import { JanEvent, JanMessage, JanPublisher, JanSession } from '@app/shared/model';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, filter, map, switchMap, take, tap } from 'rxjs/operators';
import { v1 as uuid } from 'uuid';
import { BehaviorSubject, forkJoin, from, interval, Observable, of, Subscription } from 'rxjs';

/**
 * Value emitted by the `publish*Stream` methods of `JanService` once a local
 * stream has been published.
 */
export interface JanPublishResponse {
  /** Id returned by the join step for the feed that was published. */
  publisherId: number;
  /** Peer connection that carries the published stream. */
  peerConnection: RTCPeerConnection;
}

/**
 * Application-wide service that talks to a Janus video room over a websocket.
 *
 * It keeps track of the websocket, the plugin sessions opened on it, the peer
 * connections used for publishing and subscribing, and exposes the current
 * publishers and attendees as observables.
 */
@Injectable({
  providedIn: 'root',
})
export class JanService {
  /** Websocket to the server; `undefined` while disconnected. */
  private socket$: WebSocketSubject<JanMessage> | undefined;
  /** ICE servers handed to `connect`, used for every peer connection created here. */
  private iceServers: any[] | undefined;

  /** Current remote publishers, mirrored into `publishersSubject` on every change. */
  private publishers: JanPublisher[] = [];
  private publishersSubject = new BehaviorSubject<JanPublisher[]>(this.publishers);
  /** Read-only stream of the current remote publishers. */
  public publishers$ = this.publishersSubject.asObservable();

  /** Current remote attendees, mirrored into `attendeesSubject` on every change. */
  private attendees: JanPublisher[] = [];
  private attendeesSubject = new BehaviorSubject<JanPublisher[]>(this.attendees);
  /** Read-only stream of the current remote attendees. */
  public attendees$ = this.attendeesSubject.asObservable();

  /** Periodic keep-alive sender started by `connect`. */
  private keepAliveSubscription: Subscription | undefined;

  /** Socket listeners maintaining the publisher list. */
  private newPublishersSubscription: Subscription | undefined;
  private unpublishedSubscription: Subscription | undefined;

  /** Socket listeners maintaining the attendee list. */
  private newAttendeesSubscription: Subscription | undefined;
  private joiningAttendeesSubscription: Subscription | undefined;

  /** Peer connections used for the locally published camera, screen and video streams. */
  private cameraPeerConnection: RTCPeerConnection | undefined;
  private screenPeerConnection: RTCPeerConnection | undefined;
  private videoPeerConnection: RTCPeerConnection | undefined;
  /** Peer connections for subscribed remote feeds, keyed by publisher id. */
  private peerConnections: { [key: number]: RTCPeerConnection } = {};

  /** Session created by `connect`; its events drive the publisher/attendee lists. */
  private mainSession: JanSession | undefined;
  /** Plugin sessions backing the locally published camera, screen and video streams. */
  private cameraSession: JanSession | undefined;
  private screenSession: JanSession | undefined;
  private videoSession: JanSession | undefined;

  constructor() {}

  /**
   * Opens the websocket, creates a session on the server, starts the keep-alive
   * timer and the publisher/attendee observers, then attaches the video room
   * plugin.
   *
   * @param url Websocket URL of the server.
   * @param iceServers ICE server configuration used for every peer connection.
   * @param token Optional token added to each request.
   * @returns Observable emitting the main session once the plugin is attached.
   * @throws Error synchronously when a connection already exists.
   */
  public connect(url: string, iceServers: any[], token?: string): Observable<JanSession> {
    if (this.socket$) {
      throw new Error('Already connected to Janus');
    }

    // Stored only after the guard so a rejected call cannot overwrite the ICE
    // configuration of the connection that is already live.
    this.iceServers = iceServers;

    return this.connectToJanus(url, token).pipe(
      tap((createResponse) => {
        this.keepAlive(createResponse.data.id, token);
        this.observePublishers();
        this.observeAttendees();
      }),
      switchMap((createResponse) =>
        this.attachPlugin(createResponse.data.id, token).pipe(
          map((attachResponse) => {
            this.mainSession = {
              sessionId: createResponse.data.id,
              pluginSessionId: attachResponse.data.id,
            };

            return this.mainSession;
          }),
        ),
      ),
    );
  }

  /**
   * Tears down everything created by `connect` and the publish/subscribe
   * methods: socket listeners, published lists, peer connections, plugin
   * sessions and finally the websocket itself.
   *
   * @param token Token added to the `leave` requests.
   * @throws Error when there is no open connection.
   */
  public disconnect(token: string): void {
    if (!this.socket$) {
      throw new Error('You cannot disconnect as you are not connected.');
    }

    this.keepAliveSubscription?.unsubscribe();

    this.newPublishersSubscription?.unsubscribe();
    this.unpublishedSubscription?.unsubscribe();

    this.newAttendeesSubscription?.unsubscribe();
    this.joiningAttendeesSubscription?.unsubscribe();

    this.publishers = [];
    this.publishersSubject.next(this.publishers);

    this.attendees = [];
    this.attendeesSubject.next(this.attendees);

    // Close the local publishing connections and drop the references so a
    // closed connection is never reused after a reconnect.
    this.cameraPeerConnection?.close();
    this.cameraPeerConnection = undefined;
    this.screenPeerConnection?.close();
    this.screenPeerConnection = undefined;
    this.videoPeerConnection?.close();
    this.videoPeerConnection = undefined;

    for (const pc of Object.values(this.peerConnections)) {
      pc.close();
    }

    this.peerConnections = {};

    // Leave every publishing session before the main one. The video session
    // is included so that a later `publishVideoStream` is not rejected with
    // "already published" after reconnecting.
    const sessions = [this.cameraSession, this.screenSession, this.videoSession, this.mainSession];

    for (const session of sessions) {
      if (session) {
        this.leave(session, token);
      }
    }

    this.cameraSession = undefined;
    this.screenSession = undefined;
    this.videoSession = undefined;
    this.mainSession = undefined;

    this.socket$.complete();
    this.socket$ = undefined;
  }

  /**
   * Stops publishing the camera: closes its peer connection, leaves its plugin
   * session and forgets both. Does nothing when no camera session exists.
   *
   * @param token Token added to the `leave` request.
   */
  public unpublishCamera(token: string): void {
    if (!this.cameraSession) {
      return;
    }

    // The session is stored before the peer connection is created, so the
    // connection may still be missing if publishing never got that far.
    this.cameraPeerConnection?.close();
    this.cameraPeerConnection = undefined;

    this.leave(this.cameraSession, token);

    this.cameraSession = undefined;
  }

  /**
   * Stops publishing the screen share: closes its peer connection, leaves its
   * plugin session and forgets both. Does nothing when no screen session exists.
   *
   * @param token Token added to the `leave` request.
   */
  public unpublishScreen(token: string): void {
    if (!this.screenSession) {
      return;
    }

    // The session is stored before the peer connection is created, so the
    // connection may still be missing if publishing never got that far.
    this.screenPeerConnection?.close();
    this.screenPeerConnection = undefined;

    this.leave(this.screenSession, token);

    this.screenSession = undefined;
  }

  /**
   * Stops publishing the shared video: closes its peer connection, leaves its
   * plugin session and forgets both. Does nothing when no video session exists.
   *
   * @param token Token added to the `leave` request.
   */
  public unpublishVideo(token: string): void {
    if (!this.videoSession) {
      return;
    }

    // The session is stored before the peer connection is created, so the
    // connection may still be missing if publishing never got that far.
    this.videoPeerConnection?.close();
    this.videoPeerConnection = undefined;

    this.leave(this.videoSession, token);

    this.videoSession = undefined;
  }

  /**
   * Creates the websocket, queues a session `create` request on it and resolves
   * with the first message received.
   *
   * @param url Websocket URL of the server.
   * @param token Optional token added to the request.
   * @returns Observable emitting the first socket message; it errors when the
   *   socket fails or when that message carries an `error`.
   */
  private connectToJanus(url: string, token: string | undefined): Observable<JanMessage> {
    const createRequest: JanMessage = {
      janus: JanEvent.Create,
      transaction: uuid(),
      ...(token ? { token } : {}),
    };

    const socket = webSocket<JanMessage>({ url, protocol: 'janus-protocol' });
    this.socket$ = socket;
    socket.next(createRequest);

    const firstMessage$ = socket.pipe(
      take(1),
      // Any socket-level failure is reported with a single user-facing message.
      catchError(() => {
        throw new Error('Cannot connect to the video room server.');
      }),
    );

    return firstMessage$.pipe(
      map((reply) => {
        if (reply.error) {
          throw new Error(reply.error.reason);
        }

        return reply;
      }),
    );
  }

  /**
   * Sends an `attach` request for the video room plugin and waits for the reply
   * carrying the same transaction id.
   *
   * @param sessionId Session on which the plugin is attached.
   * @param token Optional token added to the request.
   * @returns Observable emitting the matching reply once, then completing.
   * @throws Error synchronously when there is no open connection.
   */
  public attachPlugin(sessionId: number, token?: string): Observable<JanMessage> {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    const transaction = uuid();

    const attachRequest: JanMessage = {
      janus: JanEvent.Attach,
      plugin: 'janus.plugin.videoroom',
      transaction,
      session_id: sessionId,
      ...(token ? { token } : {}),
    };

    socket.next(attachRequest);

    // Replies are correlated with their request through the transaction id.
    const isReply = (message: JanMessage) => message.transaction === transaction;

    return socket.pipe(filter(isReply), take(1));
  }

  /**
   * Starts a timer that sends a keep-alive request for the session every
   * 30 seconds. The subscription is stored so `disconnect` can stop it.
   *
   * @param sessionId Session to keep alive.
   * @param token Optional token added to each request.
   */
  private keepAlive(sessionId: number, token?: string): void {
    const sendKeepAlive = (): void => {
      // Read the socket on every tick: it may have been dropped since the timer started.
      const socket = this.socket$;

      if (!socket) {
        throw new Error('No connection');
      }

      const ping: JanMessage = {
        janus: JanEvent.KeepAlive,
        session_id: sessionId,
        transaction: uuid(),
        ...(token ? { token } : {}),
      };

      socket.next(ping);
    };

    const ticks$ = interval(30000).pipe(tap(() => sendKeepAlive()));

    this.keepAliveSubscription = ticks$.subscribe();
  }

  /**
   * Listens on the socket for attendee events addressed to the main session and
   * appends the reported attendees to `attendees` / `attendees$`.
   *
   * Two event shapes are handled: `joining` (a single attendee) and `attendees`
   * (a list). Entries whose id matches one of the local camera, screen or video
   * publisher ids are skipped.
   *
   * @throws Error when there is no open connection.
   */
  private observeAttendees(): void {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    // The observers are started before the main session is stored, so the
    // session is checked explicitly instead of being dereferenced blindly.
    const isFromMainSession = (response: JanMessage) =>
      this.mainSession !== undefined && response.sender === this.mainSession.pluginSessionId;

    // True for attendees that are not one of our own published feeds.
    const isRemote = (attendee: JanPublisher) =>
      attendee.id !== this.cameraSession?.publisherId &&
      attendee.id !== this.screenSession?.publisherId &&
      attendee.id !== this.videoSession?.publisherId;

    const appendAttendees = (incoming: JanPublisher[]): void => {
      this.attendees = [...this.attendees, ...incoming];
      this.attendeesSubject.next(this.attendees);
    };

    this.joiningAttendeesSubscription = socket
      .pipe(
        filter(
          (response) =>
            response.janus === 'event' && response.plugindata?.data?.joining && isFromMainSession(response),
        ),
        // A `joining` event carries one attendee; wrap it so both paths handle lists.
        map((response) => [response.plugindata?.data?.joining]),
        map((attendees) => attendees.filter(isRemote)),
      )
      .subscribe((attendees) => appendAttendees(attendees));

    this.newAttendeesSubscription = socket
      .pipe(
        filter(
          (response) =>
            response.janus === 'event' && response.plugindata?.data?.attendees && isFromMainSession(response),
        ),
        map((response) => response.plugindata?.data?.attendees),
        map((attendees) => attendees.filter(isRemote)),
      )
      .subscribe((attendees) => appendAttendees(attendees));
  }

  /**
   * Listens on the socket for publisher events addressed to the main session.
   *
   * `publishers` events append the reported feeds to `publishers` /
   * `publishers$`, skipping ids that match the local camera, screen or video
   * publisher ids. `unpublished` and `leaving` events close the matching
   * subscriber peer connection and remove the id from both the publisher and
   * the attendee lists.
   *
   * @throws Error when there is no open connection.
   */
  private observePublishers(): void {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    // The observers are started before the main session is stored, so the
    // session is checked explicitly instead of being dereferenced blindly.
    const isFromMainSession = (response: JanMessage) =>
      this.mainSession !== undefined && response.sender === this.mainSession.pluginSessionId;

    // True for publishers that are not one of our own published feeds.
    const isRemote = (publisher: JanPublisher) =>
      publisher.id !== this.cameraSession?.publisherId &&
      publisher.id !== this.screenSession?.publisherId &&
      publisher.id !== this.videoSession?.publisherId;

    this.newPublishersSubscription = socket
      .pipe(
        filter(
          (response) =>
            response.janus === 'event' && response.plugindata?.data?.publishers && isFromMainSession(response),
        ),
        map((response) => response.plugindata?.data?.publishers),
        map((publishers) => publishers.filter(isRemote)),
      )
      .subscribe((publishers) => {
        this.publishers = [...this.publishers, ...publishers];
        this.publishersSubject.next(this.publishers);
      });

    this.unpublishedSubscription = socket
      .pipe(
        filter(
          (response) =>
            response.janus === 'event' &&
            (response.plugindata?.data?.unpublished || response.plugindata?.data?.leaving) &&
            isFromMainSession(response),
        ),
        map((response) => response.plugindata?.data?.unpublished || response.plugindata?.data?.leaving),
      )
      .subscribe((unpublishedId) => {
        // Release the subscriber connection for that feed, if we had one.
        this.peerConnections[unpublishedId]?.close();
        delete this.peerConnections[unpublishedId];

        this.publishers = this.publishers.filter((publisher) => publisher.id !== unpublishedId);
        this.publishersSubject.next(this.publishers);

        this.attendees = this.attendees.filter((attendee) => attendee.id !== unpublishedId);
        this.attendeesSubject.next(this.attendees);
      });
  }

  /**
   * Sends a `join` request for a room on the given plugin session and waits for
   * the `event` reply carrying the same transaction id.
   *
   * The request is sent immediately when this method is called, not when the
   * returned observable is subscribed.
   *
   * @param session Plugin session used to send the request.
   * @param asPublisher `true` to join as publisher, `false` as subscriber.
   * @param id Optional id to request; omitted when falsy.
   * @param room Room to join.
   * @param display Optional display name; omitted when falsy.
   * @param feed Optional feed to subscribe to; omitted when falsy.
   * @param pin Optional room pin; omitted when falsy.
   * @param token Optional token added to the request.
   * @param transaction Transaction id used to correlate the reply.
   * @returns Observable emitting `plugindata.data.id` of the reply.
   * @throws Error synchronously when there is no open connection.
   */
  public join(
    session: JanSession,
    asPublisher: boolean,
    id: string | null,
    room: string,
    display?: string,
    feed?: number,
    pin?: string,
    token?: string,
    transaction: string = uuid(),
  ): Observable<number> {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    // Optional fields are only present in the request when they have a truthy value.
    const joinRequest: any = {
      request: 'join',
      ptype: asPublisher ? 'publisher' : 'subscriber',
      room: room.toString(),
      ...(pin ? { pin } : {}),
      ...(display ? { display } : {}),
      ...(feed ? { feed } : {}),
      ...(id ? { id } : {}),
    };

    this.send(session, joinRequest, token, {}, transaction);

    const reply$ = socket.pipe(
      filter((response) => response.transaction === transaction && response.janus === 'event'),
      take(1),
    );

    return reply$.pipe(map((response) => response.plugindata.data.id));
  }

  /**
   * Sends a plugin message on the camera session.
   *
   * @param body Request body forwarded as-is.
   * @param token Token added to the request.
   * @throws Error when no camera session exists or there is no open connection.
   */
  public sendCamera(body: any, token: string): void {
    // Fail with a clear message instead of a TypeError raised while building
    // the request from an undefined session.
    if (!this.cameraSession) {
      throw new Error('Camera stream is not published');
    }

    this.send(this.cameraSession, body, token);
  }

  /**
   * Wraps a plugin request body into a message addressed to the given plugin
   * session and pushes it onto the socket.
   *
   * @param session Session and plugin handle the message is addressed to.
   * @param body Plugin request body.
   * @param token Optional token added to the message.
   * @param jsep Extra top-level fields merged into the message (used for `{ jsep }`).
   * @param transaction Transaction id; a fresh one is generated by default.
   * @throws Error when there is no open connection.
   */
  private send(session: JanSession, body: any, token?: string, jsep: object = {}, transaction = uuid()): void {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    const envelope: any = {
      janus: JanEvent.Message,
      transaction,
      session_id: session.sessionId,
      handle_id: session.pluginSessionId,
      body,
      ...jsep,
      ...(token ? { token } : {}),
    };

    socket.next(envelope);
  }

  /**
   * Publishes the camera stream: attaches a dedicated plugin handle on the main
   * session, joins the room as publisher and negotiates a peer connection.
   *
   * @param stream Media stream to publish.
   * @param token Token added to every request.
   * @param pubId Id requested for the publisher.
   * @param room Room to join.
   * @param display Display name.
   * @param pin Room pin.
   * @returns Observable emitting the publisher id and the peer connection.
   * @throws Error synchronously when the camera is already published or the
   *   service is not connected.
   */
  public publishCameraStream(
    stream: MediaStream,
    token: string,
    pubId: string,
    room: string,
    display: string,
    pin: string,
  ): Observable<JanPublishResponse> {
    if (this.cameraSession) {
      throw new Error('Camera stream already published');
    }

    const mainSession = this.mainSession;

    if (!mainSession) {
      throw new Error('No connection');
    }

    return this.attachPlugin(mainSession.sessionId, token).pipe(
      switchMap((response) => {
        const session: JanSession = {
          sessionId: mainSession.sessionId,
          pluginSessionId: response.data.id,
        };

        this.cameraSession = session;

        // No feed is passed: a feed only applies when joining as subscriber.
        return this.join(session, true, pubId, room, display, undefined, pin, token).pipe(
          switchMap((publisherId: number) => {
            // The session may have been dropped (unpublish/disconnect) while
            // the join was in flight; do not publish on a handle we already left.
            if (this.cameraSession !== session) {
              throw new Error('Camera stream was unpublished before publishing completed');
            }

            session.publisherId = publisherId;

            return forkJoin({
              publisherId: of(publisherId),
              peerConnection: this.publishStream(session, stream, token),
            });
          }),
        );
      }),
    );
  }

  /**
   * Publishes the screen share stream: attaches a dedicated plugin handle on
   * the main session, joins the room as publisher and negotiates a peer
   * connection flagged as screen share.
   *
   * @param stream Media stream to publish.
   * @param token Token added to every request.
   * @param pubId Id requested for the publisher.
   * @param room Room to join.
   * @param display Display name.
   * @param pin Room pin.
   * @returns Observable emitting the publisher id and the peer connection.
   * @throws Error synchronously when the screen is already published or the
   *   service is not connected.
   */
  public publishScreenStream(
    stream: MediaStream,
    token: string,
    pubId: string,
    room: string,
    display: string,
    pin: string,
  ): Observable<JanPublishResponse> {
    if (this.screenSession) {
      throw new Error('Screen stream already published');
    }

    const mainSession = this.mainSession;

    if (!mainSession) {
      throw new Error('No connection');
    }

    return this.attachPlugin(mainSession.sessionId, token).pipe(
      switchMap((response) => {
        const session: JanSession = {
          sessionId: mainSession.sessionId,
          pluginSessionId: response.data.id,
        };

        this.screenSession = session;

        // No feed is passed: a feed only applies when joining as subscriber.
        return this.join(session, true, pubId, room, display, undefined, pin, token).pipe(
          switchMap((publisherId: number) => {
            // The session may have been dropped (unpublish/disconnect) while
            // the join was in flight; do not publish on a handle we already left.
            if (this.screenSession !== session) {
              throw new Error('Screen stream was unpublished before publishing completed');
            }

            session.publisherId = publisherId;

            return forkJoin({
              publisherId: of(publisherId),
              peerConnection: this.publishStream(session, stream, token, true),
            });
          }),
        );
      }),
    );
  }

  /**
   * Publishes the shared video stream: attaches a dedicated plugin handle on
   * the main session, joins the room as publisher and negotiates a peer
   * connection flagged as video share.
   *
   * @param stream Media stream to publish.
   * @param token Token added to every request.
   * @param pubId Id requested for the publisher.
   * @param room Room to join.
   * @param display Display name.
   * @param pin Room pin.
   * @returns Observable emitting the publisher id and the peer connection.
   * @throws Error synchronously when the video is already published or the
   *   service is not connected.
   */
  public publishVideoStream(
    stream: MediaStream,
    token: string,
    pubId: string,
    room: string,
    display: string,
    pin: string,
  ): Observable<JanPublishResponse> {
    if (this.videoSession) {
      throw new Error('Video stream already published');
    }

    const mainSession = this.mainSession;

    if (!mainSession) {
      throw new Error('No connection');
    }

    return this.attachPlugin(mainSession.sessionId, token).pipe(
      switchMap((response) => {
        const session: JanSession = {
          sessionId: mainSession.sessionId,
          pluginSessionId: response.data.id,
        };

        this.videoSession = session;

        // No feed is passed: a feed only applies when joining as subscriber.
        return this.join(session, true, pubId, room, display, undefined, pin, token).pipe(
          switchMap((publisherId: number) => {
            // The session may have been dropped (unpublish/disconnect) while
            // the join was in flight; do not publish on a handle we already left.
            if (this.videoSession !== session) {
              throw new Error('Video stream was unpublished before publishing completed');
            }

            session.publisherId = publisherId;

            return forkJoin({
              publisherId: of(publisherId),
              peerConnection: this.publishStream(session, stream, token, false, true),
            });
          }),
        );
      }),
    );
  }

  /**
   * Creates a peer connection for a local stream, sends the SDP offer in a
   * `publish` request and applies the answer returned by the server.
   *
   * The new peer connection is stored in the video, screen or camera slot
   * according to the flags (video share takes precedence over screen share).
   *
   * @param session Plugin session the stream is published on.
   * @param stream Local media stream whose tracks are added to the connection.
   * @param token Optional token added to the request.
   * @param isScreenShare Marks the stream as screen share; audio reception is
   *   not requested in the offer in that case.
   * @param isVideoShare Marks the stream as video share; a bitrate of 1024000
   *   is requested in that case.
   * @returns Observable emitting the peer connection once the local offer has
   *   been sent and the remote answer has been applied.
   * @throws Error synchronously when there is no open connection. The returned
   *   observable errors when the server replies with an error (error code 432
   *   is tagged with `code: 'JanusRoomFull'`) or when applying the answer fails.
   */
  private publishStream(
    session: JanSession,
    stream: MediaStream,
    token?: string,
    isScreenShare = false,
    isVideoShare = false,
  ): Observable<RTCPeerConnection> {
    const socket = this.socket$;

    if (!socket) {
      throw new Error('No connection');
    }

    const offerOptions = { offerToReceiveAudio: !isScreenShare, offerToReceiveVideo: true };

    const publishTransaction = uuid();
    const pc = new RTCPeerConnection({ iceServers: this.iceServers });

    if (isVideoShare) {
      this.videoPeerConnection = pc;
    } else if (isScreenShare) {
      this.screenPeerConnection = pc;
    } else {
      this.cameraPeerConnection = pc;
    }

    stream.getTracks().forEach((track) => pc.addTrack(track, stream));

    // Reply to the publish request. Error replies are turned into stream
    // errors before touching the peer connection, and the remote description
    // is awaited so a rejection reaches the subscriber instead of being lost.
    const answerApplied$ = socket.pipe(
      filter((message) => message.janus === 'event' && message.transaction === publishTransaction),
      take(1),
      switchMap((message) => {
        const errorCode = message.plugindata?.data?.error_code;

        if (errorCode === 432) {
          throw Object.assign(new Error(message.plugindata.data.error), { code: 'JanusRoomFull' });
        }

        if (errorCode !== undefined && errorCode !== null) {
          throw new Error(message.plugindata.data.error);
        }

        return from(pc.setRemoteDescription(message.jsep));
      }),
    );

    // Local offer: created, applied locally, then sent with the publish request.
    const offerSent$ = from(pc.createOffer(offerOptions)).pipe(
      switchMap((jsep) => from(pc.setLocalDescription(jsep)).pipe(map(() => jsep))),
      tap((jsep) => {
        const request: any = {
          request: 'publish',
        };

        if (isVideoShare) {
          request.bitrate = 1024000;
        }

        this.send(session, request, token, { jsep }, publishTransaction);
      }),
    );

    // The reply listener is subscribed first so the answer cannot be missed.
    return forkJoin({
      response: answerApplied$,
      connect: offerSent$,
    }).pipe(map(() => pc));
  }

  /**
   * Sends a `leave` request on the given plugin session.
   *
   * @param session Plugin session to leave.
   * @param token Token added to the request.
   */
  public leave(session: JanSession, token: string): void {
    const leaveRequest = { request: 'leave' };

    this.send(session, leaveRequest, token);
  }

  /**
   * Subscribes to a remote publisher's feed.
   *
   * A peer connection is created immediately and stored under the publisher
   * id. A new plugin handle is then attached; once it is, the returned
   * observable emits the stored peer connection while the join / answer /
   * `start` exchange continues in the background. Errors raised by that
   * background exchange are swallowed.
   *
   * @param session Session on which the new plugin handle is attached.
   * @param publisher Remote publisher to subscribe to.
   * @param room Room the publisher is in.
   * @param token Optional token added to every request.
   * @param pin Optional room pin.
   * @returns Observable emitting the peer connection stored for the publisher.
   */
  public subscribeToStream(
    session: JanSession,
    publisher: JanPublisher,
    room: string,
    token?: string,
    pin?: string,
  ): Observable<any> {
    const subscribeTransaction = uuid();

    this.peerConnections[publisher.id] = new RTCPeerConnection({ iceServers: this.iceServers });

    // Always looked up at use time: the entry can be removed or replaced meanwhile.
    const currentPeerConnection = () => this.peerConnections[publisher.id];

    const negotiate = (attachResponse: JanMessage): void => {
      const subscriberSession: JanSession = {
        sessionId: session.sessionId,
        pluginSessionId: attachResponse.data.id,
      };

      if (!this.socket$) {
        throw new Error('No connection');
      }

      // Offer from the server -> local answer -> `start` request.
      const answered$ = this.socket$.pipe(
        filter((message) => message.janus === 'event' && message.transaction === subscribeTransaction),
        take(1),
        switchMap((message) => from(currentPeerConnection().setRemoteDescription(message.jsep))),
        switchMap(() => from(currentPeerConnection().createAnswer())),
        switchMap((jsep) => from(currentPeerConnection().setLocalDescription(jsep)).pipe(map(() => jsep))),
        tap((jsep) => this.send(subscriberSession, { request: 'start' }, token, { jsep })),
      );

      // The join request is sent right here, sharing the transaction id the
      // listener above is waiting for.
      const joined$ = this.join(
        subscriberSession,
        false,
        null,
        room,
        undefined,
        publisher.id,
        pin,
        token,
        subscribeTransaction,
      );

      forkJoin({ response: answered$, joinAsSubscriber: joined$ })
        .pipe(catchError(() => of(null)))
        .subscribe();
    };

    return this.attachPlugin(session.sessionId, token).pipe(
      tap((response: JanMessage) => negotiate(response)),
      map(() => currentPeerConnection()),
    );
  }
}
