import { Injectable } from '@angular/core';
import { QueueingSubject } from 'queueing-subject';
import { Observable } from 'rxjs';
import { StompService } from 'ng2-stomp-service';
import { Subscription } from 'rxjs';
import { interval } from 'rxjs';
import { HttpService } from './http.service';
import { Logger } from './logger.service';
import { AuthService } from '../auth/auth.service';
import { NotificationService } from './notification.service';
import { AppConstants, JsonHelper } from '../util/index';
import { EventNotification, User, ResponseJson } from '../models/index';

/**
 * Surfaces server-side event notifications to the user.
 *
 * Events reach this service through two channels:
 *  - periodic HTTP polling of `AppConstants.QueryUserEventsURL`
 *    (see {@link EventService.pullUserEvents}), and
 *  - a STOMP topic subscription over the web socket configured in the
 *    constructor (see {@link EventService.connect}).
 *
 * In both cases each event is forwarded to the `NotificationService` and
 * written to the debug log.
 */
@Injectable()
export class EventService {
  private inputStream: QueueingSubject<any>;
  public outputStream: Observable<any>;
  /** Handle of the per-customer topic subscription created by `connect()`. */
  private subscription?: Subscription;
  /**
   * Handle reserved for the data-set update topic. The subscribe call in
   * `connect()` is currently disabled, so this normally stays unset.
   */
  private subscription2?: Subscription;
  /** Value echoed back to the server on the next poll; taken from the last successful response. */
  private lastQueryTime = '';
  /** Handle of the polling timer started by `pullUserEvents()`; unsubscribe it to stop polling. */
  public pullUserEventUpdate: Subscription;
  private userEvents: Array<EventNotification> = [];

  /**
   * Configures the STOMP client with the application's web socket end point.
   * No connection is opened here; call `connect()` for that.
   */
  constructor(private httpService: HttpService, private stomp: StompService, private log: Logger, private notifService: NotificationService) {
    stomp.configure({
      host: AppConstants.WebSocketEndPoint,
      debug: false,
      queue: { 'init': false }
    });
  }

  /**
   * Starts polling the server for user events.
   *
   * Calling this again replaces the running timer instead of stacking a
   * second one on top of it.
   *
   * @param periodMs Delay between two polls in milliseconds (defaults to 10000).
   */
  pullUserEvents(periodMs = 10000) {
    // Cancel a previous timer first: overwriting the handle would otherwise
    // leave an interval running that nobody can stop any more.
    if (this.pullUserEventUpdate) {
      this.pullUserEventUpdate.unsubscribe();
    }

    this.pullUserEventUpdate = interval(periodMs).subscribe(() => {
      this.queryUserEvents();
    });
  }

  /**
   * Issues one poll request, passing the last query time received from the
   * server; the response is handled by `postQueryEventsHandler`.
   */
  private queryUserEvents() {
    // NOTE(review): lastQueryTime is appended without URL encoding - confirm
    // that the server-provided value never contains reserved characters.
    this.httpService.get(AppConstants.QueryUserEventsURL + '?lastQueryTime=' + this.lastQueryTime, [this.postQueryEventsHandler]);
  }

  /**
   * Handles a poll response: on success remembers the returned query time,
   * stores the parsed events and publishes every event except those of
   * type 'RefreshData'. Non-successful responses are ignored.
   */
  private postQueryEventsHandler = (responseJson: ResponseJson) => {
    if (responseJson.status !== AppConstants.SUCCESS) {
      return;
    }

    this.lastQueryTime = responseJson.message;
    // Fall back to an empty list so a response without data cannot break the loop below.
    this.userEvents = JsonHelper.parseArray(responseJson.data, EventNotification) || [];

    this.userEvents.forEach(event => {
      if (event.eventType === 'RefreshData') {
        return;
      }
      this.publishEvent(event);
    });
  }

  /**
   * Opens the STOMP connection and subscribes to the partition event topic
   * of the given user's customer.
   *
   * @param loginUser User whose `customerId` selects the topic.
   */
  connect(loginUser: User) {
    this.stomp.startConnect().then(() => {
      this.stomp.done('init');

      // Release a subscription left over from an earlier connect() call.
      if (this.subscription) {
        this.subscription.unsubscribe();
      }

      this.subscription = this.stomp.subscribe('/topic/partitionEvent.' + loginUser.customerId, this.response);
      // Disabled for now:
      // this.subscription2 = this.stomp.subscribe('/topic/dataSetUpdateEvent', this.dataSetEventResponse);
      // this.stomp.send('/app/lastDataSetUpdate', {});
    });
  }

  /**
   * Releases the topic subscriptions and closes the STOMP connection.
   * Does nothing when `connect()` has not established a subscription.
   */
  disconnect() {
    if (!this.subscription) {
      return;
    }

    this.subscription.unsubscribe();
    // subscription2 is only set when the data-set topic is enabled; calling
    // unsubscribe() on it unguarded threw and kept the socket open.
    if (this.subscription2) {
      this.subscription2.unsubscribe();
    }
    this.subscription = undefined;
    this.subscription2 = undefined;

    this.stomp.disconnect().then(() => {
      this.log.debug('socket connection closed');
    });
  }

  /**
   * Callback for the partition event topic: parses the payload into an
   * `EventNotification` and publishes it.
   */
  response = (data) => {
    this.publishEvent(JsonHelper.parse(data, EventNotification));
  }

  /**
   * Callback for the data-set update topic: parses the payload and logs the
   * event together with its creation timestamp.
   */
  dataSetEventResponse = (data) => {
    const event = JsonHelper.parse(data, EventNotification);
    this.log.debug(JsonHelper.toJson(event));
    this.log.debug('last data set refresh time: ' + event.createTimestamp);
  }

  /**
   * Shows one event to the user and logs it. Events with a severity above 2
   * are shown as sticky notifications, all others as regular ones.
   */
  private publishEvent(event: EventNotification) {
    const notification = { severity: 'info', summary: '', detail: event.message };

    if (event.severity > 2) {
      this.notifService.addSticky(notification);
    } else {
      this.notifService.add(notification);
    }
    this.log.debug(JsonHelper.toJson(event));
  }
}




