import { Injectable, Inject } from "@angular/core";
import { HttpClient, HttpHeaders, HttpParams } from "@angular/common/http";
import { Observable } from "rxjs";
import { Paho } from "ng2-mqtt/mqttws31";

@Injectable({
  providedIn: "root",
})
export class MqttService {
  broker: string;
  topicList: Array<Number>;
  client: any;

  constructor(@Inject(String) broker: string, topics: Array<Number>) {
    this.broker = broker;
    this.topicList = topics;
  }
  listen() {
    return new Observable((subscriber) => {
      let clientId = JSON.parse(localStorage.getItem("user")).mobile;
      this.client = new Paho.MQTT.Client(
        this.broker,
        8084,
        `${clientId}_${Date.now()}`
      );

      this.client.onConnectionLost = (responseObject) => {
        if (responseObject.errorCode !== 0) {
          console.log("onConnectionLost:" + responseObject.errorMessage);
        }
      };

      this.client.onMessageArrived = (message) => {
        let data = null;
        let fixedData = message.payloadString.replaceAll("NaN", "null");
        try {
          data = JSON.parse(fixedData);
        } catch (error) {
          console.error(error);
        }
        data.retained = message.retained
        // var containsNull = false;
        // Object.keys(data).forEach((key) => {
        //   if (data.key == null) {
        //     containsNull = true;
        //   }
        // });
        localStorage.setItem("lastMessage", JSON.stringify(data))
        if (Boolean(data)) subscriber.next(data);
      };

      this.client.connect({
        onSuccess: async () => {
          for (let i = 0; i < this.topicList.length; i++) {
            await this.client.subscribe("raw/" + this.topicList[i]);
            console.log(this.topicList[i]);
          }
        },
        userName: "backend",
        password: "backend",
        useSSL: true,
      });
    });
  }

  listenTYB() {
    return new Observable((subscriber) => {
      let clientId = JSON.parse(localStorage.getItem("user")).mobile;
      this.client = new Paho.MQTT.Client(
        this.broker,
        8084,
        `${clientId}_${Date.now()}`
      );

      this.client.onConnectionLost = (responseObject) => {
        if (responseObject.errorCode !== 0) {
          console.log("onConnectionLost:" + responseObject.errorMessage);
        }
      };

      this.client.onMessageArrived = (message) => {
        let data = null;
        let fixedData = message.payloadString.replaceAll("NaN", "null");
        try {
          data = JSON.parse(fixedData);
        } catch (error) {
          console.error(error);
        }
        data.retained = message.retained
        // var containsNull = false;
        // Object.keys(data).forEach((key) => {
        //   if (data.key == null) {
        //     containsNull = true;
        //   }
        // });
        
        if (Boolean(data)) subscriber.next(data);
      };

      this.client.connect({
        onSuccess: async () => {
          for (let i = 0; i < this.topicList.length; i++) {
            await this.client.subscribe("tyb/server/" + this.topicList[i]);
            console.log(this.topicList[i]);
          }
        },
        userName: "backend",
        password: "backend",
        useSSL: true,
      });
    });
  }

  publish(topicName:string, messsage: any, cb?: any){
    console.log("We sent" , topicName, messsage);
    
    let message = new Paho.MQTT.Message(JSON.stringify(messsage))
    message.destinationName = topicName;
    this.client.send(message)
  }

  close() {
    console.log("CLosing conn..");
    return new Promise(async (resolve, reject) => {
      if (this.client && this.client.connected) {
        for (let i = 0; i < this.topicList.length; i++) {
          await this.client.unsubscribe(this.topicList[i]);
        }

        this.client.disconnect();
        this.client.end();
      }
    });
  }
}
