/*
 * Decompiled with CFR 0.152.
 */
package jp.ad.sinet.stream.plugins.mqtt;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.plugins.mqtt.MqttAsyncBaseIO;
import jp.ad.sinet.stream.plugins.mqtt.MqttReader;
import jp.ad.sinet.stream.plugins.mqtt.MqttReaderCallback;
import jp.ad.sinet.stream.plugins.mqtt.SinetMqttMessage;
import jp.ad.sinet.stream.spi.PluginAsyncMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttAsyncMessageReader
extends MqttAsyncBaseIO
implements PluginAsyncMessageReader,
MqttReader {
    @Generated
    private static final Logger log = Logger.getLogger(MqttAsyncMessageReader.class.getName());
    private final List<String> topics;
    private final Duration receiveTimeout;
    private final ExecutorService executor;
    private List<Consumer<PluginMessageWrapper>> onMessageCallbacks = new CopyOnWriteArrayList<Consumer<PluginMessageWrapper>>();
    private List<Consumer<Throwable>> onFailureCallbacks = new CopyOnWriteArrayList<Consumer<Throwable>>();

    MqttAsyncMessageReader(ReaderParameters parameters) {
        super(parameters.getService(), parameters.getConsistency(), parameters.getClientId(), parameters.getConfig(), parameters.getValueType(), parameters.isDataEncryption());
        this.topics = Collections.unmodifiableList(parameters.getTopics());
        this.receiveTimeout = parameters.getReceiveTimeout();
        this.executor = Executors.newSingleThreadExecutor();
        ((MqttAsyncClient)this.client).setCallback((MqttCallback)new MqttReaderCallback(this));
        this.connect();
    }

    @Override
    protected void doClose() {
        super.doClose();
        this.executor.shutdown();
    }

    @Override
    public void onMessageArrived(SinetMqttMessage message) {
        for (Consumer<PluginMessageWrapper> action : this.onMessageCallbacks) {
            this.executor.submit(() -> action.accept(message));
        }
    }

    @Override
    public void onConnectionLost(Throwable cause) {
        for (Consumer<Throwable> action : this.onFailureCallbacks) {
            this.executor.submit(() -> action.accept(cause));
        }
    }

    @Override
    public void connect() {
        super.connect();
        try {
            int[] qos = new int[this.topics.size()];
            Arrays.fill(qos, this.getConsistency().getQos());
            log.fine(() -> "MQTT subscribe: " + this.getClientId() + ": " + this.getTopic());
            ((MqttAsyncClient)this.client).subscribe(this.topics.toArray(new String[0]), qos).waitForCompletion();
        }
        catch (MqttException e) {
            throw new SinetStreamIOException((Throwable)e);
        }
        this.reconnectDelay = this.reconnectMinDelay;
    }

    @Override
    public void disconnect() throws MqttException {
        ((MqttAsyncClient)this.client).disconnect().waitForCompletion();
    }

    public String getTopic() {
        return String.join((CharSequence)",", this.topics);
    }

    public void addOnMessageCallback(Consumer<PluginMessageWrapper> onMessage, Consumer<Throwable> onFailure) {
        if (Objects.nonNull(onMessage)) {
            this.onMessageCallbacks.add(onMessage);
        }
        if (Objects.nonNull(onFailure)) {
            this.onFailureCallbacks.add(onFailure);
        }
    }

    public void removeOnMessageCallback(Consumer<PluginMessageWrapper> onMessage, Consumer<Throwable> onFailure) {
        if (Objects.nonNull(onMessage)) {
            this.onMessageCallbacks.remove(onMessage);
        }
        if (Objects.nonNull(onFailure)) {
            this.onFailureCallbacks.remove(onFailure);
        }
    }

    public void clearOnMessageCallback() {
        this.onMessageCallbacks.clear();
        this.onFailureCallbacks.clear();
    }

    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }
}

