import { Injectable } from '@angular/core';
import { EventSourceMessage, fetchEventSource } from '@microsoft/fetch-event-source';
import { environment } from '@env/environment';
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';
// services
import { AuthApiService } from '@app/auth/services';

/**
 * Marker error for connection attempts that may be retried.
 *
 * Thrown from the `onopen` handler of `SseService.fetchEventSource` when the
 * response is neither a successful 200 nor a 401. The `onerror` handler there
 * answers every error that is not a `FatalError` with a retry interval.
 */
class RetriableError extends Error {}
/**
 * Error that marks an SSE connection attempt as not worth retrying.
 *
 * The `onerror` handler of `SseService.fetchEventSource` rethrows instances of
 * this class instead of returning a retry interval.
 */
class FatalError extends Error {
  /**
   * @param message Human-readable reason the connection attempt was abandoned.
   */
  constructor(message: string) {
    // Pass the message to Error itself rather than assigning it afterwards:
    // the stack header is captured during construction, and the built-in
    // constructor installs `message` as a non-enumerable property, matching
    // every other Error.
    super(message);
    this.name = 'SSEReconnectError';
  }
}

/**
 * Owns one Server-Sent Events connection opened through
 * `@microsoft/fetch-event-source` and exposes a flag that signals when a
 * (re)connect is wanted.
 *
 * `connectFlag$` starts as `true`, is set to `false` once a stream opens
 * successfully, and is set back to `true` after the user-info request issued
 * on a 401 completes.
 */
@Injectable({
  providedIn: 'root',
})
export class SseService {
  /** Abort controller of the connection most recently opened by `fetchEventSource`. */
  private ctrl: AbortController | undefined;

  private $connectFlag: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
  /** Emits the current connect flag and every later change. */
  public connectFlag$ = this.$connectFlag.asObservable();

  constructor(private readonly authApiService: AuthApiService) {}

  /**
   * Publishes a new value on `connectFlag$`.
   *
   * @param flag Value to emit to subscribers.
   */
  setConnectFlag(flag: boolean) {
    this.$connectFlag.next(flag);
  }

  /** Aborts the tracked connection, if any, and forgets its controller. */
  closeEventSource() {
    if (this.ctrl) {
      this.ctrl.abort();
      this.ctrl = undefined;
    }
  }

  // using fetch-event-source
  /**
   * Opens an SSE stream against `${environment.apiUrl}${url}`.
   *
   * Any connection still tracked by this service is aborted first, so at most
   * one stream is alive at a time and `closeEventSource()` always targets it.
   *
   * @param url Path appended to `environment.apiUrl`.
   * @param token Access token sent as a `Bearer` Authorization header.
   * @param onMessage Callback invoked for every message received on the stream.
   * @returns The promise returned by the underlying `fetchEventSource` call.
   * @throws FatalError When the server answers the connection attempt with 401.
   */
  async fetchEventSource(
    url: string,
    token: string,
    onMessage: (ev: EventSourceMessage) => void,
  ): Promise<void> {
    // Without this, overwriting `this.ctrl` would orphan the previous stream:
    // it would keep running with no way left to abort it.
    this.closeEventSource();

    // Handlers below close over this local controller so that a late callback
    // from this stream can never touch a connection opened afterwards.
    const ctrl = new AbortController();
    this.ctrl = ctrl;

    try {
      return await fetchEventSource(`${environment.apiUrl}${url}`, {
        method: 'GET',
        headers: {
          Authorization: `Bearer ${token}`,
        },
        signal: ctrl.signal,
        // The keepalive option can be used to allow the request to outlive the page.
        // keepalive: true,
        openWhenHidden: true,
        onopen: async (res) => {
          if (res.ok && res.status === 200) {
            this.setConnectFlag(false);
            return;
          }
          if (res.status === 401) {
            // refresh token by fetch current user info
            // refresh-token-interceptor will 401 and refresh access token
            this.authApiService
              .getUserInfo()
              .pipe(take(1))
              .subscribe({
                next: () => {
                  // after refresh token, retry to connect
                  this.setConnectFlag(true);
                },
                error: (refreshErr: unknown) => {
                  // A failed refresh must not surface as an unhandled error;
                  // the flag is left untouched so no reconnect is requested.
                  console.error('sse token refresh failed', refreshErr);
                },
              });
            throw new FatalError('can not reconnect sse due to 401');
          }
          throw new RetriableError();
        },
        onmessage: onMessage,
        onerror: (err) => {
          if (err instanceof FatalError) {
            // rethrow to stop the operation
            throw err;
          }
          console.warn('sse onerror', err);
          // Returning a number asks the library to retry after that many
          // milliseconds; here: retry after 5 seconds.
          return 5000;
        },
        onclose: () => {
          // If the server sends a close signal, onclose is called. (Assumption)
          // This case needs to be verified in the production environment to see if it actually occurs.
          console.warn('sse onclose');
          // Abort only this stream's controller; `closeEventSource()` would
          // abort whichever connection is current, possibly a newer one.
          ctrl.abort();
          if (this.ctrl === ctrl) {
            this.ctrl = undefined;
          }
        },
      });
    } finally {
      // Once the promise settles (closed, aborted, or fatal) this controller is
      // dead; drop it unless a newer connection has already replaced it.
      if (this.ctrl === ctrl) {
        this.ctrl = undefined;
      }
    }
  }
}
