/*
 * Decompiled with CFR 0.152.
 */
package jp.ad.sinet.stream.plugins.mqtt;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.plugins.mqtt.MqttReader;
import jp.ad.sinet.stream.plugins.mqtt.MqttReaderCallback;
import jp.ad.sinet.stream.plugins.mqtt.MqttSyncBaseIO;
import jp.ad.sinet.stream.plugins.mqtt.SinetMqttMessage;
import jp.ad.sinet.stream.spi.PluginMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

/**
 * Synchronous MQTT message reader plugin.
 *
 * <p>On construction the reader registers a {@link MqttReaderCallback} on the underlying
 * Paho client, connects, and subscribes to every configured topic. Messages handed to
 * {@link #onMessageArrived(SinetMqttMessage)} are buffered in an unbounded in-memory queue
 * and handed out one at a time by {@link #read()}.
 */
public class MqttMessageReader
extends MqttSyncBaseIO
implements PluginMessageReader,
MqttReader {
    @Generated
    private static final Logger log = Logger.getLogger(MqttMessageReader.class.getName());

    /** Read-only view of the topics this reader subscribes to. */
    private final List<String> topics;

    /** Maximum time {@link #read()} waits for a message before giving up. */
    private final Duration receiveTimeout;

    /** Buffer between the message-arrival callback and callers of {@link #read()}. */
    private final BlockingQueue<SinetMqttMessage> queue;

    /**
     * Creates the reader, installs the message callback and connects immediately.
     *
     * @param parameters reader configuration (service, consistency, client id, config,
     *                   value type, encryption flag, topics and receive timeout)
     * @throws SinetStreamIOException if subscribing to the topics fails
     */
    MqttMessageReader(ReaderParameters parameters) {
        super(parameters.getService(), parameters.getConsistency(), parameters.getClientId(), parameters.getConfig(), parameters.getValueType(), parameters.isDataEncryption());
        this.topics = Collections.unmodifiableList(parameters.getTopics());
        this.receiveTimeout = parameters.getReceiveTimeout();
        this.queue = new LinkedBlockingQueue<>();
        // The callback must be registered before connecting so that no message is missed.
        ((MqttClient)this.client).setCallback((MqttCallback)new MqttReaderCallback(this));
        // NOTE(review): connect() is overridable and is invoked from the constructor;
        // a subclass override would run before the subclass is fully initialised.
        this.connect();
    }

    /**
     * Connects via the superclass and then subscribes to all configured topics, using the
     * QoS derived from the configured consistency for every topic. On success the reconnect
     * delay is reset to its minimum value.
     *
     * @throws SinetStreamIOException if the subscription is rejected with an {@link MqttException}
     */
    @Override
    public void connect() {
        super.connect();
        try {
            // One QoS entry per topic, all set to the same level.
            int[] qos = new int[this.topics.size()];
            Arrays.fill(qos, this.getConsistency().getQos());
            log.fine(() -> "MQTT subscribe: " + this.getClientId() + ": " + this.getTopic());
            ((MqttClient)this.client).subscribe(this.topics.toArray(new String[0]), qos);
        } catch (MqttException e) {
            throw new SinetStreamIOException((Throwable)e);
        }
        this.reconnectDelay = this.reconnectMinDelay;
    }

    /**
     * Disconnects the underlying MQTT client.
     *
     * @throws MqttException if the client reports an error while disconnecting
     */
    @Override
    public void disconnect() throws MqttException {
        ((MqttClient)this.client).disconnect();
    }

    /**
     * Enqueues a newly arrived message so that a later {@link #read()} can return it.
     *
     * @param message the received message
     * @throws SinetStreamIOException if the calling thread is interrupted while enqueueing;
     *                                the thread's interrupt status is restored first
     */
    @Override
    public void onMessageArrived(SinetMqttMessage message) {
        try {
            this.queue.put(message);
        } catch (InterruptedException e) {
            // put() declares the checked InterruptedException; keep the interrupt visible
            // to the caller's thread and report it the same way read() does.
            Thread.currentThread().interrupt();
            throw new SinetStreamIOException((Throwable)e);
        }
    }

    /** Returns the subscribed topics joined with commas (used for logging). */
    private String getTopic() {
        return String.join(",", this.topics);
    }

    /**
     * Takes the next buffered message, waiting up to the configured receive timeout.
     *
     * @return the next message, or {@code null} if none arrived within the timeout
     * @throws SinetStreamIOException if the calling thread is interrupted while waiting;
     *                                the thread's interrupt status is restored first
     */
    public PluginMessageWrapper read() {
        try {
            return this.queue.poll(this.receiveTimeoutNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new SinetStreamIOException((Throwable)e);
        }
    }

    /**
     * Converts the receive timeout to nanoseconds, saturating instead of overflowing.
     * {@link Duration#toNanos()} throws {@link ArithmeticException} for durations that do
     * not fit in a {@code long}; such values are clamped to the nearest representable bound.
     */
    private long receiveTimeoutNanos() {
        try {
            return this.receiveTimeout.toNanos();
        } catch (ArithmeticException e) {
            return this.receiveTimeout.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    /** Returns the read-only list of subscribed topics. */
    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    /** Returns the maximum time {@link #read()} waits for a message. */
    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }
}

