package it.netgrid.bauer.impl.impl;

import com.google.inject.Inject;
import it.netgrid.bauer.impl.MqttClientManager;
import it.netgrid.bauer.impl.MqttMessageConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
import org.eclipse.paho.mqttv5.common.MqttSecurityException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/netgrid/bauer/impl/impl/ThreadedMqttClientManager.class */
/**
 * {@link MqttClientManager} that drives a Paho MQTT v5 {@link MqttClient} from background threads.
 *
 * <p>Three kinds of task are submitted to a thread-per-task executor:
 * <ul>
 *   <li>one task per registered {@link MqttMessageConsumer};</li>
 *   <li>a connect task (see {@link #connect(MqttConnectionOptions)}) that retries until the
 *       client connects or the task is interrupted;</li>
 *   <li>a subscribe task (this object's {@link #run()}) started on every
 *       {@link #connectComplete(boolean, String)} and cancelled on
 *       {@link #disconnected(MqttDisconnectResponse)}.</li>
 * </ul>
 *
 * <p>The instance registers itself as the client's callback. Shared collections are concurrent
 * collections because callbacks, callers and the background tasks run on different threads.
 */
public class ThreadedMqttClientManager implements MqttClientManager, Runnable {
    private static final Logger log = LoggerFactory.getLogger(ThreadedMqttClientManager.class);

