/*
 * Decompiled with CFR 0.152.
 */
package org.openmuc.framework.lib.mqtt;

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.openmuc.framework.lib.mqtt.MessageTuple;
import org.openmuc.framework.lib.mqtt.MqttBufferHandler;
import org.openmuc.framework.lib.mqtt.MqttConnection;
import org.openmuc.framework.lib.mqtt.MqttSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

/**
 * Publishes messages through an {@link MqttConnection} and buffers them while the broker is not
 * reachable.
 *
 * <p>The writer registers connected/disconnected listeners on the connection. While disconnected,
 * {@link #write(String, byte[])} hands messages to a {@link MqttBufferHandler}; once a connection
 * is (re-)established a dedicated {@code MqttRecovery} thread re-sends the buffered messages, first
 * from the memory buffer and then from the file buffers.
 *
 * <p>The connection state is touched by the client's callback thread, the recovery thread and the
 * callers of {@link #write(String, byte[])} / {@link #shutdown()}, therefore the state fields are
 * {@code volatile}.
 */
public class MqttWriter {
    private static final Logger logger = LoggerFactory.getLogger(MqttWriter.class);
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final MqttConnection connection;
    private final AtomicBoolean cancelReconnect = new AtomicBoolean(false);
    private final MqttBufferHandler buffer;
    private final String pid;
    // Written by listener callbacks and shutdown(), read by the recovery thread and write().
    private volatile boolean connected = false;
    // null until the first disconnect / failed connect has been observed.
    private volatile LocalDateTime timeOfConnectionLoss;

    /**
     * Creates a writer for the given connection.
     *
     * @param connection connection used for publishing; its settings configure the buffer handler
     * @param pid identifier that is prefixed to the log messages of this writer
     */
    public MqttWriter(MqttConnection connection, String pid) {
        this.connection = connection;
        this.pid = pid;
        MqttSettings s = connection.getSettings();
        this.buffer = new MqttBufferHandler(s.getMaxBufferSize(), s.getMaxFileCount(), s.getMaxFileSize(), s.getPersistenceDirectory());
        // Register the listeners last: their callbacks use 'buffer' and 'pid', which must be
        // assigned before a callback can possibly run.
        this.addConnectedListener();
        this.addDisconnectedListener();
    }

    /**
     * Re-sends all messages found in the file buffers. Stops early when the connection is lost
     * or when the recovery thread is interrupted while throttling.
     */
    private void emptyFileBuffer() {
        this.log("Clearing file buffer.");
        String[] buffers = this.buffer.getBuffers();
        if (buffers.length == 0) {
            this.log("File buffer already empty.");
        }
        int messageCount = 0;
        int chunkSize = this.connection.getSettings().getRecoveryChunkSize();
        int delay = this.connection.getSettings().getRecoveryDelay();
        for (String bufferName : buffers) {
            Iterator<MessageTuple> iterator = this.buffer.getMessageIterator(bufferName);
            while (iterator.hasNext()) {
                if (!this.connected) {
                    this.warn("Recovery from file buffer interrupted by connection loss.");
                    return;
                }
                MessageTuple messageTuple = iterator.next();
                if (logger.isTraceEnabled()) {
                    this.trace("Resend from file: {}", new String(messageTuple.message));
                }
                this.write(messageTuple.topic, messageTuple.message);
                if (!this.connection.getSettings().isRecoveryLimitSet() || ++messageCount != chunkSize) {
                    continue;
                }
                // A full chunk has been sent: pause before sending the next one.
                messageCount = 0;
                if (!this.pauseRecovery(delay)) {
                    return;
                }
            }
        }
        this.log("Empty file buffer done.");
    }

    /**
     * Registers the listener that marks the writer as connected, publishes the configured
     * "first will" message (if set) and starts the recovery thread.
     */
    private void addConnectedListener() {
        this.connection.addConnectedListener(context -> {
            String serverHost = "UNKNOWN";
            String serverPort = "UNKNOWN";
            if (context.getClientConfig() != null) {
                serverHost = context.getClientConfig().getServerHost();
                serverPort = String.valueOf(context.getClientConfig().getServerPort());
            }
            this.log("connected to broker {}:{}", serverHost, serverPort);
            this.connected = true;
            MqttSettings settings = this.connection.getSettings();
            if (settings.isFirstWillSet()) {
                this.write(settings.getFirstWillTopic(), settings.getFirstWillPayload());
            }
            Thread recovery = new Thread(this::emptyBuffer, "MqttRecovery");
            recovery.start();
        });
    }

