package org.openremote.manager.mqtt;

import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.camel.builder.RouteBuilder;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.security.keycloak.KeycloakIdentityProvider;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.security.ManagerIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeMap;
import org.openremote.model.attribute.AttributeRef;
import org.openremote.model.query.AssetQuery;
import org.openremote.model.query.filter.AttributePredicate;
import org.openremote.model.query.filter.NameValuePredicate;
import org.openremote.model.query.filter.ValuePredicate;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.Pair;
import org.openremote.model.value.MetaItemType;
import org.openremote.model.value.ValueType;

/* loaded from: input_file:org/openremote/manager/mqtt/ConnectionMonitorHandler.class */
public class ConnectionMonitorHandler extends MQTTHandler {
    protected static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.API, ConnectionMonitorHandler.class);
    protected MQTTBrokerService mqttBrokerService;
    protected ExecutorService executorService;
    protected AssetStorageService assetStorageService;
    protected AssetProcessingService assetProcessingService;
    protected GatewayService gatewayService;
    protected PersistenceService persistenceService;
    protected ConcurrentMap<String, Set<AttributeRef>> userIDAttributeRefs = new ConcurrentHashMap();

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void init(Container container, Configuration configuration) throws Exception {
        super.init(container, configuration);
        this.executorService = container.getExecutor();
        this.mqttBrokerService = (MQTTBrokerService) container.getService(MQTTBrokerService.class);
        this.assetStorageService = (AssetStorageService) container.getService(AssetStorageService.class);
        this.assetProcessingService = (AssetProcessingService) container.getService(AssetProcessingService.class);
        this.gatewayService = (GatewayService) container.getService(GatewayService.class);
        this.persistenceService = container.getService(PersistenceService.class);
        container.getService(MessageBrokerService.class).getContext().addRoutes(new RouteBuilder() { // from class: org.openremote.manager.mqtt.ConnectionMonitorHandler.1
            public void configure() throws Exception {
                from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000").routeId("Persistence-MQTTConnectedAttributes").filter(PersistenceService.isPersistenceEventForEntityType(Asset.class)).filter(GatewayService.isNotForGateway(ConnectionMonitorHandler.this.gatewayService)).process(exchange -> {
                    PersistenceEvent persistenceEvent = (PersistenceEvent) exchange.getIn().getBody(PersistenceEvent.class);
                    if (persistenceEvent.hasPropertyChanged("attributes")) {
                        Asset asset = (Asset) persistenceEvent.getEntity();
                        AttributeMap attributeMap = (AttributeMap) persistenceEvent.getPreviousState("attributes");
                        AttributeMap attributeMap2 = (AttributeMap) persistenceEvent.getCurrentState("attributes");
                        if (attributeMap != null) {
                            attributeMap.stream().filter(ConnectionMonitorHandler::attributeMatches).forEach(attribute -> {
                                attribute.getMetaItem(MetaItemType.USER_CONNECTED).flatMap((v0) -> {
                                    return v0.getValue();
                                }).ifPresent(str -> {
                                    ConnectionMonitorHandler.this.removeSessionAttribute(str, new AttributeRef(asset.getId(), attribute.getName()));
                                });
                            });
                        }
                        if (attributeMap2 != null) {
                            ConnectionMonitorHandler.this.addSessionAttributes(asset.getRealm(), attributeMap2.stream().filter(ConnectionMonitorHandler::attributeMatches).map(attribute2 -> {
                                return new Pair(asset.getId(), attribute2);
                            }).toList());
                        }
                    }
                }).end();
            }
        });
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void start(Container container) throws Exception {
        this.executorService.submit(() -> {
            ((Map) this.assetStorageService.findAll(new AssetQuery().attributes(new AttributePredicate[]{new AttributePredicate().meta(new NameValuePredicate[]{new NameValuePredicate(MetaItemType.USER_CONNECTED, (ValuePredicate) null)})})).stream().collect(Collectors.groupingBy((v0) -> {
                return v0.getRealm();
            }))).forEach((str, list) -> {
                addSessionAttributes(str, list.stream().flatMap(asset -> {
                    return asset.getAttributes().stream().filter(ConnectionMonitorHandler::attributeMatches).map(attribute -> {
                        return new Pair(asset.getId(), attribute);
                    });
                }).toList());
            });
        });
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void stop() throws Exception {
        super.stop();
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnect(RemotingConnection remotingConnection) {
        super.onConnect(remotingConnection);
        Pair<String, Set<AttributeRef>> userIDAndAttributeRefs = getUserIDAndAttributeRefs(remotingConnection);
        if (userIDAndAttributeRefs != null) {
            updateUserConnectedStatus((String) userIDAndAttributeRefs.key, (Collection) userIDAndAttributeRefs.value, true);
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onDisconnect(RemotingConnection remotingConnection) {
        super.onDisconnect(remotingConnection);
        Pair<String, Set<AttributeRef>> userIDAndAttributeRefs = getUserIDAndAttributeRefs(remotingConnection);
        if (userIDAndAttributeRefs != null) {
            updateUserConnectedStatus((String) userIDAndAttributeRefs.key, (Collection) userIDAndAttributeRefs.value, false);
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnectionLost(RemotingConnection remotingConnection) {
        super.onConnectionLost(remotingConnection);
        Pair<String, Set<AttributeRef>> userIDAndAttributeRefs = getUserIDAndAttributeRefs(remotingConnection);
        if (userIDAndAttributeRefs != null) {
            updateUserConnectedStatus((String) userIDAndAttributeRefs.key, (Collection) userIDAndAttributeRefs.value, false);
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnectionAuthenticated(RemotingConnection remotingConnection) {
        super.onConnectionAuthenticated(remotingConnection);
        Pair<String, Set<AttributeRef>> userIDAndAttributeRefs = getUserIDAndAttributeRefs(remotingConnection);
        if (userIDAndAttributeRefs != null) {
            updateUserConnectedStatus((String) userIDAndAttributeRefs.key, (Collection) userIDAndAttributeRefs.value, true);
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    protected boolean topicMatches(Topic topic) {
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    protected Logger getLogger() {
        return LOG;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public Set<String> getPublishListenerTopics() {
        return null;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onPublish(RemotingConnection remotingConnection, Topic topic, ByteBuf byteBuf) {
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onSubscribe(RemotingConnection remotingConnection, Topic topic) {
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onUnsubscribe(RemotingConnection remotingConnection, Topic topic) {
    }

    protected void addSessionAttributes(String str, List<Pair<String, Attribute<?>>> list) {
        LOG.finest("Adding '" + list.size() + "' attributes(s) with user linked attributes in realm: " + str);
        List list2 = list.stream().map(pair -> {
            return (String) ((Attribute) pair.getValue()).getMetaValue(MetaItemType.USER_CONNECTED).orElse(null);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).distinct().map(str2 -> {
            return str2.startsWith("service-account-") ? str2 : "service-account-" + str2;
        }).toList();
        List<String> userIds = ManagerIdentityProvider.getUserIds(this.persistenceService, str, list2);
        list.forEach(pair2 -> {
            ((Attribute) pair2.getValue()).getMetaValue(MetaItemType.USER_CONNECTED).ifPresent(str3 -> {
                String str3 = (String) userIds.get(list2.indexOf("service-account-" + str3));
                if (str3 == null) {
                    LOG.warning("Invalid username so skipping add session attributes: realm=" + str + ", username=" + str3);
                } else {
                    addSessionAttribute(str3, new AttributeRef((String) pair2.key, ((Attribute) pair2.getValue()).getName()));
                }
            });
        });
    }

    protected void addSessionAttribute(String str, AttributeRef attributeRef) {
        LOG.finest("Adding userID '" + str + "' monitoring for attribute: " + attributeRef);
        updateUserConnectedStatus(str, Collections.singletonList(attributeRef), !this.mqttBrokerService.getUserConnections(str).isEmpty());
        this.userIDAttributeRefs.computeIfAbsent(str, str2 -> {
            return ConcurrentHashMap.newKeySet();
        }).add(attributeRef);
    }

    protected void removeSessionAttribute(String str, AttributeRef attributeRef) {
        LOG.finest("Removing userID '" + str + "' monitoring for attribute: " + attributeRef);
        updateUserConnectedStatus(str, Collections.singletonList(attributeRef), false);
        this.userIDAttributeRefs.computeIfPresent(str, (str2, set) -> {
            set.remove(attributeRef);
            if (set.isEmpty()) {
                return null;
            }
            return set;
        });
    }

    protected void updateUserConnectedStatus(String str, Collection<AttributeRef> collection, boolean z) {
        Set<RemotingConnection> userConnections = this.mqttBrokerService.getUserConnections(str);
        if (z) {
            if (userConnections.size() > 1) {
                LOG.finest("Connections already exist for user so skipping status update: " + str);
                return;
            }
        } else if (!userConnections.isEmpty()) {
            LOG.finest("Other connections remain for user so skipping status update: " + str);
            return;
        }
        LOG.fine("Updating connected status for '" + str + "' on " + collection.size() + " attribute(s) connected=" + z);
        collection.forEach(attributeRef -> {
            this.assetProcessingService.sendAttributeEvent(new AttributeEvent(attributeRef, Boolean.valueOf(z)), getClass().getSimpleName());
        });
    }

    protected Pair<String, Set<AttributeRef>> getUserIDAndAttributeRefs(RemotingConnection remotingConnection) {
        String subjectId = KeycloakIdentityProvider.getSubjectId(remotingConnection.getSubject());
        if (subjectId != null) {
            Set<AttributeRef> set = this.userIDAttributeRefs.get(subjectId);
            if (set == null) {
                return null;
            }
            return new Pair<>(subjectId, set);
        }
        if (!LOG.isLoggable(Level.FINEST)) {
            return null;
        }
        Logger logger = LOG;
        MQTTBrokerService mQTTBrokerService = this.mqttBrokerService;
        logger.finest("Anonymous connection so cannot determine userID: " + MQTTBrokerService.connectionToString(remotingConnection));
        return null;
    }

    protected static boolean attributeMatches(Attribute<?> attribute) {
        return Objects.equals(attribute.getType(), ValueType.BOOLEAN) && attribute.hasMeta(MetaItemType.USER_CONNECTED);
    }
}