    /** Sequence number used to give every spawned thread a distinct name prefix. */
    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);

    /** Names each thread {@code <sequence>#<runnable>}. */
    private static final ThreadFactory THREAD_FACTORY = runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName(String.format("%d#%s", THREAD_COUNTER.getAndIncrement(), runnable));
        return thread;
    };

    /** Subscriptions requested by consumers but not yet acknowledged by a successful subscribe. */
    private final LinkedBlockingQueue<MqttSubscription> pendingSubscriptions;
    /** Subscriptions that were successfully sent; re-sent as a batch when the subscribe task starts. */
    private final List<MqttSubscription> activeSubscriptions;
    private final MqttClient client;
    private final ExecutorService executor;
    /** Registered consumers mapped to the background task running each of them. */
    private final Map<MqttMessageConsumer, Future<?>> consumers = new ConcurrentHashMap<>();
    /** Current subscribe task; guarded by {@code this}. */
    private Future<?> subscribeTask;
    /** Task created by the latest {@link #connect(MqttConnectionOptions)} call, if any. */
    private volatile Future<?> connectTask;

    /**
     * Creates the manager and installs it as the callback of the given client.
     *
     * @param mqttClient the client used for publishing, subscribing and connecting
     */
    @Inject
    public ThreadedMqttClientManager(MqttClient mqttClient) {
        this.client = mqttClient;
        this.client.setCallback(this);
        this.activeSubscriptions = new CopyOnWriteArrayList<>();
        this.pendingSubscriptions = new LinkedBlockingQueue<>();
        this.executor = Executors.newThreadPerTaskExecutor(THREAD_FACTORY);
    }

    /**
     * Publishes a message on the given topic.
     *
     * @param str the topic to publish to
     * @param mqttMessage the message to publish
     * @throws IOException wrapping the {@link MqttException} raised by the client
     */
    @Override // it.netgrid.bauer.impl.MqttClientManager
    public void publish(String str, MqttMessage mqttMessage) throws IOException {
        try {
            this.client.publish(str, mqttMessage);
        } catch (MqttPersistenceException e) {
            log.warn("Unable to persist: {} - {}", str, mqttMessage.toDebugString());
            throw new IOException(e);
        } catch (MqttException e) {
            log.warn("Unable to publish: {} - {}", str, mqttMessage.toDebugString());
            throw new IOException(e);
        }
    }

    /**
     * Registers a consumer, starts its background task and queues its subscription.
     * Registering the same consumer again is a no-op.
     *
     * @param mqttMessageConsumer the consumer to register
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override // it.netgrid.bauer.impl.MqttClientManager
    public synchronized void addConsumer(MqttMessageConsumer mqttMessageConsumer) throws IOException {
        if (this.consumers.containsKey(mqttMessageConsumer)) {
            return;
        }
        this.consumers.put(mqttMessageConsumer, this.executor.submit(mqttMessageConsumer));
        // The queue is unbounded, so add() never fails and, unlike put(), cannot drop the
        // subscription when the calling thread happens to be interrupted.
        this.pendingSubscriptions.add(mqttMessageConsumer.getMqttSubscription());
    }

    /**
     * Callback invoked when the connection is lost: stops the subscribe task.
     *
     * @param mqttDisconnectResponse details of the disconnection
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        cancelSubscribeTask();
        log.warn("MQTT-Connection: LOST - {}", mqttDisconnectResponse);
    }

    /**
     * Callback invoked on a client error; the error is only logged.
     *
     * @param mqttException the reported error
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void mqttErrorOccurred(MqttException mqttException) {
        // The trailing throwable makes SLF4J log the stack trace as well.
        log.error("MQTT-Error: {}", mqttException.toString(), mqttException);
    }

    /**
     * Callback invoked for every incoming message; hands it to every registered consumer.
     * An {@link IOException} from one consumer is logged and does not stop the others.
     *
     * @param str the topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void messageArrived(final String str, final MqttMessage mqttMessage) throws Exception {
        for (MqttMessageConsumer consumer : this.consumers.keySet()) {
            try {
                consumer.consume(str, mqttMessage);
            } catch (IOException e) {
                log.error("Unable to consume from {} by {}", str, consumer, e);
            }
        }
    }

    /**
     * Callback invoked when a delivery completes; only logged at debug level.
     *
     * @param iMqttToken the token of the completed delivery
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void deliveryComplete(IMqttToken iMqttToken) {
        log.debug("MQTT-Delivered: {}", iMqttToken);
    }

    /**
     * Callback invoked once the client is connected: (re)starts the subscribe task.
     *
     * @param z {@code true} when this is a reconnection
     * @param str the server the client connected to
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void connectComplete(boolean z, String str) {
        log.info("MQTT-Connection: {} to {}", z ? "RECONNECTED" : "CONNECTED", str);
        synchronized (this) {
            // Never let two subscribe tasks drain the pending queue at the same time.
            cancelSubscribeTask();
            this.subscribeTask = this.executor.submit(this);
        }
    }

    /**
     * Callback invoked when an AUTH packet arrives; logs a label for the reason code.
     *
     * @param i the reason code of the packet
     * @param mqttProperties the properties of the packet (unused)
     */
    @Override // org.eclipse.paho.mqttv5.client.MqttCallback
    public void authPacketArrived(int i, MqttProperties mqttProperties) {
        String state = switch (i) {
            case 0 -> "AUTHENTICATED";
            case 24 -> "AUTH-CONTINUE";
            case 25 -> "RE-AUTH";
            default -> "UNKNOWN";
        };
        log.info("MQTT-Connection: {}", state);
    }

    /**
     * Body of the subscribe task: first re-sends every active subscription, then keeps
     * processing pending subscriptions until the thread is interrupted.
     */
    @Override // java.lang.Runnable
    public void run() {
        restoreActiveSubscriptions();
        while (!Thread.currentThread().isInterrupted()) {
            runSubscribeOnce();
        }
    }

    /**
     * Waits for one pending subscription and sends it to the broker. On success it becomes
     * active; on any failure it is put back on the pending queue so that it is not lost.
     * If the wait is interrupted, the thread's interrupt flag is restored.
     */
    public void runSubscribeOnce() {
        MqttSubscription subscription = null;
        try {
            subscription = this.pendingSubscriptions.take();
            this.client.subscribe(new MqttSubscription[] {subscription});
            this.activeSubscriptions.add(subscription);
            subscription = null; // handled: nothing to re-queue
        } catch (MqttException e) {
            log.warn("Unable to subscribe to {}: {}", subscription.getTopic(), e.getMessage());
        } catch (InterruptedException e) {
            log.warn("Interrupted while waiting for pending subscriptions");
            // Restore the flag so the loop in run() terminates after a cancel(true).
            Thread.currentThread().interrupt();
        } finally {
            if (subscription != null) {
                // NOTE(review): a subscription that keeps failing is retried immediately.
                this.pendingSubscriptions.add(subscription);
            }
        }
    }

    /**
     * @return the number of subscriptions still waiting to be sent
     */
    public int pendingSubscriptions() {
        return this.pendingSubscriptions.size();
    }

    /**
     * @return {@code true} when a connect task exists and has finished; {@code false} when
     *         {@link #connect(MqttConnectionOptions)} was never called or is still retrying
     */
    @Override // it.netgrid.bauer.impl.MqttClientManager
    public boolean connectCompleted() {
        Future<?> task = this.connectTask;
        return task != null && task.isDone();
    }

    /**
     * Starts a background task that connects the client, retrying after a randomized delay
     * until it succeeds or the task is interrupted.
     *
     * @param mqttConnectionOptions the options passed to the client; its automatic-reconnect
     *        min/max delays (in seconds) determine the pause between attempts
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override // it.netgrid.bauer.impl.MqttClientManager
    public void connect(final MqttConnectionOptions mqttConnectionOptions) throws IOException {
        this.connectTask = this.executor.submit(() -> connectWithRetry(mqttConnectionOptions));
    }

    /** Cancels (interrupting) the current subscribe task, if there is one. */
    private synchronized void cancelSubscribeTask() {
        if (this.subscribeTask != null) {
            this.subscribeTask.cancel(true);
            this.subscribeTask = null;
        }
    }

    /** Re-sends all active subscriptions in one call, retrying every 1-5 seconds on failure. */
    private void restoreActiveSubscriptions() {
        while (!Thread.currentThread().isInterrupted() && !this.activeSubscriptions.isEmpty()) {
            try {
                this.client.subscribe(this.activeSubscriptions.toArray(new MqttSubscription[0]));
                return;
            } catch (MqttException e) {
                log.warn("Unable to restore subscriptions. Retry in seconds...", e);
                if (!pause((long) (1000.0d + (Math.random() * 4000.0d)))) {
                    return;
                }
            }
        }
    }

    /** Tries to connect until it succeeds or the current thread is interrupted. */
    private void connectWithRetry(MqttConnectionOptions options) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.client.connect(options);
                return;
            } catch (MqttSecurityException e) {
                log.error("Security error: {}", e.getMessage());
            } catch (MqttException e) {
                log.warn("Unable to connect. Retry in seconds...", e);
            }
            if (!pause(reconnectDelayMillis(options))) {
                return;
            }
        }
    }

    /** Computes the pause before the next connect attempt: min delay plus a random share of max delay. */
    private static long reconnectDelayMillis(MqttConnectionOptions options) {
        double millis = (options.getAutomaticReconnectMinDelay() * 1000)
                + (Math.random() * options.getAutomaticReconnectMaxDelay() * 1000.0d);
        // Thread.sleep rejects negative values.
        return Math.max(0L, (long) millis);
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt flag is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            log.info("Shutting down...");
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
