package org.openremote.manager.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.security.AuthContext;
import org.openremote.container.security.keycloak.AccessTokenAuthContext;
import org.openremote.container.security.keycloak.KeycloakIdentityProvider;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.security.ManagerKeycloakIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.util.ValueUtil;

/* loaded from: input_file:org/openremote/manager/mqtt/MQTTHandler.class */
public abstract class MQTTHandler {
    public static final String TOKEN_MULTI_LEVEL_WILDCARD = "#";
    public static final String TOKEN_SINGLE_LEVEL_WILDCARD = "+";
    protected ClientEventService clientEventService;
    protected MQTTBrokerService mqttBrokerService;
    protected MessageBrokerService messageBrokerService;
    protected ManagerKeycloakIdentityProvider identityProvider;
    protected ExecutorService executorService;
    protected TimerService timerService;
    protected boolean isKeycloak;
    protected ClientSession clientSession;
    protected ClientProducer producer;

    public int getPriority() {
        return 0;
    }

    public String getName() {
        return getClass().getSimpleName();
    }

    public void init(Container container, Configuration configuration) throws Exception {
        this.mqttBrokerService = (MQTTBrokerService) container.getService(MQTTBrokerService.class);
        this.clientEventService = (ClientEventService) container.getService(ClientEventService.class);
        this.messageBrokerService = container.getService(MessageBrokerService.class);
        ManagerIdentityService service = container.getService(ManagerIdentityService.class);
        this.executorService = container.getExecutor();
        this.timerService = container.getService(TimerService.class);
        if (service.isKeycloakEnabled()) {
            this.isKeycloak = true;
            this.identityProvider = (ManagerKeycloakIdentityProvider) service.getIdentityProvider();
        } else {
            getLogger().warning("MQTT connections are not supported when not using Keycloak identity provider");
            this.isKeycloak = false;
        }
        addPublishTopicServerConfiguration(container, configuration);
    }

    public void start(Container container) throws Exception {
        this.clientSession = this.mqttBrokerService.createSession();
        this.producer = this.clientSession.createProducer();
        Set<String> publishListenerTopics = getPublishListenerTopics();
        if (publishListenerTopics != null) {
            Iterator<String> it = publishListenerTopics.iterator();
            while (it.hasNext()) {
                addPublishConsumer(it.next());
            }
        }
    }

    public void stop() throws Exception {
        if (this.clientSession != null) {
            this.clientSession.close();
            this.clientSession = null;
        }
    }