    /**
     * Re-sends all messages of the memory buffer and afterwards continues with the file buffers.
     * Stops early when the connection is lost or when the recovery thread is interrupted while
     * throttling; in both cases the file buffers are not processed.
     */
    private void emptyBuffer() {
        this.log("Clearing memory (RAM) buffer.");
        if (this.buffer.isEmpty()) {
            this.log("Memory buffer already empty.");
        }
        int messageCount = 0;
        int chunkSize = this.connection.getSettings().getRecoveryChunkSize();
        int delay = this.connection.getSettings().getRecoveryDelay();
        while (!this.buffer.isEmpty()) {
            if (!this.connected) {
                this.warn("Recovery from memory buffer interrupted by connection loss.");
                return;
            }
            MessageTuple messageTuple = this.buffer.removeNextMessage();
            if (logger.isTraceEnabled()) {
                this.trace("Resend from memory: {}", new String(messageTuple.message));
            }
            this.write(messageTuple.topic, messageTuple.message);
            if (!this.connection.getSettings().isRecoveryLimitSet() || ++messageCount != chunkSize) {
                continue;
            }
            // A full chunk has been sent: pause before sending the next one.
            messageCount = 0;
            if (!this.pauseRecovery(delay)) {
                return;
            }
        }
        this.log("Empty memory buffer done.");
        this.emptyFileBuffer();
    }

    /**
     * Sleeps for the recovery delay between two chunks.
     *
     * @param delay value passed to {@link Thread#sleep(long)}
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted; in
     *         that case the interrupt flag is restored so that callers further up can see it
     */
    private boolean pauseRecovery(int delay) {
        try {
            Thread.sleep(delay);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.warn("Recovery interrupted while waiting between chunks.");
            return false;
        }
    }

    /**
     * Registers the listener that disables reconnecting after {@link #shutdown()} was requested and
     * otherwise reports either a connection loss or a failed reconnect attempt.
     */
    private void addDisconnectedListener() {
        this.connection.addDisconnectedListener(context -> {
            if (this.cancelReconnect.getAndSet(false)) {
                logger.debug("Called during shutdown");
                context.getReconnector().reconnect(false);
            }
            if (context.getReconnector().isReconnect()) {
                logger.debug("Disconnected");
                String serverHost = context.getClientConfig().getServerHost();
                String cause = context.getCause().getMessage();
                String source = context.getSource().name();
                if (this.connected) {
                    this.handleDisconnect(serverHost, cause);
                } else {
                    this.handleFailedReconnect(serverHost, cause, source);
                }
            }
        });
    }

    /**
     * Logs a failed (re-)connect attempt together with the time of the connection loss and the time
     * elapsed since then. If no connection loss was recorded yet, the current time is recorded.
     */
    private void handleFailedReconnect(String serverHost, String cause, String source) {
        logger.debug("handling failed reconnect");
        // Work on a local copy so the value cannot change between the null check and its use.
        LocalDateTime lostAt = this.timeOfConnectionLoss;
        if (lostAt == null) {
            lostAt = LocalDateTime.now();
            this.timeOfConnectionLoss = lostAt;
        }
        long elapsedSeconds = Duration.between(lostAt, LocalDateTime.now()).getSeconds();
        String duration = MqttWriter.formatDuration(elapsedSeconds);
        this.warn("Reconnect failed: broker '{}'. Source: '{}'. Cause: '{}'. Connection lost at: {}, duration {}", serverHost, source, cause, DATE_FORMATTER.format(lostAt), duration);
    }

    /**
     * Formats an elapsed time as {@code HH:mm:ss}. The hour part is not wrapped at 24 and negative
     * input (e.g. after the wall clock was set back) is reported as {@code 00:00:00}.
     *
     * <p>This is plain arithmetic on purpose: formatting a duration through a date formatter makes
     * the result depend on the time zone rules that applied in 1970.
     */
    private static String formatDuration(long totalSeconds) {
        long seconds = Math.max(0L, totalSeconds);
        return String.format(Locale.ROOT, "%02d:%02d:%02d", seconds / 3600L, seconds % 3600L / 60L, seconds % 60L);
    }

