package org.gecko.adapter.mqtt.common;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.mqtt.MQTTContext;
import org.gecko.adapter.mqtt.MQTTContextBuilder;
import org.gecko.adapter.mqtt.MqttConfig;
import org.gecko.adapter.mqtt.QoS;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.util.pushstream.PushStreamHelper;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.util.pushstream.PushEventSource;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/adapter/mqtt/common/AbstractMqttService.class */
/**
 * Base class for MQTT backed {@link MessagingService} implementations.
 *
 * <p>Subscriptions are served by one {@link MqttPushEventSource} per topic filter, kept in a
 * concurrent map. Publishing uses a single {@link GeckoMqttClient} that is created lazily on the
 * first publish. When that client reports a lost connection, a timer task replaces it with a
 * freshly created client after {@link #RECONNECT_DELAY_MS} milliseconds.
 *
 * <p>The publishing client and the reconnect timer are touched from caller threads, from the
 * client's connection-lost callback and from the timer thread; the client reference is therefore
 * volatile and timer replacement is guarded by a private lock.
 */
public abstract class AbstractMqttService implements MessagingService, AutoCloseable {
    private static final Logger logger = Logger.getLogger(AbstractMqttService.class.getName());

    /** Delay between a connection loss (or failed reconnect) and the next reconnect attempt. */
    private static final int RECONNECT_DELAY_MS = 5000;

    /** Guards replacement and cancellation of {@link #reconnectTimer}. */
    private final Object timerLock = new Object();

    /** Active event sources, keyed by the topic filter with {@code '*'} replaced by {@code '#'}. */
    private final Map<String, MqttPushEventSource> subscriptions = new ConcurrentHashMap<>();

    /** Timer running the pending reconnect task, or {@code null} if none was ever scheduled. */
    private Timer reconnectTimer;

    /** Client used for publishing; created lazily by {@code publish} and replaced on reconnect. */
    protected volatile GeckoMqttClient mqtt;

    private volatile MqttConfig config;

    /** Set by {@link #close()}; stops late callbacks from scheduling further reconnects. */
    private volatile boolean closed;

    /**
     * Stores the configuration that is later handed to {@link #createClient(MqttConfig, String)}.
     *
     * @param mqttConfig the component configuration
     */
    @Activate
    public void doActivate(MqttConfig mqttConfig) {
        this.config = mqttConfig;
        this.closed = false;
    }

    /**
     * Releases all resources held by this service by delegating to {@link #close()}, which also
     * cancels the reconnect timer.
     *
     * @throws Exception if disconnecting or closing the client fails
     */
    @Deactivate
    public void doDeactivate() throws Exception {
        close();
    }

    /**
     * Creates a client for the given configuration.
     *
     * @param mqttConfig the configuration received on activation
     * @param str the client id to use
     * @return the new client
     */
    protected abstract GeckoMqttClient createClient(MqttConfig mqttConfig, String str);

    /**
     * Cancels any pending reconnect, disconnects and closes the publishing client and closes every
     * subscription event source.
     *
     * <p>The timer is cancelled first so that a reconnect task cannot create a new client while
     * the old one is being torn down. The client is closed even when disconnecting fails, and the
     * event sources are closed even when the client cannot be shut down cleanly.
     *
     * @throws Exception if disconnecting or closing the client fails
     */
    @Override
    public void close() throws Exception {
        this.closed = true;
        cancelReconnectTimer();
        try {
            GeckoMqttClient client = this.mqtt;
            if (client != null) {
                try {
                    if (client.isConnected()) {
                        client.disconnect();
                    }
                } finally {
                    client.close();
                }
            }
        } finally {
            for (MqttPushEventSource source : this.subscriptions.values()) {
                source.close();
            }
        }
    }

    /**
     * Subscribes to a topic using a default context with {@link QoS#AT_LEAST_ONE}.
     *
     * @param str the topic filter
     * @return a push stream delivering the received messages
     * @throws Exception if the subscription cannot be set up
     */
    public PushStream<Message> subscribe(String str) throws Exception {
        return subscribe(str, new MQTTContextBuilder().withQoS(QoS.AT_LEAST_ONE).build());
    }

    /**
     * Subscribes to a topic. One event source is kept per topic filter and shared by all
     * subscribers of that filter; every call builds a new push stream on top of it.
     *
     * @param str the topic filter; {@code '*'} is replaced by {@code '#'} to form the lookup key
     * @param messagingContext context used to create the event source and configure the stream
     * @return a push stream delivering the received messages
     * @throws Exception if the subscription cannot be set up
     */
    @SuppressWarnings("unchecked") // the builder's element type is not visible from this class
    public PushStream<Message> subscribe(String str, MessagingContext messagingContext) throws Exception {
        String filter = str.replace('*', '#');
        MqttPushEventSource source = this.subscriptions.get(filter);
        if (source == null) {
            // computeIfAbsent on a ConcurrentHashMap is atomic, so no extra locking is required.
            // NOTE(review): the source is created with the original topic, not the converted
            // filter that is used as the key - confirm MqttPushEventSource converts it itself.
            source = this.subscriptions.computeIfAbsent(filter,
                    key -> new MqttPushEventSource(str, messagingContext, this.config, this::createClient));
        }
        return (PushStream<Message>) PushStreamHelper.configurePushStreamBuilder(source, messagingContext).build();
    }

