package io.warp10.plugins.mqtt;

import com.google.common.base.Charsets;
import io.warp10.continuum.store.DirectoryClient;
import io.warp10.continuum.store.StoreClient;
import io.warp10.script.MemoryWarpScriptStack;
import io.warp10.script.WarpScriptStack;
import io.warp10.script.WarpScriptStopException;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/warp10/plugins/mqtt/MQTTConsumer.class */
/**
 * MQTT consumer driven by a WarpScript specification file.
 *
 * <p>The specification file is executed on a private stack and must leave a configuration
 * map on top of that stack. The map is read with the following keys:
 * <ul>
 *   <li>{@code macro} - macro executed for each received message,</li>
 *   <li>{@code partitioner} - optional macro whose numeric result selects the executor queue,</li>
 *   <li>{@code parallelism} - number of executor threads (defaults to 1, must be at least 1),</li>
 *   <li>{@code qsize} - capacity of each message queue (defaults to 1024),</li>
 *   <li>{@code host}, {@code port}, {@code user}, {@code password}, {@code clientid},
 *       {@code cleansession} - MQTT connection settings,</li>
 *   <li>{@code qos} - name of a {@link QoS} constant (defaults to {@code AT_LEAST_ONCE}),</li>
 *   <li>{@code topics} - list of topic names to subscribe to,</li>
 *   <li>{@code timeout} - optional poll timeout in milliseconds; when positive, the macro is
 *       also invoked with {@code null} each time the timeout elapses without a message,</li>
 *   <li>{@code autoack} - when {@code true}, messages are acknowledged after the macro ran.</li>
 * </ul>
 *
 * <p>The constructor connects, subscribes and starts this thread. The thread spawns the
 * executor threads and then forwards every received message to an executor queue until
 * {@link #end()} is called.
 */