    /**
     * @return {@code true} as long as no connection loss or failed connect has been recorded
     */
    public boolean isInitialConnect() {
        return this.timeOfConnectionLoss == null;
    }

    /** Records the time of the connection loss, marks the writer as disconnected and logs it. */
    private void handleDisconnect(String serverHost, String cause) {
        this.timeOfConnectionLoss = LocalDateTime.now();
        this.connected = false;
        this.warn("Connection lost: broker '{}'. Cause: '{}'", serverHost, cause);
    }

    /**
     * Publishes the message if the writer is connected, otherwise adds it to the buffer.
     *
     * @param topic topic to publish on
     * @param message payload of the message
     */
    public void write(String topic, byte[] message) {
        if (this.connected) {
            this.startPublishing(topic, message);
        } else {
            this.warn("No connection to broker - adding message to buffer");
            this.buffer.add(topic, message);
        }
    }

    /** Publishes asynchronously; a message whose publish fails is added to the buffer. */
    private void startPublishing(String topic, byte[] message) {
        this.publish(topic, message).whenComplete((publish, exception) -> {
            if (exception != null) {
                this.warn("Connection issue: {} message could not be sent. Adding message to buffer", exception.getMessage());
                this.buffer.add(topic, message);
            } else if (logger.isTraceEnabled()) {
                this.trace("Message successfully delivered on topic {}", topic);
            }
        });
    }

    /**
     * Builds and sends the publish message, using the "retained messages" flag from the settings.
     *
     * @return future of the send operation
     */
    CompletableFuture<Mqtt3Publish> publish(String topic, byte[] message) {
        return (CompletableFuture)((Mqtt3PublishBuilder.Send.Complete)((Mqtt3PublishBuilder.Send.Complete)((Mqtt3PublishBuilder.Send.Complete)this.connection.getClient().publishWith().topic(topic)).retain(this.connection.getSettings().isRetainedMessages())).payload(message)).send();
    }

    public MqttConnection getConnection() {
        return this.connection;
    }

    /**
     * @return {@code true} if a connection object exists and the connected listener has fired
     *         without a later disconnect or shutdown
     */
    public boolean isConnected() {
        return this.connection != null && this.connected;
    }

    // The helpers below prefix every message with "[pid]". The level guard avoids formatting
    // messages that would be discarded anyway.

    private void log(String message, Object ... args) {
        if (!logger.isInfoEnabled()) {
            return;
        }
        message = MessageFormatter.arrayFormat((String)message, (Object[])args).getMessage();
        logger.info("[{}] {}", (Object)this.pid, (Object)message);
    }

    private void debug(String message, Object ... args) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        message = MessageFormatter.arrayFormat((String)message, (Object[])args).getMessage();
        logger.debug("[{}] {}", (Object)this.pid, (Object)message);
    }

    private void warn(String message, Object ... args) {
        if (!logger.isWarnEnabled()) {
            return;
        }
        message = MessageFormatter.arrayFormat((String)message, (Object[])args).getMessage();
        logger.warn("[{}] {}", (Object)this.pid, (Object)message);
    }

    private void error(String message, Object ... args) {
        if (!logger.isErrorEnabled()) {
            return;
        }
        message = MessageFormatter.arrayFormat((String)message, (Object[])args).getMessage();
        logger.error("[{}] {}", (Object)this.pid, (Object)message);
    }

    private void trace(String message, Object ... args) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        message = MessageFormatter.arrayFormat((String)message, (Object[])args).getMessage();
        logger.trace("[{}] {}", (Object)this.pid, (Object)message);
    }

    /**
     * Marks the writer as disconnected, requests that the next disconnect event does not trigger a
     * reconnect and asks the buffer handler to persist its content.
     */
    public void shutdown() {
        this.connected = false;
        this.cancelReconnect.set(true);
        logger.info("shutting down ");
        this.log("Saving buffers.");
        this.buffer.persist();
    }
}