    protected void addPublishTopicServerConfiguration(Container container, Configuration configuration) {
        Set<String> publishListenerTopics = getPublishListenerTopics();
        if (publishListenerTopics != null) {
            publishListenerTopics.forEach(str -> {
                String coreAddressFromMqttTopic = MQTTUtil.getCoreAddressFromMqttTopic(str, this.mqttBrokerService.wildcardConfiguration);
                AddressSettings publishTopicAddressSettings = getPublishTopicAddressSettings(container, str);
                if (publishTopicAddressSettings != null) {
                    configuration.addAddressSetting("(" + coreAddressFromMqttTopic + ")", publishTopicAddressSettings);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AddressSettings getPublishTopicAddressSettings(Container container, String str) {
        if (container.getMeterRegistry() != null) {
            return new AddressSettings().setEnableMetrics(true);
        }
        return null;
    }

    protected void addPublishConsumer(String str) throws Exception {
        try {
            getLogger().info("Adding publish consumer for topic '" + str + "': handler=" + getName());
            String coreAddressFromMqttTopic = MQTTUtil.getCoreAddressFromMqttTopic(str, this.mqttBrokerService.wildcardConfiguration);
            this.clientSession.createQueue(QueueConfiguration.of(coreAddressFromMqttTopic).setDurable(false).setRoutingType(RoutingType.MULTICAST).setPurgeOnNoConsumers(true).setAutoCreateAddress(true).setAutoCreated(true));
            this.clientSession.createConsumer(coreAddressFromMqttTopic).setMessageHandler(clientMessage -> {
                Topic parse = Topic.parse(MQTTUtil.getMqttTopicFromCoreAddress(clientMessage.getAddress(), this.mqttBrokerService.wildcardConfiguration));
                String stringProperty = clientMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
                RemotingConnection connectionFromClientID = this.mqttBrokerService.getConnectionFromClientID(stringProperty);
                if (connectionFromClientID == null) {
                    getLogger().finer(() -> {
                        return "Client is no longer connected so dropping publish to topic '" + str + "': clientID=" + stringProperty;
                    });
                    return;
                }
                getLogger().finer(() -> {
                    return "onPublish '" + parse + "': " + MQTTBrokerService.connectionToString(connectionFromClientID);
                });
                try {
                    onPublish(connectionFromClientID, parse, clientMessage.getReadOnlyBodyBuffer().byteBuf());
                } catch (Exception e) {
                    getLogger().info("An error occurred whilst handling onPublish to topic '" + str + "': clientID=" + stringProperty);
                }
            });
        } catch (Exception e) {
            getLogger().log(Level.SEVERE, "Failed to create handler consumer for topic '" + str + "': handler=" + getName(), (Throwable) e);
            throw e;
        }
    }

    public void onConnect(RemotingConnection remotingConnection) {
    }

    public void onDisconnect(RemotingConnection remotingConnection) {
    }

    public void onConnectionLost(RemotingConnection remotingConnection) {
    }

    public void onConnectionAuthenticated(RemotingConnection remotingConnection) {
    }

    public boolean handlesTopic(Topic topic) {
        return topicTokenCountGreaterThan(topic, 2) && topicMatches(topic);
    }

    public boolean checkCanSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (keycloakSecurityContext == null) {
            getLogger().finest("Anonymous connection subscriptions not supported by this handler, topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        if (topicRealmAllowed(keycloakSecurityContext, topic) && topicClientIdMatches(remotingConnection, topic)) {
            return canSubscribe(remotingConnection, keycloakSecurityContext, topic);
        }
        getLogger().finest("Topic realm and client ID tokens must match the connection, topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
        return false;
    }

    public boolean checkCanPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (keycloakSecurityContext == null) {
            getLogger().finest("Anonymous connection publishes not supported by this handler topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        if (topicRealmAllowed(keycloakSecurityContext, topic) && topicClientIdMatches(remotingConnection, topic)) {
            return canPublish(remotingConnection, keycloakSecurityContext, topic);
        }
        getLogger().finest("Topic realm and client ID tokens must match the connection topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
        return false;
    }

    public void onUserAssetLinksChanged(RemotingConnection remotingConnection, List<PersistenceEvent<UserAssetLink>> list) {
    }

    public void publishMessage(String str, Object obj, MqttQoS mqttQoS) {
        try {
            if (this.clientSession != null) {
                synchronized (this.clientSession) {
                    ClientMessage createMessage = this.clientSession.createMessage(false);
                    createMessage.putIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, mqttQoS.value());
                    createMessage.writeBodyBufferBytes((byte[]) ValueUtil.asJSON(obj).map((v0) -> {
                        return v0.getBytes();
                    }).orElseThrow(() -> {
                        return new IllegalStateException("Failed to convert payload to JSON string: " + obj);
                    }));
                    this.producer.send(MQTTUtil.getCoreAddressFromMqttTopic(str, this.mqttBrokerService.getWildcardConfiguration()), createMessage);
                }
            }
        } catch (Exception e) {
            getLogger().log(Level.WARNING, "Couldn't publish to MQTT client: topic=" + str, (Throwable) e);
        }
    }

    protected abstract boolean topicMatches(Topic topic);

    protected abstract Logger getLogger();

    public abstract boolean canSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic);

    public abstract boolean canPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic);

    public abstract void onSubscribe(RemotingConnection remotingConnection, Topic topic);

    public abstract void onUnsubscribe(RemotingConnection remotingConnection, Topic topic);

    public abstract Set<String> getPublishListenerTopics();

    public abstract void onPublish(RemotingConnection remotingConnection, Topic topic, ByteBuf byteBuf);

    public static String topicRealm(Topic topic) {
        return topicTokenIndexToString(topic, 0);
    }

    public static String topicClientID(Topic topic) {
        return topicTokenIndexToString(topic, 1);
    }

    public static boolean topicRealmAllowed(KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        return (keycloakSecurityContext != null && keycloakSecurityContext.getRealm().equals(topicRealm(topic))) || KeycloakIdentityProvider.isSuperUser(keycloakSecurityContext);
    }

    public static boolean topicClientIdMatches(RemotingConnection remotingConnection, Topic topic) {
        return remotingConnection != null && Objects.equals(remotingConnection.getClientID(), topicTokenIndexToString(topic, 1));
    }

    public static boolean topicTokenCountGreaterThan(Topic topic, int i) {
        return topic.getTokens() != null && topic.getTokens().size() > i;
    }

    public static String topicTokenIndexToString(Topic topic, int i) {
        if (topicTokenCountGreaterThan(topic, i)) {
            return topic.getTokens().get(i);
        }
        return null;
    }

    protected static Subject getSubjectFromConnection(RemotingConnection remotingConnection) {
        if (remotingConnection != null) {
            return remotingConnection.getSubject();
        }
        return null;
    }

    protected static KeycloakSecurityContext getSecurityContextFromSubject(Subject subject) {
        return KeycloakIdentityProvider.getSecurityContext(subject);
    }

    protected static Optional<AuthContext> getAuthContextFromConnection(RemotingConnection remotingConnection) {
        return Optional.ofNullable(getSubjectFromConnection(remotingConnection)).map(MQTTHandler::getSecurityContextFromSubject).map(MQTTHandler::getAuthContextFromSecurityContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static AuthContext getAuthContextFromSecurityContext(KeycloakSecurityContext keycloakSecurityContext) {
        if (keycloakSecurityContext == null) {
            return null;
        }
        return new AccessTokenAuthContext(keycloakSecurityContext.getRealm(), keycloakSecurityContext.getToken());
    }
}