public class MQTTConsumer extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTConsumer.class);
    private static final QoS DEFAULT_QOS = QoS.AT_LEAST_ONCE;
    private static final int DEFAULT_QSIZE = 1024;
    private static final String PARAM_MACRO = "macro";
    private static final String PARAM_PARALLELISM = "parallelism";
    private static final String PARAM_PARTITIONER = "partitioner";
    private static final String PARAM_QSIZE = "qsize";
    private static final String PARAM_MQTT_HOST = "host";
    private static final String PARAM_MQTT_USER = "user";
    private static final String PARAM_MQTT_PASSWORD = "password";
    private static final String PARAM_MQTT_PORT = "port";
    private static final String PARAM_MQTT_CLIENTID = "clientid";
    private static final String PARAM_MQTT_QOS = "qos";
    private static final String PARAM_MQTT_TIMEOUT = "timeout";
    private static final String PARAM_MQTT_AUTOACK = "autoack";
    private static final String PARAM_MQTT_TOPICS = "topics";
    private static final String PARAM_MQTT_CLEANSESSION = "cleansession";

    /** Stack used to load the specification and, later, to run the partitioner macro. */
    private final MemoryWarpScriptStack stack;
    private final WarpScriptStack.Macro macro;
    private final WarpScriptStack.Macro partitioner;
    private final String mqttuser;
    private final String mqttpassword;
    private final String mqtthost;
    /** Configured client id, or {@code null} when the specification does not set one. */
    private final String mqttclientid;
    private final boolean mqttautoack;
    private final QoS mqttqos;
    /** Poll timeout in milliseconds; values {@code <= 0} make executors block until a message arrives. */
    private final long timeout;
    private final boolean mqttcleansession;
    private final int parallelism;
    private final int mqttport;
    private final Topic[] topics;
    private final BlockingConnection connection;
    /** Set by {@link #end()} from another thread, hence volatile. */
    private volatile boolean done;
    private final String warpscript;
    /** Single shared queue, used when no partitioner is configured ({@code null} otherwise). */
    private final LinkedBlockingQueue<Message> queue;
    /** One queue per executor, used when a partitioner is configured ({@code null} otherwise). */
    private final LinkedBlockingQueue<Message>[] queues;
    /** Assigned by {@link #run()} and read by {@link #end()}, hence volatile. */
    private volatile Thread[] executors;

    /**
     * Loads the specification at {@code path}, connects to the broker, subscribes to the
     * configured topics and starts the consumer thread.
     *
     * @param path location of the WarpScript specification file
     * @throws RuntimeException if the specification does not leave a map on the stack, or if
     *         the host, parallelism or topic list is missing or invalid
     * @throws Exception if the file cannot be read or the MQTT connection/subscription fails
     */
    public MQTTConsumer(Path path) throws Exception {
        // Read the whole file; the stream is closed only once every chunk has been consumed.
        ByteArrayOutputStream content = new ByteArrayOutputStream();
        try (FileInputStream in = new FileInputStream(path.toFile())) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) >= 0) {
                content.write(buffer, 0, read);
            }
        }
        this.warpscript = new String(content.toByteArray(), Charsets.UTF_8);

        this.stack = new MemoryWarpScriptStack((StoreClient) null, (DirectoryClient) null, new Properties());
        this.stack.maxLimits();
        try {
            this.stack.execMulti(this.warpscript);
        } catch (Throwable t) {
            // Execution errors are only logged; the check on the top of the stack below
            // decides whether the specification is usable.
            LOG.error("Caught exception while loading '" + path.getFileName() + "'.", t);
        }
        Object top = this.stack.pop();
        if (!(top instanceof Map)) {
            throw new RuntimeException("MQTT consumer spec must leave a configuration map on top of the stack.");
        }
        Map<?, ?> config = (Map<?, ?>) top;

        this.macro = (WarpScriptStack.Macro) config.get(PARAM_MACRO);
        this.partitioner = (WarpScriptStack.Macro) config.get(PARAM_PARTITIONER);
        this.mqttuser = stringOrNull(config.get(PARAM_MQTT_USER));
        this.mqttpassword = stringOrNull(config.get(PARAM_MQTT_PASSWORD));
        this.mqtthost = stringOrNull(config.get(PARAM_MQTT_HOST));
        if (null == this.mqtthost) {
            // String.valueOf(null) would otherwise make us connect to a host literally named "null".
            throw new RuntimeException("Missing MQTT host.");
        }
        this.mqttport = ((Number) config.get(PARAM_MQTT_PORT)).intValue();
        // Keep a missing client id as null so that it is not sent as the literal string "null".
        this.mqttclientid = stringOrNull(config.get(PARAM_MQTT_CLIENTID));

        Object parallelismSpec = config.get(PARAM_PARALLELISM);
        this.parallelism = null != parallelismSpec ? Integer.parseInt(String.valueOf(parallelismSpec)) : 1;
        if (this.parallelism < 1) {
            throw new RuntimeException("Invalid parallelism " + this.parallelism + ", must be at least 1.");
        }

        Object qosSpec = config.get(PARAM_MQTT_QOS);
        this.mqttqos = null != qosSpec ? QoS.valueOf(String.valueOf(qosSpec)) : DEFAULT_QOS;
        this.mqttautoack = Boolean.TRUE.equals(config.get(PARAM_MQTT_AUTOACK));
        // Clean session is the default when the key is absent.
        this.mqttcleansession = !config.containsKey(PARAM_MQTT_CLEANSESSION)
            || Boolean.TRUE.equals(config.get(PARAM_MQTT_CLEANSESSION));
        this.timeout = config.containsKey(PARAM_MQTT_TIMEOUT)
            ? Long.parseLong(String.valueOf(config.get(PARAM_MQTT_TIMEOUT)))
            : 0L;

        Object qsizeSpec = config.get(PARAM_QSIZE);
        int qsize = null != qsizeSpec ? Integer.parseInt(String.valueOf(qsizeSpec)) : DEFAULT_QSIZE;
        if (null == this.partitioner) {
            this.queue = new LinkedBlockingQueue<>(qsize);
            this.queues = null;
        } else {
            this.queue = null;
            this.queues = newQueues(this.parallelism, qsize);
        }

        Object topicsSpec = config.get(PARAM_MQTT_TOPICS);
        if (!(topicsSpec instanceof List)) {
            throw new RuntimeException("Invalid topic list.");
        }
        List<?> topicNames = (List<?>) topicsSpec;
        this.topics = new Topic[topicNames.size()];
        for (int i = 0; i < this.topics.length; i++) {
            this.topics[i] = new Topic(String.valueOf(topicNames.get(i)), this.mqttqos);
        }

        MQTT mqtt = new MQTT();
        mqtt.setCleanSession(this.mqttcleansession);
        mqtt.setHost(this.mqtthost, this.mqttport);
        if (null != this.mqttclientid) {
            mqtt.setClientId(this.mqttclientid);
        }
        if (null != this.mqttuser) {
            mqtt.setUserName(this.mqttuser);
        }
        if (null != this.mqttpassword) {
            mqtt.setPassword(this.mqttpassword);
        }

        this.connection = mqtt.blockingConnection();
        this.connection.connect();
        try {
            this.connection.subscribe(this.topics);
        } catch (Exception e) {
            // Do not leave an established connection behind when the subscription fails.
            try {
                this.connection.disconnect();
            } catch (Exception suppressed) {
                LOG.warn("Caught exception while disconnecting after a failed subscription", suppressed);
            }
            throw e;
        }

        setDaemon(true);
        setName("[MQTT Client " + this.mqttclientid + "]");
        start();
    }

    /** Returns the string form of {@code value}, or {@code null} when {@code value} is {@code null}. */
    private static String stringOrNull(Object value) {
        return null != value ? String.valueOf(value) : null;
    }

    /** Creates {@code count} bounded message queues, each with the given {@code capacity}. */
    @SuppressWarnings({"unchecked", "rawtypes"}) // generic array creation; only Message queues are stored
    private static LinkedBlockingQueue<Message>[] newQueues(int count, int capacity) {
        LinkedBlockingQueue<Message>[] created = new LinkedBlockingQueue[count];
        for (int i = 0; i < count; i++) {
            created[i] = new LinkedBlockingQueue<>(capacity);
        }
        return created;
    }

    /**
     * Starts the executor threads, then receives messages and hands them to the executor
     * queues until {@link #end()} is called or this thread is interrupted.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Thread[] workers = new Thread[this.parallelism];
        this.executors = workers;
        for (int i = 0; i < this.parallelism; i++) {
            final MemoryWarpScriptStack workerStack = new MemoryWarpScriptStack(MQTTWarp10Plugin.getExposedStoreClient(), MQTTWarp10Plugin.getExposedDirectoryClient(), new Properties());
            workerStack.maxLimits();
            final LinkedBlockingQueue<Message> source = null == this.partitioner ? this.queue : this.queues[i];
            workers[i] = new Thread() { // from class: io.warp10.plugins.mqtt.MQTTConsumer.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            // With a positive timeout, poll() may return null and the macro
                            // is then invoked with null instead of a message.
                            Message message = MQTTConsumer.this.timeout > 0
                                ? source.poll(MQTTConsumer.this.timeout, TimeUnit.MILLISECONDS)
                                : source.take();
                            workerStack.clear();
                            workerStack.push(message);
                            workerStack.exec(MQTTConsumer.this.macro);
                            if (MQTTConsumer.this.mqttautoack && null != message) {
                                message.ack();
                            }
                        } catch (WarpScriptStopException ignored) {
                            // Deliberately ignored: a stop raised by the macro only ends the
                            // handling of the current message (which is then not auto-acked).
                        } catch (InterruptedException e) {
                            // Interrupted by end(): terminate this executor.
                            return;
                        } catch (Exception e) {
                            LOG.error("Caught exception while processing message", e);
                        }
                    }
                }
            };
            workers[i].setContextClassLoader(getContextClassLoader());
            workers[i].setName("[MQTT Executor #" + i + "]");
            workers[i].setDaemon(true);
            workers[i].start();
        }

        while (!this.done) {
            Message received;
            try {
                received = this.connection.receive();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // A failure caused by the disconnect performed in end() is expected.
                if (!this.done) {
                    LOG.error("Caught exception while receiving message", e);
                }
                continue;
            }
            try {
                dispatch(received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LOG.error("Caught exception while dispatching message", e);
            }
        }
    }

    /**
     * Puts {@code message} on the shared queue, or on the queue selected by the partitioner
     * macro when one is configured. Blocks while the target queue is full.
     */
    private void dispatch(Message message) throws Exception {
        if (null == this.partitioner) {
            this.queue.put(message);
            return;
        }
        this.stack.clear();
        this.stack.push(message);
        this.stack.exec(this.partitioner);
        int partition = ((Number) this.stack.pop()).intValue() % this.parallelism;
        if (partition < 0) {
            // Java's % keeps the sign of the dividend; map negative results to a valid index.
            partition += this.parallelism;
        }
        this.queues[partition].put(message);
    }

    /**
     * Stops the consumer: flags the receive loop as done, disconnects from the broker and
     * interrupts the executor threads. Executors are interrupted even if the disconnect fails.
     */
    public void end() {
        this.done = true;
        try {
            this.connection.disconnect();
        } catch (Exception e) {
            LOG.warn("Caught exception while disconnecting MQTT client " + this.mqttclientid, e);
        }
        // executors is null until run() has started, and slots may still be empty during startup.
        Thread[] workers = this.executors;
        if (null != workers) {
            for (Thread worker : workers) {
                if (null != worker) {
                    worker.interrupt();
                }
            }
        }
    }

    /** Returns the WarpScript source of the specification this consumer was loaded from. */
    public String getWarpScript() {
        return this.warpscript;
    }
}
