package org.openmuc.framework.lib.mqtt;

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Queue;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

/* loaded from: input_file:org/openmuc/framework/lib/mqtt/MqttWriter.class */
public class MqttWriter {
    private static final Logger logger = LoggerFactory.getLogger(MqttWriter.class);
    private static final Queue<MessageTuple> MESSAGE_BUFFER = new LinkedList();
    private final MqttConnection connection;
    private LocalDateTime timeOfConnectionLoss;
    private final MqttBufferHandler buffer;
    private final String pid;
    private boolean connected = false;
    private final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss", Locale.getDefault());
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public MqttWriter(MqttConnection mqttConnection, String str) {
        this.connection = mqttConnection;
        addConnectedListener();
        addDisconnectedListener();
        MqttSettings settings = mqttConnection.getSettings();
        this.buffer = new MqttBufferHandler(settings.getMaxBufferSize(), settings.getMaxFileCount(), settings.getMaxFileSize(), settings.getPersistenceDirectory());
        this.pid = str;
    }

    private void addConnectedListener() {
        this.connection.addConnectedListener(mqttClientConnectedContext -> {
            String str = "UNKNOWN";
            String str2 = "UNKNOWN";
            if (mqttClientConnectedContext.getClientConfig() != null) {
                str = mqttClientConnectedContext.getClientConfig().getServerHost();
                str2 = String.valueOf(mqttClientConnectedContext.getClientConfig().getServerPort());
            }
            log("connected to broker {}:{}", str, str2);
            this.connected = true;
            MqttSettings settings = this.connection.getSettings();
            if (settings.isFirstWillSet()) {
                write(settings.getFirstWillTopic(), settings.getFirstWillPayload());
            }
            emptyBuffer();
            emptyFileBuffer();
        });
    }

    private void emptyFileBuffer() {
        log("Clearing file buffer.", new Object[0]);
        String[] buffers = this.buffer.getBuffers();
        if (buffers.length == 0) {
            log("File buffer already empty.", new Object[0]);
        }
        for (String str : buffers) {
            Iterator<MessageTuple> messageIterator = this.buffer.getMessageIterator(str);
            while (messageIterator.hasNext()) {
                MessageTuple next = messageIterator.next();
                if (logger.isTraceEnabled()) {
                    trace("Resend from file: {}", new String(next.message));
                }
                write(next.topic, next.message);
            }
        }
        log("Empty file buffer done.", new Object[0]);
    }

    private void emptyBuffer() {
        log("Clearing memory (RAM) buffer.", new Object[0]);
        if (this.buffer.isEmpty()) {
            log("Memory buffer already empty.", new Object[0]);
        }
        while (!this.buffer.isEmpty()) {
            MessageTuple removeNextMessage = this.buffer.removeNextMessage();
            if (logger.isTraceEnabled()) {
                trace("Resend from memory: {}", new String(removeNextMessage.message));
            }
            write(removeNextMessage.topic, removeNextMessage.message);
        }
        log("Empty memory buffer done.", new Object[0]);
    }

    private void addDisconnectedListener() {
        this.connection.addDisconnectedListener(mqttClientDisconnectedContext -> {
            if (mqttClientDisconnectedContext.getReconnector().isReconnect()) {
                String serverHost = mqttClientDisconnectedContext.getClientConfig().getServerHost();
                String message = mqttClientDisconnectedContext.getCause().getMessage();
                if (this.connected) {
                    handleDisconnect(serverHost, message);
                } else {
                    handleFailedReconnect(serverHost, message);
                }
            }
        });
    }

    private void handleFailedReconnect(String str, String str2) {
        if (isInitialConnect()) {
            this.timeOfConnectionLoss = LocalDateTime.now();
        }
        warn("Reconnect failed: broker '{}'. Cause: '{}'. Connection lost at: {}, duration {}", str, str2, this.dateFormatter.format(this.timeOfConnectionLoss), this.sdf.format(new Date((Duration.between(this.timeOfConnectionLoss, LocalDateTime.now()).getSeconds() * 1000) - TimeZone.getDefault().getRawOffset())));
    }

    private boolean isInitialConnect() {
        return this.timeOfConnectionLoss == null;
    }

    private void handleDisconnect(String str, String str2) {
        this.timeOfConnectionLoss = LocalDateTime.now();
        this.connected = false;
        warn("Connection lost: broker '{}'. Cause: '{}'", str, str2);
    }

    public void write(String str, byte[] bArr) {
        if (this.connected) {
            startPublishing(str, bArr);
        } else {
            warn("No connection to broker - adding message to buffer", new Object[0]);
            this.buffer.add(str, bArr);
        }
    }

    private void startPublishing(String str, byte[] bArr) {
        publish(str, bArr).whenComplete((mqtt3Publish, th) -> {
            if (th != null) {
                warn("Connection issue: {} message could not be sent. Adding message to buffer", th.getMessage());
                this.buffer.add(str, bArr);
            }
        });
    }

    CompletableFuture<Mqtt3Publish> publish(String str, byte[] bArr) {
        return (CompletableFuture) this.connection.getClient().publishWith().topic(str).payload(bArr).send();
    }

    public MqttConnection getConnection() {
        return this.connection;
    }

    public boolean isConnected() {
        return this.connection != null;
    }

    private void log(String str, Object... objArr) {
        logger.info("[{}] {}", this.pid, MessageFormatter.arrayFormat(str, objArr).getMessage());
    }

    private void debug(String str, Object... objArr) {
        logger.debug("[{}] {}", this.pid, MessageFormatter.arrayFormat(str, objArr).getMessage());
    }

    private void warn(String str, Object... objArr) {
        logger.warn("[{}] {}", this.pid, MessageFormatter.arrayFormat(str, objArr).getMessage());
    }

    private void error(String str, Object... objArr) {
        logger.error("[{}] {}", this.pid, MessageFormatter.arrayFormat(str, objArr).getMessage());
    }

    private void trace(String str, Object... objArr) {
        logger.trace("[{}] {}", this.pid, MessageFormatter.arrayFormat(str, objArr).getMessage());
    }
}
