import { ReplaySubject } from 'rxjs';
import { Injectable } from '@angular/core';
import { filter, take } from 'rxjs/operators';

/**
 * Application-wide wrapper around the Vert.x event-bus bridge client.
 *
 * The underlying client is loaded lazily by {@link NotifierService.start}.
 * Until the bus reports that it is open, the handler (un)registration
 * methods only log a warning and return, while {@link NotifierService.send}
 * waits for the next open event.
 */
@Injectable({
  providedIn: 'root'
})
export class NotifierService {
  /** Replays the latest bus instance (on open) or `undefined` (on close) to late subscribers. */
  private eventBusOpenSubject = new ReplaySubject<any>(1);

  /** Emits the event bus instance each time it opens and `undefined` each time it closes. */
  readonly eventBusOpenObs = this.eventBusOpenSubject.asObservable();

  /** The bridge client instance; stays `undefined` until `start()` has created it. */
  private eventBus: any;

  /**
   * Registers `handler` for messages published on address `name`.
   *
   * Does nothing (apart from logging) when the bus is not open or when the
   * very same function is already registered for that address.
   *
   * @param name address to listen on
   * @param handler callback receiving `(error, message)`
   */
  registerHandler(name: string, handler: (error: any, message: any) => void): void {
    if (!this.ensureOpen('registerHandler')) {
      return;
    }
    if (this.isRegistered(name, handler)) {
      console.log('[notifier event bus] handler exists, skipped: ', name);
      return;
    }
    console.log('[notifier event bus] handler register: ', name);
    try {
      this.eventBus.registerHandler(name, handler);
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Same as {@link NotifierService.registerHandler}, but also forwards
   * optional `headers` to the bus when registering.
   *
   * The handler function itself is registered (not a wrapper around it), so
   * the duplicate check works and the handler can later be removed with
   * {@link NotifierService.unregisterHandler}.
   *
   * @param params address (`name`), optional `headers`, and the `handler` callback
   */
  addHandler(params: {name: string; headers?: any; handler: (error: any, message: any) => void}): void {
    if (!this.ensureOpen('addHandler')) {
      return;
    }
    if (this.isRegistered(params.name, params.handler)) {
      console.log('[notifier event bus] handler exists, skipped: ', params.name);
      return;
    }
    console.log('[notifier event bus] handler register: ', params.name);
    try {
      this.eventBus.registerHandler(params.name, params.headers, params.handler);
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Removes a previously registered `handler` from address `name`.
   * Only logs a warning when the bus is not open.
   *
   * @param name address the handler was registered on
   * @param handler the exact function instance that was registered
   */
  unregisterHandler(name: string, handler: (error: any, message: any) => void): void {
    if (!this.ensureOpen('unregisterHandler')) {
      return;
    }
    console.log('[notifier event bus] handler unregister: ', name);
    try {
      this.eventBus.unregisterHandler(name, handler);
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Removes every handler registered on an address whose name matches `regexp`.
   * Only logs a warning when the bus is not open.
   *
   * @param regexp pattern tested against each registered address name
   */
  unregisterHandlers(regexp: RegExp): void {
    if (!this.ensureOpen('unregisterHandlers')) {
      return;
    }
    console.log('[notifier event bus] handlers unregister: ', regexp);
    try {
      const matchingNames = Object.keys(this.eventBus.handlers).filter(handlerName => {
        // A global/sticky regexp keeps state in lastIndex between test() calls;
        // reset it so every name is matched from its start.
        regexp.lastIndex = 0;
        return regexp.test(handlerName);
      });
      for (const handlerName of matchingNames) {
        // Iterate over a snapshot: unregistering may remove entries from the
        // bus's own array, which would make a live iteration skip handlers.
        const handlerFns: Array<any> = [...(this.eventBus.handlers[handlerName] ?? [])];
        for (const handlerFn of handlerFns) {
          this.eventBus.unregisterHandler(handlerName, handlerFn);
        }
      }
      console.log('[notifier event bus] handlers: ', Object.keys(this.eventBus.handlers));
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Lazily loads the event-bus client and opens a connection to
   * `/api/notifier/eventbus` with reconnect enabled.
   *
   * Nothing is connected when `localStorage.enableNotifier` is the string
   * `'false'`. Failures while loading or creating the client are logged.
   */
  start(): void {
    // @ts-ignore
    import('@vertx/eventbus-bridge-client.js')
      .then(module => module.default)
      .then(eventBusModule => {
        const enabled = localStorage['enableNotifier'] !== 'false';
        console.info('[notifier event bus] enabled?', enabled, localStorage['enableNotifier']);
        if (!enabled) {
          console.warn('[notifier event bus] localStorage.enableNotifier is "false"');
          return;
        }
        this.eventBus = new eventBusModule(`/api/notifier/eventbus`);
        this.eventBus.enableReconnect(true);
        this.eventBus.onopen = () => {
          console.log('[notifier event bus] onopen fired');
          this.eventBusOpenSubject.next(this.eventBus);
        };
        this.eventBus.onclose = () => {
          console.log('[notifier event bus] onclose fired');
          this.eventBusOpenSubject.next(undefined);
        };
      })
      .catch(e => {
        // Without this, a failed dynamic import surfaces only as an unhandled rejection.
        console.error('[notifier event bus] failed to start', e);
      });
  }

  /** Closes the event bus if one was created; errors raised while closing are logged. */
  destroy(): void {
    try {
      this.eventBus?.close();
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Sends `message` to `address` once the bus has opened and resolves with the reply.
   *
   * The promise rejects with the error passed to the reply callback, with
   * any exception thrown by the client's `send`, or with the string
   * `'eb:closed'` when the bus emitted by the open stream is no longer open.
   * If the bus never opens, the returned promise never settles.
   *
   * @param address destination address
   * @param message payload to send
   * @param headers headers passed through to the bus
   * @returns promise of the reply delivered to the send callback
   */
  send(address: string, message: any, headers: any): Promise<any> {
    return new Promise<any>((resolve, reject) => {
      this.eventBusOpenObs.pipe(filter(eb => eb !== undefined), take(1)).subscribe(eb => {
        if (eb.state !== 1) {
          console.warn('[notifier event bus] [send] event bus is not OPEN');
          reject('eb:closed');
          return;
        }
        console.log('[notifier event bus] send: ', address, message, headers);
        try {
          eb.send(address, message, headers, (error: any, response: any) => {
            if (error) {
              reject(error);
            } else {
              resolve(response);
            }
          });
        } catch (e) {
          console.error(e);
          reject(e);
        }
      });
    });
  }

  /**
   * Returns `true` when the bus exists and its `state` is 1 (treated as OPEN);
   * otherwise logs a warning tagged with the calling method and returns `false`.
   */
  private ensureOpen(caller: string): boolean {
    if (this.eventBus?.state === 1) {
      return true;
    }
    console.warn(`[notifier event bus] [${caller}] event bus is not OPEN`);
    return false;
  }

  /** Tells whether this exact `handler` function is already listed under address `name`. */
  private isRegistered(name: string, handler: (error: any, message: any) => void): boolean {
    const registered = this.eventBus.handlers?.[name];
    return Array.isArray(registered) && registered.includes(handler);
  }
}
