package org.openmuc.framework.driver.mqtt;

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.openmuc.framework.config.ArgumentSyntaxException;
import org.openmuc.framework.config.ChannelScanInfo;
import org.openmuc.framework.config.ScanException;
import org.openmuc.framework.data.ByteArrayValue;
import org.openmuc.framework.data.Flag;
import org.openmuc.framework.data.Record;
import org.openmuc.framework.data.ValueType;
import org.openmuc.framework.dataaccess.Channel;
import org.openmuc.framework.datalogger.spi.LoggingRecord;
import org.openmuc.framework.driver.spi.ChannelRecordContainer;
import org.openmuc.framework.driver.spi.ChannelValueContainer;
import org.openmuc.framework.driver.spi.Connection;
import org.openmuc.framework.driver.spi.ConnectionException;
import org.openmuc.framework.driver.spi.RecordsReceivedListener;
import org.openmuc.framework.lib.mqtt.MqttConnection;
import org.openmuc.framework.lib.mqtt.MqttReader;
import org.openmuc.framework.lib.mqtt.MqttSettings;
import org.openmuc.framework.lib.mqtt.MqttWriter;
import org.openmuc.framework.parser.spi.ParserService;
import org.openmuc.framework.parser.spi.SerializationException;
import org.openmuc.framework.security.SslManagerInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmuc/framework/driver/mqtt/MqttDriverConnection.class */
/**
 * Driver {@link Connection} that exchanges channel values with an MQTT broker.
 *
 * <p>Channel addresses are used as MQTT topics. Incoming payloads are turned into {@link Record}s,
 * either by the parser whose id is configured under the {@code parser} setting or, when no such
 * parser is registered, by wrapping the raw bytes in a {@link ByteArrayValue}. Outgoing values are
 * always serialized by the configured parser.
 *
 * <p>NOTE(review): the message callback registered in {@link #startListening} mutates
 * {@code recordContainerList} and {@code lastLoggedRecords} without synchronization; confirm that
 * {@code MqttReader} delivers messages from a single thread.
 */
public class MqttDriverConnection implements Connection {
    private static final Logger logger = LoggerFactory.getLogger(MqttDriverConnection.class);
    private final MqttConnection mqttConnection;
    private final MqttWriter mqttWriter;
    private final MqttReader mqttReader;
    /** Registered parsers, keyed by parser id (see {@link #setParser}). Never contains null values. */
    private final Map<String, ParserService> parsers = new HashMap<>();
    /** Timestamp of the last accepted record per channel id, used to drop stale messages. */
    private final Map<String, Long> lastLoggedRecords = new HashMap<>();
    /** Records collected since the listener was last notified. */
    private final List<ChannelRecordContainer> recordContainerList = new ArrayList<>();
    /** Key/value settings parsed from the settings string passed to the constructor. */
    private final Properties settings = new Properties();

    /**
     * Creates the connection and, unless SSL is enabled, connects immediately.
     *
     * <p>With SSL enabled the connect is deferred until {@link #setSslManager} supplies a loaded
     * SSL manager.
     *
     * @param address first argument handed to {@link MqttSettings} (presumably the broker host;
     *        confirm against {@code MqttSettings})
     * @param settingsString {@code ;}-separated {@code key=value} pairs; {@code port} is required
     * @throws ArgumentSyntaxException if the settings string cannot be read, {@code port} is
     *         missing, or a numeric setting is not a valid number
     */
    public MqttDriverConnection(String address, String settingsString) throws ArgumentSyntaxException {
        MqttSettings mqttSettings = getMqttSettings(address, settingsString);
        this.mqttConnection = new MqttConnection(mqttSettings);
        this.mqttWriter = new MqttWriter(this.mqttConnection, "mqttdriver");
        this.mqttReader = new MqttReader(this.mqttConnection, "mqttdriver");
        if (mqttSettings.isSsl()) {
            logger.debug("not connecting: waiting for ssl");
        } else {
            this.mqttConnection.connect();
        }
    }

