package org.openremote.manager.mqtt;

import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.camel.builder.RouteBuilder;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.provisioning.ProvisioningService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.security.ManagerKeycloakIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.provisioning.ErrorResponseMessage;
import org.openremote.model.provisioning.ProvisioningConfig;
import org.openremote.model.provisioning.ProvisioningMessage;
import org.openremote.model.provisioning.ProvisioningUtil;
import org.openremote.model.provisioning.SuccessResponseMessage;
import org.openremote.model.provisioning.X509ProvisioningConfig;
import org.openremote.model.provisioning.X509ProvisioningMessage;
import org.openremote.model.security.User;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.TextUtil;
import org.openremote.model.util.UniqueIdentifierGenerator;
import org.openremote.model.util.ValueUtil;

/* loaded from: input_file:org/openremote/manager/mqtt/UserAssetProvisioningMQTTHandler.class */
public class UserAssetProvisioningMQTTHandler extends MQTTHandler {
    protected static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.API, UserAssetProvisioningMQTTHandler.class);
    public static final String PROVISIONING_TOKEN = "provisioning";
    public static final String REQUEST_TOKEN = "request";
    public static final String RESPONSE_TOKEN = "response";
    public static final String UNIQUE_ID_PLACEHOLDER = "%UNIQUE_ID%";
    public static final String PROVISIONING_USER_PREFIX = "ps-";
    protected ProvisioningService provisioningService;
    protected TimerService timerService;
    protected AssetStorageService assetStorageService;
    protected ManagerKeycloakIdentityProvider identityProvider;
    protected boolean isKeycloak;
    protected final ConcurrentMap<Long, Set<RemotingConnection>> provisioningConfigAuthenticatedConnectionMap = new ConcurrentHashMap();
    protected Timer provisioningTimer;

    /* loaded from: input_file:org/openremote/manager/mqtt/UserAssetProvisioningMQTTHandler$ProvisioningPersistenceRouteBuilder.class */
    protected static class ProvisioningPersistenceRouteBuilder extends RouteBuilder {
        UserAssetProvisioningMQTTHandler mqttHandler;

        public ProvisioningPersistenceRouteBuilder(UserAssetProvisioningMQTTHandler userAssetProvisioningMQTTHandler) {
            this.mqttHandler = userAssetProvisioningMQTTHandler;
        }

        public void configure() throws Exception {
            from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000").routeId("Persistence-ProvisioningConfig").filter(PersistenceService.isPersistenceEventForEntityType(ProvisioningConfig.class)).process(exchange -> {
                PersistenceEvent persistenceEvent = (PersistenceEvent) exchange.getIn().getBody(PersistenceEvent.class);
                boolean z = persistenceEvent.getCause() == PersistenceEvent.Cause.DELETE;
                if (persistenceEvent.getCause() == PersistenceEvent.Cause.UPDATE) {
                    z = persistenceEvent.hasPropertyChanged("disabled") || persistenceEvent.hasPropertyChanged("data");
                }
                if (z) {
                    UserAssetProvisioningMQTTHandler.LOG.fine("Provisioning config modified or deleted so forcing connected clients to disconnect: " + persistenceEvent.getEntity());
                    this.mqttHandler.forceClientDisconnects(((ProvisioningConfig) persistenceEvent.getEntity()).getId().longValue());
                }
            });
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void init(Container container, Configuration configuration) throws Exception {
        super.init(container, configuration);
        if (container.getMeterRegistry() != null) {
            this.provisioningTimer = container.getMeterRegistry().timer("or.provisioning", Tags.empty());
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void start(Container container) throws Exception {
        super.start(container);
        this.provisioningService = (ProvisioningService) container.getService(ProvisioningService.class);
        this.timerService = container.getService(TimerService.class);
        this.assetStorageService = (AssetStorageService) container.getService(AssetStorageService.class);
        ManagerIdentityService service = container.getService(ManagerIdentityService.class);
        if (!service.isKeycloakEnabled()) {
            LOG.warning("MQTT connections are not supported when not using Keycloak identity provider");
            this.isKeycloak = false;
        } else {
            this.isKeycloak = true;
            this.identityProvider = (ManagerKeycloakIdentityProvider) service.getIdentityProvider();
            container.getService(MessageBrokerService.class).getContext().addRoutes(new ProvisioningPersistenceRouteBuilder(this));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.openremote.manager.mqtt.MQTTHandler
    public AddressSettings getPublishTopicAddressSettings(Container container, String str) {
        AddressSettings publishTopicAddressSettings = super.getPublishTopicAddressSettings(container, str);
        if (publishTopicAddressSettings != null) {
            publishTopicAddressSettings.setMaxSizeMessages(1000L);
        }
        return publishTopicAddressSettings;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean handlesTopic(Topic topic) {
        return topicMatches(topic);
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean checkCanSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (canSubscribe(remotingConnection, keycloakSecurityContext, topic)) {
            return true;
        }
        getLogger().fine("Cannot subscribe to this topic, topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean checkCanPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (canPublish(remotingConnection, keycloakSecurityContext, topic)) {
            return true;
        }
        getLogger().fine("Cannot publish to this topic, topic=" + topic + ", " + MQTTBrokerService.connectionToString(remotingConnection));
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean topicMatches(Topic topic) {
        return isProvisioningTopic(topic) && topic.getTokens().size() == 3 && (isRequestTopic(topic) || isResponseTopic(topic));
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    protected Logger getLogger() {
        return LOG;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (this.isKeycloak) {
            return (!isResponseTopic(topic) || "#".equals(topicTokenIndexToString(topic, 1)) || "+".equals(topicTokenIndexToString(topic, 1))) ? false : true;
        }
        LOG.fine("Identity provider is not keycloak");
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onSubscribe(RemotingConnection remotingConnection, Topic topic) {
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onUnsubscribe(RemotingConnection remotingConnection, Topic topic) {
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public Set<String> getPublishListenerTopics() {
        return Set.of("provisioning/+/request");
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (this.isKeycloak) {
            return (!isRequestTopic(topic) || "#".equals(topicTokenIndexToString(topic, 1)) || "+".equals(topicTokenIndexToString(topic, 1))) ? false : true;
        }
        LOG.fine("Identity provider is not keycloak");
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onPublish(RemotingConnection remotingConnection, Topic topic, ByteBuf byteBuf) {
        if (remotingConnection.getTransportConnection().isOpen()) {
            this.executorService.submit(() -> {
                if (this.provisioningTimer != null) {
                    this.provisioningTimer.record(() -> {
                        processProvisioningRequest(remotingConnection, topic, byteBuf);
                    });
                } else {
                    processProvisioningRequest(remotingConnection, topic, byteBuf);
                }
            });
        } else {
            LOG.finest(() -> {
                return "Skipping provisioning request as connection is now closed: " + MQTTBrokerService.connectionToString(remotingConnection);
            });
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnectionLost(RemotingConnection remotingConnection) {
        this.provisioningConfigAuthenticatedConnectionMap.values().forEach(set -> {
            set.remove(remotingConnection);
        });
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onDisconnect(RemotingConnection remotingConnection) {
        this.provisioningConfigAuthenticatedConnectionMap.values().forEach(set -> {
            set.remove(remotingConnection);
        });
    }

    protected void processProvisioningRequest(RemotingConnection remotingConnection, Topic topic, ByteBuf byteBuf) {
        ProvisioningMessage provisioningMessage = (ProvisioningMessage) ValueUtil.parse(byteBuf.toString(StandardCharsets.UTF_8), ProvisioningMessage.class).orElseGet(() -> {
            LOG.info("Failed to parse provisioning request message from client: " + MQTTBrokerService.connectionToString(remotingConnection));
            publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.MESSAGE_INVALID), MqttQoS.AT_MOST_ONCE);
            return null;
        });
        if (provisioningMessage != null && (provisioningMessage instanceof X509ProvisioningMessage)) {
            processX509ProvisioningMessage(remotingConnection, topic, (X509ProvisioningMessage) provisioningMessage);
        }
    }

    protected static boolean isProvisioningTopic(Topic topic) {
        return PROVISIONING_TOKEN.equals(topicTokenIndexToString(topic, 0));
    }

    protected static boolean isRequestTopic(Topic topic) {
        return REQUEST_TOKEN.equals(topicTokenIndexToString(topic, 2));
    }

    protected static boolean isResponseTopic(Topic topic) {
        return RESPONSE_TOKEN.equals(topicTokenIndexToString(topic, 2));
    }

    protected String getResponseTopic(Topic topic) {
        return "provisioning/" + topicTokenIndexToString(topic, 1) + "/response";
    }

    protected void processX509ProvisioningMessage(RemotingConnection remotingConnection, Topic topic, X509ProvisioningMessage x509ProvisioningMessage) {
        LOG.fine(() -> {
            return "Processing X.509 provisioning message: " + MQTTBrokerService.connectionToString(remotingConnection);
        });
        if (TextUtil.isNullOrEmpty(x509ProvisioningMessage.getCert())) {
            LOG.info("Certificate is missing from X509 provisioning message" + MQTTBrokerService.connectionToString(remotingConnection));
            publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CERTIFICATE_INVALID), MqttQoS.AT_MOST_ONCE);
            return;
        }
        try {
            X509Certificate x509Certificate = ProvisioningUtil.getX509Certificate(x509ProvisioningMessage.getCert());
            X509ProvisioningConfig matchingX509ProvisioningConfig = getMatchingX509ProvisioningConfig(remotingConnection, x509Certificate);
            if (matchingX509ProvisioningConfig == null) {
                LOG.fine("No matching provisioning config found for X.509 certificate: " + MQTTBrokerService.connectionToString(remotingConnection));
                publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNAUTHORIZED), MqttQoS.AT_MOST_ONCE);
                return;
            }
            if (matchingX509ProvisioningConfig.isDisabled()) {
                LOG.fine("Matching provisioning config is disabled for X.509 certificate: " + MQTTBrokerService.connectionToString(remotingConnection));
                publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CONFIG_DISABLED), MqttQoS.AT_MOST_ONCE);
                return;
            }
            String subjectCN = ProvisioningUtil.getSubjectCN(x509Certificate.getSubjectX500Principal());
            String str = topicTokenIndexToString(topic, 1);
            if (TextUtil.isNullOrEmpty(subjectCN)) {
                LOG.info(() -> {
                    return "X.509 certificate missing unique ID in subject CN: " + MQTTBrokerService.connectionToString(remotingConnection);
                });
                publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNIQUE_ID_MISMATCH), MqttQoS.AT_MOST_ONCE);
                return;
            }
            if (TextUtil.isNullOrEmpty(str) || !subjectCN.equals(str)) {
                LOG.info(() -> {
                    return "X.509 certificate unique ID doesn't match topic unique ID: " + MQTTBrokerService.connectionToString(remotingConnection);
                });
                publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNIQUE_ID_MISMATCH), MqttQoS.AT_MOST_ONCE);
                return;
            }
            String realm = matchingX509ProvisioningConfig.getRealm();
            try {
                LOG.finest("Checking service user: " + str);
                User createClientServiceUser = getCreateClientServiceUser(realm, this.identityProvider, str, matchingX509ProvisioningConfig);
                if (!createClientServiceUser.getEnabled().booleanValue()) {
                    LOG.info(() -> {
                        return "Service user exists and has been disabled so cannot continue:  " + MQTTBrokerService.connectionToString(remotingConnection);
                    });
                    publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.USER_DISABLED), MqttQoS.AT_MOST_ONCE);
                    return;
                }
                LOG.finest("Service user exists and is enabled");
                try {
                    LOG.finest(() -> {
                        return "Checking provisioned asset: " + str;
                    });
                    Asset<?> createClientAsset = getCreateClientAsset(this.assetStorageService, realm, str, createClientServiceUser, matchingX509ProvisioningConfig);
                    if (createClientAsset != null && !matchingX509ProvisioningConfig.getRealm().equals(createClientAsset.getRealm())) {
                        LOG.warning("Client asset realm mismatch");
                        publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.ASSET_ERROR), MqttQoS.AT_MOST_ONCE);
                    } else {
                        this.mqttBrokerService.authenticateConnection(remotingConnection, realm, createClientServiceUser.getUsername(), createClientServiceUser.getSecret());
                        this.provisioningConfigAuthenticatedConnectionMap.compute(matchingX509ProvisioningConfig.getId(), (l, set) -> {
                            if (set == null) {
                                set = new HashSet();
                            }
                            set.add(remotingConnection);
                            return set;
                        });
                        LOG.fine("Client successfully provisioned: " + str);
                        publishMessage(getResponseTopic(topic), new SuccessResponseMessage(realm, createClientAsset), MqttQoS.AT_MOST_ONCE);
                    }
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Failed to retrieve/create asset: " + MQTTBrokerService.connectionToString(remotingConnection) + ", config=" + matchingX509ProvisioningConfig, (Throwable) e);
                    publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.SERVER_ERROR), MqttQoS.AT_MOST_ONCE);
                }
            } catch (Exception e2) {
                LOG.log(Level.WARNING, "Failed to retrieve/create service user: " + MQTTBrokerService.connectionToString(remotingConnection), (Throwable) e2);
                publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.SERVER_ERROR), MqttQoS.AT_MOST_ONCE);
            }
        } catch (CertificateException e3) {
            LOG.log(Level.INFO, "Failed to parse X.509 certificate: " + MQTTBrokerService.connectionToString(remotingConnection), (Throwable) e3);
            publishMessage(getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CERTIFICATE_INVALID), MqttQoS.AT_MOST_ONCE);
        }
    }

    protected X509ProvisioningConfig getMatchingX509ProvisioningConfig(RemotingConnection remotingConnection, X509Certificate x509Certificate) {
        return (X509ProvisioningConfig) this.provisioningService.getProvisioningConfigs().stream().filter(provisioningConfig -> {
            return provisioningConfig instanceof X509ProvisioningConfig;
        }).map(provisioningConfig2 -> {
            return (X509ProvisioningConfig) provisioningConfig2;
        }).filter(x509ProvisioningConfig -> {
            try {
                X509Certificate certificate = x509ProvisioningConfig.getCertificate();
                if (certificate != null && certificate.getSubjectX500Principal().getName().equals(x509Certificate.getIssuerX500Principal().getName())) {
                    LOG.finest(() -> {
                        return "Client certificate issuer matches provisioning config CA certificate subject: " + MQTTBrokerService.connectionToString(remotingConnection) + ", config=" + x509ProvisioningConfig;
                    });
                    Date from = Date.from(this.timerService.getNow());
                    try {
                        x509Certificate.verify(certificate.getPublicKey());
                        LOG.finest(() -> {
                            return "Client certificate verified against CA certificate: " + MQTTBrokerService.connectionToString(remotingConnection) + ", config=" + x509ProvisioningConfig;
                        });
                        if (x509ProvisioningConfig.getData().isIgnoreExpiryDate()) {
                            return true;
                        }
                        LOG.finest(() -> {
                            return "Validating client certificate validity: " + MQTTBrokerService.connectionToString(remotingConnection) + ", timestamp=" + from;
                        });
                        x509Certificate.checkValidity(from);
                        return true;
                    } catch (CertificateExpiredException | CertificateNotYetValidException e) {
                        LOG.log(Level.INFO, "Client certificate failed validity check: " + MQTTBrokerService.connectionToString(remotingConnection) + ", timestamp=" + from, e);
                    } catch (Exception e2) {
                        LOG.log(Level.INFO, "Client certificate failed verification against CA certificate: " + MQTTBrokerService.connectionToString(remotingConnection) + ", config=" + x509ProvisioningConfig, (Throwable) e2);
                    }
                }
                return false;
            } catch (Exception e3) {
                LOG.log(Level.WARNING, "Failed to extract certificate from provisioning config: " + MQTTBrokerService.connectionToString(remotingConnection) + ", config=" + x509ProvisioningConfig, (Throwable) e3);
                return false;
            }
        }).findFirst().orElse(null);
    }

    public static User getCreateClientServiceUser(String str, ManagerKeycloakIdentityProvider managerKeycloakIdentityProvider, String str2, ProvisioningConfig<?, ?> provisioningConfig) throws RuntimeException {
        String str3 = "ps-" + str2;
        User userByUsername = managerKeycloakIdentityProvider.getUserByUsername(str, "service-account-" + str3);
        if (userByUsername != null) {
            LOG.fine("Service user found: realm=" + str + ", username=" + str3);
            return userByUsername;
        }
        LOG.finest("Creating service user: realm=" + str + ", username=" + str3);
        User username = new User().setServiceAccount(true).setEnabled(true).setUsername(str3);
        String generateId = UniqueIdentifierGenerator.generateId();
        User createUpdateUser = managerKeycloakIdentityProvider.createUpdateUser(str, username, generateId, true);
        if (provisioningConfig.getUserRoles() == null || provisioningConfig.getUserRoles().length <= 0) {
            LOG.finest("No user roles defined: realm=" + str + ", username=" + str3);
        } else {
            LOG.finest("Setting user roles: realm=" + str + ", username=" + str3 + ", roles=" + Arrays.toString(provisioningConfig.getUserRoles()));
            managerKeycloakIdentityProvider.updateUserRoles(str, createUpdateUser.getId(), ManagerKeycloakIdentityProvider.DEFAULT_REALM_KEYCLOAK_THEME_DEFAULT, (String[]) Arrays.stream(provisioningConfig.getUserRoles()).map((v0) -> {
                return v0.getValue();
            }).toArray(i -> {
                return new String[i];
            }));
        }
        if (provisioningConfig.isRestrictedUser()) {
            LOG.finest("User will be made restricted: realm=" + str + ", username=" + str3);
            managerKeycloakIdentityProvider.updateUserRealmRoles(str, createUpdateUser.getId(), managerKeycloakIdentityProvider.addRealmRoles(str, createUpdateUser.getId(), "restricted_user"));
        }
        createUpdateUser.setSecret(generateId);
        return createUpdateUser;
    }

    public static Asset<?> getCreateClientAsset(AssetStorageService assetStorageService, String str, String str2, User user, ProvisioningConfig<?, ?> provisioningConfig) throws RuntimeException {
        String generateId = UniqueIdentifierGenerator.generateId(str + str2);
        Asset<?> find = assetStorageService.find(generateId);
        if (find != null) {
            LOG.finest("Asset exists");
            return find;
        }
        LOG.finest("Creating client asset: realm=" + str + ", username=" + user.getUsername());
        if (TextUtil.isNullOrEmpty(provisioningConfig.getAssetTemplate())) {
            LOG.finest("Provisioning config doesn't contain an asset template: " + provisioningConfig);
            return null;
        }
        Asset asset = (Asset) ValueUtil.parse(provisioningConfig.getAssetTemplate().replaceAll(UNIQUE_ID_PLACEHOLDER, str2), Asset.class).orElseThrow(() -> {
            return new RuntimeException("Failed to de-serialise asset template into an asset instance: " + provisioningConfig);
        });
        asset.setId(generateId);
        asset.setRealm(str);
        Asset<?> merge = assetStorageService.merge(asset);
        if (provisioningConfig.isRestrictedUser()) {
            assetStorageService.storeUserAssetLinks(Collections.singletonList(new UserAssetLink(str, user.getId(), generateId)));
        }
        return merge;
    }

    protected void forceClientDisconnects(long j) {
        this.provisioningConfigAuthenticatedConnectionMap.computeIfPresent(Long.valueOf(j), (l, set) -> {
            set.forEach(remotingConnection -> {
                try {
                    Logger logger = LOG;
                    MQTTBrokerService.connectionToString(remotingConnection);
                    logger.fine("Force disconnecting client that is using provisioning config ID '" + j + "': " + logger);
                    remotingConnection.disconnect(false);
                } catch (Exception e) {
                    getLogger().log(Level.WARNING, "Failed to disconnect client: " + MQTTBrokerService.connectionToString(remotingConnection), (Throwable) e);
                }
            });
            set.clear();
            return set;
        });
    }
}