    /**
     * Publishes a payload using a default context with {@link QoS#AT_MOST_ONE}.
     *
     * @param str the topic to publish to
     * @param byteBuffer the payload
     * @throws Exception if the client cannot be created or publishing fails
     */
    public void publish(String str, ByteBuffer byteBuffer) throws Exception {
        publish(str, byteBuffer, new MQTTContextBuilder().withQoS(QoS.AT_MOST_ONE).build());
    }

    /**
     * Publishes a payload, creating the publishing client on first use.
     *
     * <p>QoS and the retained flag are taken from the context when it is an {@link MQTTContext};
     * otherwise {@link QoS#AT_MOST_ONE} and non-retained are used.
     *
     * @param str the topic to publish to
     * @param byteBuffer the payload
     * @param messagingContext optional context carrying QoS and retained flag
     * @throws Exception if the client cannot be created or publishing fails
     */
    public void publish(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        if (this.mqtt == null) {
            try {
                this.mqtt = createClient(this.config, generateClientId());
                this.mqtt.connectionLost(this::startReconnectTimer);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e, () -> "Error connecting to MQTT broker " + this.config.brokerUrl());
                throw e;
            }
        }
        QoS qos = QoS.AT_MOST_ONE;
        boolean retained = false;
        if (messagingContext instanceof MQTTContext) {
            MQTTContext mqttContext = (MQTTContext) messagingContext;
            if (mqttContext.getQoS() != null) {
                qos = mqttContext.getQoS();
            }
            retained = mqttContext.isRetained();
        }
        // NOTE(review): the enum ordinal is passed as the numeric QoS level - this presumably
        // relies on the declaration order of QoS; confirm against the enum.
        this.mqtt.publish(str, toPayload(byteBuffer), qos.ordinal(), retained);
    }

    /**
     * Extracts the bytes to publish from a buffer.
     *
     * <p>Array-backed buffers hand out their backing array unchanged, which is what this class
     * has always sent. Buffers without an accessible array (direct or read-only), for which
     * {@link ByteBuffer#array()} throws, are copied from position to limit without moving the
     * caller's position.
     */
    private static byte[] toPayload(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            // NOTE(review): this ignores position, limit and arrayOffset; kept for compatibility.
            return buffer.array();
        }
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }

    /** Builds a unique client id of the form {@code gecko-<uuid>}. */
    private String generateClientId() {
        return "gecko-" + UUID.randomUUID().toString();
    }

    /**
     * Schedules a reconnect attempt after {@link #RECONNECT_DELAY_MS} milliseconds, replacing any
     * attempt that is still pending. Does nothing once the service has been closed.
     *
     * <p>Registered as the connection-lost callback of the publishing client and called again by
     * the reconnect task itself when creating the replacement client fails.
     *
     * @param th the cause of the connection loss, may be {@code null}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void startReconnectTimer(final Throwable th) {
        if (th != null) {
            logger.log(Level.INFO, th,
                    () -> "Connection to MQTT broker lost: " + th.getMessage() + ". Waiting before reconnecting.");
        }
        synchronized (this.timerLock) {
            if (this.closed) {
                return;
            }
            if (this.reconnectTimer != null) {
                this.reconnectTimer.cancel();
            }
            // Daemon thread: an idle timer left over after a successful reconnect must not keep
            // the JVM alive.
            Timer timer = new Timer("gecko-mqtt-reconnect", true);
            this.reconnectTimer = timer;
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    reconnect(th);
                }
            }, RECONNECT_DELAY_MS);
        }
    }

    /** Cancels the pending reconnect task, if any. */
    private void cancelReconnectTimer() {
        synchronized (this.timerLock) {
            if (this.reconnectTimer != null) {
                this.reconnectTimer.cancel();
                this.reconnectTimer = null;
            }
        }
    }

    /**
     * Replaces a disconnected publishing client with a newly created one; on failure another
     * attempt is scheduled.
     *
     * @param cause the original cause of the connection loss, passed on when rescheduling
     */
    private void reconnect(Throwable cause) {
        if (this.closed) {
            return;
        }
        GeckoMqttClient stale = this.mqtt;
        if (stale == null) {
            logger.log(Level.SEVERE, "Trying to reconnect a null client.");
            return;
        }
        if (stale.isConnected()) {
            return;
        }
        logger.log(Level.INFO, "Create new client and reconnect");
        try {
            stale.close();
        } catch (RuntimeException e) {
            // A failure to release the old client must not kill the timer thread and thereby
            // stop all further reconnect attempts.
            logger.log(Level.WARNING, e, () -> "Error closing stale MQTT client before reconnect.");
        }
        try {
            GeckoMqttClient replacement = createClient(this.config, generateClientId());
            if (this.closed) {
                // The service was closed while the new client was being created.
                replacement.close();
                return;
            }
            this.mqtt = replacement;
            replacement.connectionLost(this::startReconnectTimer);
        } catch (Exception e) {
            logger.log(Level.SEVERE, e, () -> "Error trying to reconnect to MQTT broker.");
            startReconnectTimer(cause);
        }
    }

    /** Returns the publishing client followed by the subscription map, separated by a space. */
    @Override
    public String toString() {
        return this.mqtt + " " + this.subscriptions.toString();
    }
}
