/*
 * Decompiled with CFR 0.152.
 */
package jp.ad.sinet.stream.plugins.mqtt;

import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.plugins.mqtt.MqttAsyncBaseIO;
import jp.ad.sinet.stream.spi.PluginAsyncMessageWriter;
import jp.ad.sinet.stream.spi.WriterParameters;
import jp.ad.sinet.stream.utils.MessageUtils;
import jp.ad.sinet.stream.utils.Timestamped;
import lombok.Generated;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jdeferred2.DeferredCallable;
import org.jdeferred2.DeferredManager;
import org.jdeferred2.Promise;
import org.jdeferred2.impl.DefaultDeferredManager;

/**
 * Asynchronous MQTT message writer.
 *
 * <p>Each call to {@link #write(Timestamped)} publishes one payload to the configured
 * topic and returns a promise that settles when the broker-side delivery token completes.
 * The number of publishes that may be outstanding at once is bounded by a fair
 * {@link Semaphore}; completion of each delivery is awaited on a fixed-size thread pool
 * owned by this writer.
 */
public class MqttAsyncMessageWriter extends MqttAsyncBaseIO implements PluginAsyncMessageWriter {
    @Generated
    private static final Logger log = Logger.getLogger(MqttAsyncMessageWriter.class.getName());

    /** Topic every message is published to. */
    private final String topic;

    /** Runs the tasks that wait for delivery tokens to complete. */
    private final DefaultDeferredManager manager;

    /** Bounds the number of publishes that have been issued but not yet completed. */
    private final Semaphore sem;

    /**
     * Creates the writer, connects to the broker, and sets up the completion thread pool
     * and the in-flight limiter.
     *
     * @param parameters writer settings (service, consistency, client id, config, value type,
     *                   encryption flag, topic)
     */
    MqttAsyncMessageWriter(WriterParameters parameters) {
        super(parameters.getService(), parameters.getConsistency(), parameters.getClientId(), parameters.getConfig(), parameters.getValueType(), parameters.isDataEncryption());
        this.topic = parameters.getTopic();
        this.connect();
        this.manager = this.createDeferredManager();
        // Fair semaphore: blocked writers are granted permits in arrival order.
        this.sem = new Semaphore(this.getPermitNum(), true);
    }

    /**
     * Computes how many publishes may be outstanding at once.
     *
     * @return the connection's max-inflight value for {@code AT_MOST_ONCE}; otherwise half of
     *         it, but never less than 1
     */
    private int getPermitNum() {
        int maxInflight = this.connectOptions.getMaxInflight();
        if (Consistency.AT_MOST_ONCE.equals(this.consistency)) {
            return maxInflight;
        }
        // NOTE(review): the halving for the other consistency levels presumably leaves
        // headroom for the client's own acknowledgement traffic - confirm.
        return Math.max(maxInflight / 2, 1);
    }

    /**
     * Builds the deferred manager backed by a fixed thread pool whose size is taken from
     * the {@code thread_pool_num} config entry (default 4).
     */
    private DefaultDeferredManager createDeferredManager() {
        int nthreads = MessageUtils.toInteger((Object)this.getConfig().getOrDefault("thread_pool_num", 4));
        return new DefaultDeferredManager(Executors.newFixedThreadPool(nthreads));
    }

    /**
     * Closes the underlying connection and shuts down the completion thread pool.
     * The pool is shut down even when closing the connection throws.
     */
    @Override
    protected void doClose() {
        try {
            super.doClose();
        } finally {
            this.manager.shutdown();
        }
    }

    /**
     * Publishes one message asynchronously.
     *
     * <p>Blocks until an in-flight permit is available, issues the publish, and hands the
     * delivery token to the thread pool. The permit is returned when the delivery completes
     * (successfully or not), or immediately if the publish could not be issued.
     *
     * @param message message whose value is the payload to publish
     * @return a promise resolved with {@code null} on successful delivery and rejected with
     *         the failure otherwise; if the calling thread is interrupted while waiting for a
     *         permit, the promise is rejected with the {@link InterruptedException} and the
     *         thread's interrupt status is restored
     */
    public Promise<Void, Throwable, Void> write(Timestamped<byte[]> message) {
        // Acquire outside the publishing try-block: if acquire() itself fails, no permit
        // is held, so none must be released.
        try {
            this.sem.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return this.manager.reject(e);
        }
        try {
            log.finer(() -> "MQTT publish: " + this.getClientId() + ": " + Arrays.toString((byte[])message.getValue()));
            final IMqttDeliveryToken token = this.publish((byte[])message.getValue());
            return this.manager.when((DeferredCallable)new DeferredCallable<Void, Void>(DeferredManager.StartPolicy.AUTO){

                public Void call() {
                    try {
                        token.waitForCompletion();
                        return null;
                    }
                    catch (Throwable e) {
                        throw new SinetStreamIOException(e);
                    }
                    finally {
                        // Delivery finished (or failed): free the slot for the next writer.
                        MqttAsyncMessageWriter.this.sem.release();
                    }
                }
            });
        }
        catch (Throwable e) {
            // The publish was never handed to the pool, so the permit is returned here.
            this.sem.release();
            return this.manager.reject(e);
        }
    }

    /**
     * Issues the publish on the MQTT client while holding this writer's monitor.
     *
     * @param message payload bytes
     * @return the delivery token for the publish
     * @throws SinetStreamIOException if the client reports an {@link MqttException}
     */
    private IMqttDeliveryToken publish(byte[] message) {
        try {
            synchronized (this) {
                return ((MqttAsyncClient)this.client).publish(this.topic, message, this.consistency.getQos(), this.retain);
            }
        }
        catch (MqttException e) {
            throw new SinetStreamIOException(e);
        }
    }

    /**
     * Returns the topic this writer publishes to.
     *
     * @return the topic name
     */
    @Generated
    public String getTopic() {
        return this.topic;
    }
}

