
import { EventEmitter, Injectable, NgZone } from "@angular/core";
import { environment } from "src/environments/environment";
import { UserSession } from "../_models";
import { Observable } from "rxjs";
import { Subject } from 'rxjs';
 
/**
 * Exposes the backend server-sent-event (SSE) stream as an RxJS observable.
 *
 * A single `EventSource` connection is shared by all subscribers. It is opened
 * lazily on the first subscription and closed again when the last subscriber
 * unsubscribes, so the connection does not outlive its consumers.
 */
@Injectable()
export class SSEService {
    /** Shared connection; `undefined` while nobody is subscribed. */
    private eventSource?: EventSource;

    /** Number of live subscriptions attached to the current `eventSource`. */
    private subscriberCount = 0;

    constructor(private zone: NgZone) {}

    /**
     * Returns a cold observable that emits the `data` payload of every SSE
     * `message` event.
     *
     * Subscribing opens (or reuses) the shared connection; unsubscribing removes
     * this subscriber's listener and closes the connection once no subscribers
     * remain. If the connection cannot be created (e.g. no stored session), the
     * error thrown inside the subscribe function is delivered by RxJS as an
     * error notification.
     *
     * @returns an observable of raw message payloads
     */
    observeMessages(): Observable<string> {
        return new Observable<string>(observer => {
            const source = this.getEventSource();
            this.subscriberCount++;

            const onMessage = (evt: MessageEvent) => {
                if (!environment.production) console.log("sse", String(evt.data));

                // Re-enter the Angular zone so change detection runs for consumers.
                this.zone.run(() => {
                    observer.next(evt.data);
                });
            };
            source.addEventListener('message', onMessage);

            // Teardown: executed on unsubscribe.
            return () => {
                source.removeEventListener('message', onMessage);

                // The shared connection was already replaced (it had been closed
                // by the browser); there is nothing shared left to release.
                if (this.eventSource !== source) {
                    return;
                }

                this.subscriberCount--;
                if (this.subscriberCount <= 0) {
                    source.close();
                    this.eventSource = undefined;
                    this.subscriberCount = 0;
                }
            };
        });
    }

    /**
     * Returns the shared `EventSource`, creating it when none exists or when the
     * cached one has been permanently closed.
     *
     * The access token is read from the `currentUser` entry in `localStorage`
     * and sent as a URL-encoded query parameter.
     *
     * @throws Error when no access token is available in the stored session
     */
    private getEventSource(): EventSource {
        if (this.eventSource && this.eventSource.readyState !== EventSource.CLOSED) {
            return this.eventSource;
        }

        const rawSession = localStorage.getItem('currentUser');
        const userSession = rawSession ? JSON.parse(rawSession) : null;
        const accessToken = userSession ? userSession.accessToken : undefined;
        if (!accessToken) {
            throw new Error('Cannot open SSE stream: no access token found in the stored user session.');
        }

        const url = `${environment.sseUrl}/stream?accessToken=${encodeURIComponent(accessToken)}`;
        const source = new EventSource(url);

        // Diagnostic handlers are installed once per connection rather than
        // being reassigned on every subscription.
        source.onopen = (ev) => {
            if (!environment.production) console.log('Connection to server opened.', ev);
        };
        source.onerror = (ev) => {
            if (!environment.production) console.log('EventSource failed.', ev);
        };

        // Fresh connection: any count left over from a dead connection is stale.
        this.subscriberCount = 0;
        this.eventSource = source;
        return source;
    }
}
