import { Inject, Injectable } from '@angular/core';
import { BehaviorSubject, EMPTY, Observable, Subject, combineLatest, interval, merge, throwError, timer } from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  finalize,
  first,
  map,
  mapTo,
  mergeMap,
  publishBehavior,
  refCount,
  repeatWhen,
  retryWhen,
  switchMap,
  takeUntil,
  takeWhile,
  tap,
} from 'rxjs/operators';

import { AuthUser } from '../models/auth.model';
import { Device, UA_DEVICE_TYPE } from '../models/device.model';
import { ENVIRONMENT, Environment } from '../models/environment.model';
import { NeverError } from '../models/error.model';
import { WebSocketMessage, WebSocketSubject, WebSocketSyncData } from '../models/web-socket.model';
import { AuthUsecase } from '../usecases/auth.usecase';
import { ProgressUsecase } from '../usecases/progress.usecase';
import { WebSocketUsecase } from '../usecases/web-socket.usecase';

/** Number of reconnection attempts allowed before a connection error is surfaced. */
const RETRY_LIMIT: number = 3;

/** Delay, in milliseconds, before the first reconnection attempt; doubled on each further attempt. */
const INIT_DELAY: number = 1_000;

/** Pre-serialized frame that is sent periodically to keep the connection alive. */
const KEEP_ALIVE_MESSAGE: string = JSON.stringify({ action: 'keepalive' });

/**
 * WebSocket-backed implementation of {@link WebSocketUsecase}.
 *
 * Opens a socket when the auth state becomes `signedIn`, closes it on the
 * `signIn` state, forwards parsed messages to {@link message$}, sends periodic
 * keep-alive frames while the connection is open, and reconnects with an
 * exponential back-off when the socket errors.
 *
 * When the environment has no `wssUrl` the service is disabled: no
 * subscriptions are created and no socket is ever opened by the constructor.
 */
@Injectable()
export class WebSocketInteractor extends WebSocketUsecase {
  /** `true` when the environment provides a (truthy) `wssUrl`. */
  get enabled(): boolean {
    return this._enabled;
  }
  /** Messages received from the socket, already parsed from JSON. */
  get message$(): Observable<WebSocketMessage> {
    return this._message;
  }
  /**
   * Emits when the connection pipeline fails after the back-off is exhausted,
   * or (without a value) when {@link retry} cannot obtain a token.
   */
  get error$(): Observable<CloseEvent> {
    return this._error;
  }
  /** Emits when the connection pipeline completes. */
  get close$(): Observable<void> {
    return this._close;
  }
  /** Connection state: `true` after an open notification, `false` after an error or close. Starts as `false`. */
  get isOpen$(): Observable<boolean> {
    return this._isOpen$;
  }

  private readonly _enabled: boolean;
  private readonly _message = new Subject<WebSocketMessage>();
  private readonly _open = new Subject<void>();
  private readonly _error = new Subject<CloseEvent>();
  private readonly _close = new Subject<void>();
  private readonly _isOpen$: Observable<boolean>;
  /** Connection id passed (URI-encoded) to the socket; `'none'` until a matching device sync message arrives. */
  private readonly _connectionId = new BehaviorSubject<string>('none');
  private _webSocket?: WebSocketSubject;
  private _authUser?: AuthUser;

  /** Reconnection attempts made since the last successful open (see {@link expBackoff}). */
  private _retryCount = 0;

