package jp.ad.sinet.stream.plugins.mqtt;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.spi.PluginMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/mqtt/MqttMessageReader.class */
public class MqttMessageReader extends MqttSyncBaseIO implements PluginMessageReader, MqttReader {

    @Generated
    private static final Logger log = Logger.getLogger(MqttMessageReader.class.getName());
    private final Lock lock;
    private final Condition connectionChanged;
    private boolean connected;
    private final List<String> topics;
    private final Duration receiveTimeout;
    private final BlockingQueue<SinetMqttMessage> queue;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MqttMessageReader(ReaderParameters readerParameters) {
        super(readerParameters.getService(), readerParameters.getConsistency(), readerParameters.getClientId(), readerParameters.getConfig(), readerParameters.getValueType(), readerParameters.isDataEncryption());
        this.lock = new ReentrantLock();
        this.connectionChanged = this.lock.newCondition();
        this.connected = false;
        this.topics = Collections.unmodifiableList(readerParameters.getTopics());
        this.receiveTimeout = readerParameters.getReceiveTimeout();
        this.queue = new LinkedBlockingQueue();
        ((MqttClient) this.client).setCallback(new MqttReaderCallback(this));
        connect();
    }

    @Override // jp.ad.sinet.stream.plugins.mqtt.AbstractMqttIO, jp.ad.sinet.stream.plugins.mqtt.MqttReader
    public void connect() {
        super.connect();
        this.lock.lock();
        while (!this.connected) {
            try {
                this.connectionChanged.awaitUninterruptibly();
            } finally {
                this.lock.unlock();
            }
        }
    }

    @Override // jp.ad.sinet.stream.plugins.mqtt.MqttReader
    public void onConnectionComplete(boolean z, String str) {
        subscribe();
        this.lock.lock();
        try {
            this.connected = true;
            this.connectionChanged.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    private void subscribe() {
        try {
            int[] iArr = new int[this.topics.size()];
            Arrays.fill(iArr, getConsistency().getQos());
            log.fine(() -> {
                return "MQTT subscribe: " + getClientId() + ": " + getTopic();
            });
            ((MqttClient) this.client).subscribe((String[]) this.topics.toArray(new String[0]), iArr);
        } catch (MqttException e) {
            throw new SinetStreamIOException(e);
        }
    }

    @Override // jp.ad.sinet.stream.plugins.mqtt.MqttReader
    public void disconnect() throws MqttException {
        ((MqttClient) this.client).disconnect();
        this.lock.lock();
        try {
            this.connected = false;
            this.connectionChanged.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    @Override // jp.ad.sinet.stream.plugins.mqtt.MqttReader
    public void onMessageArrived(SinetMqttMessage sinetMqttMessage) {
        try {
            this.queue.put(sinetMqttMessage);
        } catch (InterruptedException e) {
            throw e;
        }
    }

    private String getTopic() {
        return String.join(",", this.topics);
    }

    public PluginMessageWrapper read() {
        try {
            return this.queue.poll(this.receiveTimeout.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }

    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }
}
