import { Injectable, OnDestroy, Inject, NgZone } from '@angular/core';
import { Observable, SubscriptionLike, Subject, Observer, interval } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { IWebsocketService, IWsMessage, WebSocketConfig } from './websocket.interface';
import { config } from './websocket.config';
import { AuthService } from '@services/auth';

/**
 * Application-wide WebSocket client.
 *
 * The service opens a socket whenever `AuthService.tokens$` delivers an access
 * token, closes it when the token disappears, re-broadcasts every incoming
 * message through {@link WebsocketService.on}, and retries the connection on a
 * fixed interval after the socket closes unexpectedly.
 *
 * Once every reconnect attempt has been used without success the message
 * stream and the status stream are completed; that state is terminal for this
 * service instance.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService implements IWebsocketService, OnDestroy {
  /**
   * Connection status: `true` after the socket opens, `false` after it closes.
   * Consecutive duplicate values are filtered out for each subscriber.
   */
  public status: Observable<boolean | string>;

  /** Base socket configuration; `url` is rewritten each time a new token arrives. */
  private config: WebSocketSubjectConfig<IWsMessage<any>>;

  /** Maximum number of interval ticks a single reconnect cycle may use. */
  private reconnectAttempts: number;

  /** Keeps `wsMessages$` observed so that stream errors are always logged. */
  private websocketSub: SubscriptionLike;

  /** Internal status subscription that tracks `isConnected` and triggers reconnects. */
  private statusSub: SubscriptionLike;

  /** Subscription to the auth token stream; released in `ngOnDestroy`. */
  private tokensSub!: SubscriptionLike;

  /** Subscription to the currently active socket, if any. */
  private socketSub: SubscriptionLike | null = null;

  /** Subscription to the running reconnect timer, if any. */
  private reconnectionSub: SubscriptionLike | null = null;

  /** Reconnect timer; non-null only while a reconnect cycle is running. */
  private reconnection$!: Observable<number> | null;

  /** Currently active socket; `null` when there is none. */
  private websocket$!: WebSocketSubject<IWsMessage<any>> | null;

  /** Observer captured from `status`; used to push connection state changes. */
  private connection$!: Observer<boolean>;

  /** Fan-out subject for every message received from the server. */
  private wsMessages$: Subject<IWsMessage<any>>;

  /** Pause between reconnect attempts, in milliseconds. */
  private reconnectInterval: number;

  /** Last value observed on `status`. */
  private isConnected!: boolean | string;

  /** `true` while the socket is closed on purpose, which suppresses auto-reconnect. */
  private closedByClient = false;

  constructor(
    @Inject(config) private wsConfig: WebSocketConfig,
    private readonly authService: AuthService,
    private ngZone: NgZone,
  ) {
    // `run` enters the Angular zone by itself, so wrapping it in
    // `runOutsideAngular` (as was done previously) had no additional effect.
    this.ngZone.run(() => {
      this.wsMessages$ = new Subject<IWsMessage<any>>();

      // `||` is intentional: a missing or zero value falls back to the default.
      this.reconnectInterval = wsConfig.reconnectInterval || 5000; // pause between connections
      this.reconnectAttempts = wsConfig.reconnectAttempts || 10; // number of connection attempts

      // Open/close observers are attached per connection in `connect()`.
      this.config = { url: '' };

      // connection status
      this.status = new Observable<boolean>(observer => {
        this.connection$ = observer;
      }).pipe(share(), distinctUntilChanged());

      // Subscribing here also captures `connection$` before anything emits.
      this.statusSub = this.status.subscribe(isConnected => {
        this.isConnected = isConnected;

        // start reconnecting as soon as the connection is reported lost
        if (!this.reconnection$ && typeof isConnected === 'boolean' && !isConnected) {
          this.reconnect();
        }
      });

      this.websocketSub = this.wsMessages$.subscribe({
        next: () => null,
        error: (error: ErrorEvent) => console.error('WebSocket error!', error),
      });

      // Follow the auth state: no token -> close, (new) token -> (re)open.
      this.tokensSub = this.authService.tokens$.subscribe(({ access }) => {
        if (!access) {
          this.disconnect();
        } else {
          this.config.url = `${wsConfig.url}events/?auth_token=${access}`;
          this.connect();
        }
      });
    });
  }

  /**
   * Releases everything the service owns: the token subscription, the socket,
   * a pending reconnect timer and the internal subscriptions.
   */
  ngOnDestroy(): void {
    this.tokensSub?.unsubscribe();
    this.disconnect();
    this.websocketSub?.unsubscribe();
    this.statusSub?.unsubscribe();
  }

  /**
   * Stream of messages received from the server.
   *
   * @returns an observable view of the internal message subject
   */
  public on<T = any>(): Observable<IWsMessage<T>> {
    return this.wsMessages$.asObservable();
  }

  /**
   * Sends an event to the server; logs an error instead when the event name is
   * empty or the socket is not connected.
   *
   * @param event event name
   * @param data payload sent with the event (defaults to an empty object)
   */
  public send(event: string, data: any = {}): void {
    if (event && this.isConnected) {
      // NOTE(review): no custom `serializer` is configured for the socket, so
      // RxJS's default JSON serializer presumably encodes this already
      // stringified payload a second time — confirm the backend expects that.
      this.websocket$?.next(JSON.stringify({ event, data }) as any);
    } else {
      console.error('Send error!');
    }
  }

  /**
   * Closes the socket on purpose and cancels a pending reconnect cycle.
   *
   * The service-lifetime subscriptions stay alive so that a later `connect()`
   * (for example after the next login) gets status tracking and automatic
   * reconnects again.
   */
  public disconnect(): void {
    // Must be set before the status change below, otherwise the status
    // subscriber would immediately start reconnecting.
    this.closedByClient = true;
    this.stopReconnection();
    this.releaseSocket();
    this.connection$.next(false);
  }

  /**
   * Opens a new socket using the current configuration, replacing (and
   * closing) any socket that is still around.
   */
  private connect(): void {
    // Without this, every token refresh would leave the previous socket open.
    this.releaseSocket();
    this.closedByClient = false;

    const socket = new WebSocketSubject<IWsMessage<any>>({
      ...this.config,
      closeObserver: {
        next: () => {
          // Ignore close events of sockets that have already been replaced.
          if (this.websocket$ !== socket) {
            return;
          }
          this.websocket$ = null;
          this.connection$.next(false);
        },
      },
      openObserver: {
        next: () => {
          // Ignore open events of sockets that have already been replaced.
          if (this.websocket$ !== socket) {
            return;
          }
          console.log('WebSocket connected!');
          this.connection$.next(true);
        },
      },
    });

    this.websocket$ = socket;
    this.socketSub = socket.subscribe({
      next: message => this.wsMessages$.next(message),
      error: () => {
        if (!this.websocket$) {
          // run reconnect if errors
          this.reconnect();
        }
      },
    });
  }

  /**
   * Starts a reconnect cycle: one connection attempt per interval tick until a
   * socket exists again or the configured number of attempts is used up.
   * Does nothing while a cycle is already running or after an intentional
   * `disconnect()`.
   */
  private reconnect(): void {
    if (this.reconnection$ || this.closedByClient) {
      return;
    }

    this.reconnection$ = interval(this.reconnectInterval).pipe(
      takeWhile((_tick, index) => index < this.reconnectAttempts && !this.websocket$),
    );

    this.reconnectionSub = this.reconnection$.subscribe({
      next: () => this.connect(),
      error: () => null,
      complete: () => {
        this.reconnection$ = null;
        this.reconnectionSub = null;

        // Still no socket once the cycle ends: give up and complete both streams.
        if (!this.websocket$) {
          this.wsMessages$.complete();
          this.connection$.complete();
        }
      },
    });
  }

  /** Cancels a running reconnect cycle without completing any stream. */
  private stopReconnection(): void {
    this.reconnectionSub?.unsubscribe();
    this.reconnectionSub = null;
    this.reconnection$ = null;
  }

  /** Closes the active socket (if any) and stops forwarding its messages. */
  private releaseSocket(): void {
    const socket = this.websocket$;
    const subscription = this.socketSub;

    // Clear the references first so late events of this socket count as stale.
    this.websocket$ = null;
    this.socketSub = null;

    socket?.complete();
    subscription?.unsubscribe();
  }
}