    /**
     * Parses the settings string into {@link #settings} and builds the {@link MqttSettings}.
     *
     * @param address passed through unchanged as the first {@link MqttSettings} argument
     * @param settingsString {@code ;}-separated {@code key=value} pairs
     * @return the settings object for the MQTT connection
     * @throws ArgumentSyntaxException if the string cannot be read, {@code port} is missing, or a
     *         numeric setting cannot be parsed
     */
    private MqttSettings getMqttSettings(String address, String settingsString) throws ArgumentSyntaxException {
        try {
            // Properties expects one entry per line, so turn the ';' separators into newlines.
            this.settings.load(new StringReader(settingsString.replace(";", "\n")));
        } catch (IOException e) {
            throw new ArgumentSyntaxException("Could not read settings string: " + e.getMessage());
        }

        if (this.settings.getProperty("port") == null) {
            throw new ArgumentSyntaxException("Missing required setting: port");
        }

        try {
            return new MqttSettings(
                    address,
                    Integer.parseInt(this.settings.getProperty("port")),
                    this.settings.getProperty("username"),
                    this.settings.getProperty("password"),
                    Boolean.parseBoolean(this.settings.getProperty("ssl")),
                    Long.parseLong(this.settings.getProperty("maxBufferSize", "0")),
                    Long.parseLong(this.settings.getProperty("maxFileSize", "0")),
                    Integer.parseInt(this.settings.getProperty("maxFileCount", "1")),
                    Integer.parseInt(this.settings.getProperty("connectionRetryInterval", "10")),
                    Integer.parseInt(this.settings.getProperty("connectionAliveInterval", "10")),
                    this.settings.getProperty("persistenceDirectory", "data/driver/mqtt"),
                    this.settings.getProperty("lastWillTopic", ""),
                    // Explicit charset so the payload bytes do not depend on the platform default.
                    this.settings.getProperty("lastWillPayload", "").getBytes(StandardCharsets.UTF_8),
                    Boolean.parseBoolean(this.settings.getProperty("lastWillAlways", "false")),
                    this.settings.getProperty("firstWillTopic", ""),
                    this.settings.getProperty("firstWillPayload", "").getBytes(StandardCharsets.UTF_8),
                    // The next two values are fixed at 0 and not configurable through the settings
                    // string. NOTE(review): confirm their meaning against MqttSettings.
                    0,
                    0,
                    Boolean.parseBoolean(this.settings.getProperty("webSocket", "false")),
                    Boolean.parseBoolean(this.settings.getProperty("retainedMessages", "false")));
        } catch (NumberFormatException e) {
            // Report malformed numbers through the declared exception type instead of letting an
            // unchecked exception escape the constructor.
            throw new ArgumentSyntaxException("Invalid numeric value in settings: " + e.getMessage());
        }
    }