  constructor(
    @Inject(ENVIRONMENT) private _environment: Environment,
    private _authUsecase: AuthUsecase,
    private _progressUsecase: ProgressUsecase,
  ) {
    super();
    const open$ = this._open.pipe(mapTo(true));
    const close$ = merge(this.error$, this.close$).pipe(mapTo(false));
    this._isOpen$ = merge(open$, close$).pipe(publishBehavior<boolean>(false), refCount());
    this._enabled = !!this._environment.wssUrl;
    if (!this.enabled) {
      return;
    }

    // Keep-alive: while the connection is open, send KEEP_ALIVE_MESSAGE at the
    // period announced by the auth payload (seconds), defaulting to 30 s.
    const defaultKeepAliveSeconds = 30;
    this._authUsecase.payload$
      .pipe(
        map(payload => {
          if (!payload) {
            return undefined;
          }
          const seconds = parseInt(payload.keepaliveInterval, 10);
          // NaN, zero and negative values fall back to the default; a
          // non-positive period would make `interval` fire continuously.
          return (seconds > 0 ? seconds : defaultKeepAliveSeconds) * 1000;
        }),
        // Do not restart the timer when a new payload announces the same period.
        distinctUntilChanged(),
        // switchMap (not mergeMap) so that a new payload replaces the running
        // timer instead of stacking an additional one on top of it, and a
        // missing payload stops the timer.
        switchMap(periodMs => (periodMs === undefined ? EMPTY : interval(periodMs))),
        takeUntil(close$),
        // After an error or a close, wait for the next open before resubscribing.
        retryWhen(() => open$),
        repeatWhen(() => open$),
      )
      .subscribe(() => this._webSocket?.next(KEEP_ALIVE_MESSAGE));

    this._authUsecase.authState$.subscribe(({ status, user }) => {
      switch (status) {
        case 'signedIn':
          this._authUser = user;
          this.onSignIn();
          break;
        // NOTE(review): 'signIn' is handled as "signed out" — presumably it
        // denotes that the sign-in screen is required; confirm against AuthUsecase.
        case 'signIn':
          this._authUser = undefined;
          this.onSignOut();
          break;
        default:
          // nop
          break;
      }
    });

    // Track the connection id of the current user's own device entry.
    this.message$
      .pipe(
        filter(message => message.action === 'sync' && message.data?.source === 'device'),
        map(({ data }) => data as WebSocketSyncData<Device>),
      )
      .subscribe(data => {
        switch (data.reason) {
          case 'create':
          case 'update': {
            const device = data.payload as Device;
            if (this._authUser && device.owner === this._authUser.attributes?.name && device.type === UA_DEVICE_TYPE) {
              this._connectionId.next(device.connectionId);
            }
            break;
          }
          case 'delete':
            // nop
            break;
          default:
            // Exhaustiveness guard: reached only for a reason the type does not declare.
            throw new NeverError(data.reason);
        }
      });
  }

  /**
   * Starts a fresh connection attempt.
   *
   * A current socket is discarded first only when it has no observers left.
   * The connection is then re-established once `token$` has emitted; if
   * `token$` fails, {@link error$} emits instead.
   */
  retry(): void {
    if (this._webSocket && !this._webSocket.observers.length) {
      this._webSocket.unsubscribe();
      this._webSocket = undefined;
    }
    this._authUsecase.token$.pipe(first()).subscribe({
      next: () => this.onSignIn(),
      error: () => this._error.next(),
    });
  }

  /**
   * Serializes `message` to JSON and sends it over the current socket.
   * Does nothing when there is no socket.
   */
  sendMessage(message: WebSocketMessage): void {
    this._webSocket?.next(JSON.stringify(message));
  }

  /**
   * Creates a new socket and wires it to the public streams.
   *
   * A progress indicator is shown until the socket reports it is open (or the
   * pipeline ends). Errors are retried with {@link expBackoff}; completions are
   * repeated for as long as the socket has not been closed via `unsubscribe()`.
   *
   * NOTE(review): an already assigned `_webSocket` is overwritten without being
   * torn down here; callers appear to be expected to have released it — confirm.
   */
  private onSignIn(): void {
    this._retryCount = 0;
    const progressId = this._progressUsecase.show();
    const token$ = this._authUsecase.token$;
    const connectionId$ = this._connectionId.pipe(map(connectionId => encodeURIComponent(connectionId)));
    const protocols$ = combineLatest([token$, connectionId$]);
    const ws = new WebSocketSubject(this._environment.wssUrl, protocols$);
    ws.pipe(
      tap(message => {
        // A null/undefined emission is treated as the "opened" notification
        // (presumably emitted by WebSocketSubject on open — confirm).
        if (message == null) {
          this._retryCount = 0;
          this._progressUsecase.dismiss(progressId);
          this._open.next();
        }
      }),
      filter((message): message is string => typeof message === 'string'),
      map(message => JSON.parse(message)),
      retryWhen(errors => errors.pipe(mergeMap(error => this.expBackoff(error)))),
      repeatWhen(notifications => notifications.pipe(takeWhile(() => !ws.closed))),
      finalize(() => this._progressUsecase.dismiss(progressId)),
    ).subscribe({
      next: this._message.next.bind(this._message),
      error: this._error.next.bind(this._error),
      complete: this._close.next.bind(this._close),
    });
    this._webSocket = ws;
  }

  /** Releases the current socket, if any, and resets the connection id to `'none'`. */
  private onSignOut(): void {
    if (this._webSocket) {
      this._webSocket.unsubscribe();
      this._webSocket = undefined;
    }
    this._connectionId.next('none');
  }

  /**
   * Back-off policy for reconnection attempts.
   *
   * @param error the error that triggered the retry
   * @returns a timer of `INIT_DELAY * 2^n` ms for the n-th attempt (n starting
   *   at 0) while fewer than `RETRY_LIMIT` attempts were made since the last
   *   open; afterwards an observable that re-throws `error`.
   */
  private expBackoff(error: unknown): Observable<number> {
    if (this._retryCount >= RETRY_LIMIT) {
      return throwError(error);
    }
    const delayMs = INIT_DELAY * 2 ** this._retryCount;
    this._retryCount += 1;
    return timer(delayMs);
  }
}