    /**
     * Not supported by this driver.
     *
     * @throws UnsupportedOperationException always
     */
    public List<ChannelScanInfo> scanForChannels(String str) throws UnsupportedOperationException, ArgumentSyntaxException, ScanException, ConnectionException {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this driver; values are only delivered via {@link #startListening}.
     *
     * @throws UnsupportedOperationException always
     */
    public Object read(List<ChannelRecordContainer> list, Object obj, String str) throws UnsupportedOperationException, ConnectionException {
        throw new UnsupportedOperationException();
    }

    /**
     * Subscribes to the channel addresses of the given containers and forwards received messages.
     *
     * <p>Each message is converted to a record, dropped if its timestamp is not newer than the last
     * accepted one for that channel, and otherwise collected. Once at least
     * {@code recordCollectionSize} (default 1) records are collected, the listener is notified and
     * the collection is reset. Does nothing when {@code list} is empty.
     *
     * @param list containers whose channel addresses are used as topics
     * @param recordsReceivedListener receiver of the collected records
     */
    public void startListening(List<ChannelRecordContainer> list, RecordsReceivedListener recordsReceivedListener) throws UnsupportedOperationException, ConnectionException {
        List<String> topics = new ArrayList<>(list.size());
        Map<String, ChannelRecordContainer> containersByTopic = new HashMap<>();
        for (ChannelRecordContainer container : list) {
            String topic = container.getChannelAddress();
            topics.add(topic);
            // First container wins for duplicate addresses, matching a first-index lookup.
            containersByTopic.putIfAbsent(topic, container);
        }
        if (topics.isEmpty()) {
            return;
        }

        this.mqttReader.listen(topics, (topic, payload) -> {
            ChannelRecordContainer container = containersByTopic.get(topic);
            if (container == null) {
                // The delivered topic need not equal a subscribed address (e.g. wildcard filters).
                logger.warn("Ignored message on topic {} because no channel is registered for it", topic);
                return;
            }
            Channel channel = container.getChannel();
            Record record = getRecord(payload, channel.getValueType());
            if (recordIsOld(channel.getId(), record)) {
                logger.debug("Ignored message because it has a older or equal timestamp as the previous message");
                return;
            }
            addMessageToContainerList(record, container);
            if (this.recordContainerList.size() >= Integer.parseInt(this.settings.getProperty("recordCollectionSize", "1"))) {
                notifyListenerAndPurgeList(recordsReceivedListener);
            }
        });
    }

    /**
     * Hands the collected records to the listener and empties the collection.
     *
     * @param recordsReceivedListener receiver of the collected records
     */
    private void notifyListenerAndPurgeList(RecordsReceivedListener recordsReceivedListener) {
        logTraceNewRecord();
        // Pass a copy: the internal list is cleared right after, which would otherwise empty the
        // list under a listener that keeps the reference.
        recordsReceivedListener.newRecords(new ArrayList<>(this.recordContainerList));
        this.recordContainerList.clear();
    }

    /**
     * Stores {@code record} in a copy of the given container and adds the copy to the collection.
     *
     * @param record the record to store
     * @param channelRecordContainer container that is copied; the original is left untouched
     */
    private void addMessageToContainerList(Record record, ChannelRecordContainer channelRecordContainer) {
        ChannelRecordContainer copy = channelRecordContainer.copy();
        copy.setRecord(record);
        this.recordContainerList.add(copy);
    }

    /**
     * Decides whether a record is stale and remembers the timestamp of accepted records.
     *
     * @param channelId id of the channel the record belongs to
     * @param record the received record
     * @return {@code true} if a timestamp is already stored for the channel and the record's
     *         timestamp is {@code null} or not strictly greater than it; {@code false} otherwise
     */
    private boolean recordIsOld(String channelId, Record record) {
        Long timestamp = record.getTimestamp();
        Long previous = this.lastLoggedRecords.get(channelId);
        if (previous != null && (timestamp == null || timestamp.longValue() <= previous.longValue())) {
            return true;
        }
        this.lastLoggedRecords.put(channelId, timestamp);
        return false;
    }

    /**
     * Converts a message payload into a record.
     *
     * @param payload the raw message bytes
     * @param valueType value type of the target channel, passed to the parser
     * @return the parser's result if the configured parser is registered; otherwise the raw bytes
     *         as a {@link ByteArrayValue} stamped with the current system time
     */
    private Record getRecord(byte[] payload, ValueType valueType) {
        ParserService parser = this.parsers.get(this.settings.getProperty("parser"));
        if (parser != null) {
            return parser.deserialize(payload, valueType);
        }
        return new Record(new ByteArrayValue(payload), Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Serializes each value with the configured parser and publishes it to the channel address.
     *
     * <p>Containers whose value was published get {@link Flag#VALID}. A value that fails to
     * serialize is logged and skipped; its flag is left unchanged.
     *
     * @param list values to write
     * @param obj unused
     * @return always {@code null}
     * @throws UnsupportedOperationException if {@code list} is not empty and the configured parser
     *         is not registered
     */
    public Object write(List<ChannelValueContainer> list, Object obj) throws UnsupportedOperationException, ConnectionException {
        ParserService parser = this.parsers.get(this.settings.getProperty("parser"));
        for (ChannelValueContainer channelValueContainer : list) {
            if (parser == null) {
                logger.error("A parser is needed to write messages and none have been registered.");
                throw new UnsupportedOperationException();
            }
            String topic = channelValueContainer.getChannelAddress();
            Record record = new Record(channelValueContainer.getValue(), Long.valueOf(System.currentTimeMillis()));
            LoggingRecord loggingRecord = new LoggingRecord(topic, record);
            try {
                this.mqttWriter.write(topic, parser.serialize(loggingRecord));
                channelValueContainer.setFlag(Flag.VALID);
            } catch (SerializationException e) {
                // Keep the stack trace and the affected topic in the log.
                logger.error("Could not serialize record for topic {}", topic, e);
            }
        }
        return null;
    }

    /** Shuts down the writer and disconnects from the broker. */
    public void disconnect() {
        this.mqttWriter.shutdown();
        this.mqttConnection.disconnect();
    }

    /**
     * Registers or unregisters a parser.
     *
     * @param parserId id under which the parser is stored
     * @param parserService the parser to register, or {@code null} to remove the entry for
     *        {@code parserId}
     */
    public void setParser(String parserId, ParserService parserService) {
        if (parserService == null) {
            this.parsers.remove(parserId);
        } else {
            this.parsers.put(parserId, parserService);
        }
    }

    /** Logs topic and record of every collected container at trace level, if trace is enabled. */
    private void logTraceNewRecord() {
        if (!logger.isTraceEnabled()) {
            return;
        }
        StringBuilder sb = new StringBuilder("new records");
        for (ChannelRecordContainer container : this.recordContainerList) {
            sb.append("\ntopic: ").append(container.getChannelAddress()).append("\n");
            sb.append("record: ").append(container.getRecord().toString());
        }
        logger.trace(sb.toString());
    }

    /**
     * Passes the SSL manager to the MQTT connection and connects once it is loaded.
     *
     * <p>Has no effect beyond a debug log when SSL is not enabled in the connection settings.
     *
     * @param sslManagerInterface the SSL manager to use
     */
    public void setSslManager(SslManagerInterface sslManagerInterface) {
        logger.debug("setSslManager");
        if (!this.mqttConnection.getSettings().isSsl()) {
            return;
        }
        logger.debug("SSLManager registered in driver");
        this.mqttConnection.setSslManager(sslManagerInterface);
        if (sslManagerInterface.isLoaded()) {
            this.mqttConnection.connect();
        }
    }
}
