/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.activemq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.activemq.LocalMessageQueue;
import org.elasticsoftware.elasticactors.activemq.RemoteMessageQueue;
import org.elasticsoftware.elasticactors.messaging.MessageHandler;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.messaging.MessageQueueFactory;
import org.elasticsoftware.elasticactors.messaging.MessageQueueFactoryFactory;
import org.elasticsoftware.elasticactors.messaging.MessagingService;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

public final class ActiveMQArtemisMessagingService
implements MessagingService {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQArtemisMessagingService.class);
    private static final String QUEUE_NAME_FORMAT = "%s/%s";
    private static final String EA_ADDRESS_FORMAT = "ea.%s";
    private static final int SERVER_DEFAULT_PORT = 61616;
    private final String activeMQHosts;
    private final String activeMQUsername;
    private final String activeMQPassword;
    private final String elasticActorsCluster;
    private final LocalMessageQueueFactory localMessageQueueFactory;
    private final RemoteMessageQueueFactory remoteMessageQueueFactory;
    private final RemoteActorSystemMessageQueueFactoryFactory remoteActorSystemMessageQueueFactoryFactory;
    private final ThreadBoundExecutor queueExecutor;
    private final InternalMessageDeserializer internalMessageDeserializer;
    private ServerLocator serverLocator;
    private ClientSessionFactory clientSessionFactory;
    private final boolean useMessageHandler;
    private final boolean useImmediateReceive;

    public ActiveMQArtemisMessagingService(String activeMQHosts, String activeMQUsername, String activeMQPassword, String elasticActorsCluster, ThreadBoundExecutor queueExecutor, InternalMessageDeserializer internalMessageDeserializer, boolean useMessageHandler, boolean useImmediateReceive) {
        this.activeMQHosts = activeMQHosts;
        this.activeMQUsername = activeMQUsername;
        this.activeMQPassword = activeMQPassword;
        this.elasticActorsCluster = elasticActorsCluster;
        this.queueExecutor = queueExecutor;
        this.internalMessageDeserializer = internalMessageDeserializer;
        this.localMessageQueueFactory = new LocalMessageQueueFactory();
        this.remoteMessageQueueFactory = new RemoteMessageQueueFactory();
        this.remoteActorSystemMessageQueueFactoryFactory = new RemoteActorSystemMessageQueueFactoryFactory();
        this.useMessageHandler = useMessageHandler;
        this.useImmediateReceive = useImmediateReceive;
    }

    @PostConstruct
    public void start() throws Exception {
        logger.info("Starting up messaging service");
        Set hosts = StringUtils.commaDelimitedListToSet((String)this.activeMQHosts);
        TransportConfiguration[] transportConfigurations = new TransportConfiguration[hosts.size()];
        int i = 0;
        for (String hostAndPort : hosts) {
            int port = 61616;
            String host = null;
            int idx = hostAndPort.lastIndexOf(":");
            if (idx > -1) {
                port = Integer.parseInt(hostAndPort.substring(idx + 1));
                host = hostAndPort.substring(0, idx);
            } else {
                host = hostAndPort;
            }
            transportConfigurations[i++] = this.createConnector(host, port);
        }
        this.serverLocator = ActiveMQClient.createServerLocatorWithHA((TransportConfiguration[])transportConfigurations);
        this.serverLocator.setBlockOnDurableSend(true);
        this.serverLocator.setBlockOnNonDurableSend(false);
        this.serverLocator.setUseGlobalPools(false);
        this.serverLocator.setClientFailureCheckPeriod(4000L);
        this.serverLocator.setFailoverOnInitialConnection(true);
        this.serverLocator.setScheduledThreadPoolMaxSize(1);
        this.serverLocator.setThreadPoolMaxSize(3);
        this.serverLocator.setInitialConnectAttempts(1);
        this.serverLocator.setMaxRetryInterval(32000L);
        this.serverLocator.setRetryInterval(1000L);
        this.serverLocator.setRetryIntervalMultiplier(2.0);
        this.serverLocator.setReconnectAttempts(-1);
        this.serverLocator.setConnectionTTL(-1L);
        this.serverLocator.setProducerMaxRate(-1);
        this.serverLocator.setProducerWindowSize(-1);
        this.serverLocator.setConsumerMaxRate(-1);
        this.serverLocator.setConsumerWindowSize(-1);
        this.clientSessionFactory = this.serverLocator.createSessionFactory();
    }

    private TransportConfiguration createConnector(String host, int port) {
        HashMap<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put("host", host);
        connectionParams.put("nioRemotingThreads", 2);
        connectionParams.put("port", port);
        connectionParams.put("tcpNoDelay", true);
        connectionParams.put("tcpSendBufferSize", 40000);
        connectionParams.put("tcpReceiveBufferSize", 40000);
        connectionParams.put("useNio", false);
        return new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
    }

    @PreDestroy
    public void stop() {
        logger.info("Stopping messaging service");
        this.clientSessionFactory.close();
        this.serverLocator.close();
    }

    public void sendWireMessage(String queueName, byte[] serializedMessage, PhysicalNode receiver) throws IOException {
    }

    private void ensureQueueExists(ClientSession clientSession, String queueName, String routingKey) throws ActiveMQException {
        ClientSession.QueueQuery queueQuery = clientSession.queueQuery(SimpleString.toSimpleString((String)queueName));
        if (!queueQuery.isExists()) {
            clientSession.createQueue(String.format(EA_ADDRESS_FORMAT, this.elasticActorsCluster), queueName, String.format("routingKey='%s'", routingKey), true);
        }
    }

    public MessageQueueFactory getLocalMessageQueueFactory() {
        return this.localMessageQueueFactory;
    }

    public MessageQueueFactory getRemoteMessageQueueFactory() {
        return this.remoteMessageQueueFactory;
    }

    public MessageQueueFactoryFactory getRemoteActorSystemMessageQueueFactoryFactory() {
        return this.remoteActorSystemMessageQueueFactoryFactory;
    }

    private final class RemoteActorSystemMessageQueueFactoryFactory
    implements MessageQueueFactoryFactory {
        private RemoteActorSystemMessageQueueFactoryFactory() {
        }

        public MessageQueueFactory create(String clusterName) {
            return new RemoteActorSystemMessageQueueFactory(clusterName);
        }
    }

    private final class RemoteActorSystemMessageQueueFactory
    implements MessageQueueFactory {
        private final String clusterName;

        private RemoteActorSystemMessageQueueFactory(String clusterName) {
            this.clusterName = clusterName;
        }

        public MessageQueue create(String name, MessageHandler messageHandler) throws Exception {
            String queueName = String.format(ActiveMQArtemisMessagingService.QUEUE_NAME_FORMAT, this.clusterName, name);
            ClientSession clientSession = ActiveMQArtemisMessagingService.this.clientSessionFactory.createSession(ActiveMQArtemisMessagingService.this.activeMQUsername, ActiveMQArtemisMessagingService.this.activeMQPassword, false, true, true, false, ActiveMQArtemisMessagingService.this.serverLocator.getAckBatchSize());
            ActiveMQArtemisMessagingService.this.ensureQueueExists(clientSession, queueName, name);
            RemoteMessageQueue messageQueue = new RemoteMessageQueue(queueName, name, clientSession, clientSession.createProducer(String.format(ActiveMQArtemisMessagingService.EA_ADDRESS_FORMAT, this.clusterName)));
            messageQueue.initialize();
            return messageQueue;
        }
    }

    private final class RemoteMessageQueueFactory
    implements MessageQueueFactory {
        private RemoteMessageQueueFactory() {
        }

        public MessageQueue create(String name, MessageHandler messageHandler) throws Exception {
            String queueName = String.format(ActiveMQArtemisMessagingService.QUEUE_NAME_FORMAT, ActiveMQArtemisMessagingService.this.elasticActorsCluster, name);
            ClientSession clientSession = ActiveMQArtemisMessagingService.this.clientSessionFactory.createSession(ActiveMQArtemisMessagingService.this.activeMQUsername, ActiveMQArtemisMessagingService.this.activeMQPassword, false, true, true, false, ActiveMQArtemisMessagingService.this.serverLocator.getAckBatchSize());
            ActiveMQArtemisMessagingService.this.ensureQueueExists(clientSession, queueName, name);
            RemoteMessageQueue messageQueue = new RemoteMessageQueue(queueName, name, clientSession, clientSession.createProducer(String.format(ActiveMQArtemisMessagingService.EA_ADDRESS_FORMAT, ActiveMQArtemisMessagingService.this.elasticActorsCluster)));
            messageQueue.initialize();
            return messageQueue;
        }
    }

    private final class LocalMessageQueueFactory
    implements MessageQueueFactory {
        private LocalMessageQueueFactory() {
        }

        public MessageQueue create(String name, MessageHandler messageHandler) throws Exception {
            String queueName = String.format(ActiveMQArtemisMessagingService.QUEUE_NAME_FORMAT, ActiveMQArtemisMessagingService.this.elasticActorsCluster, name);
            ClientSession clientSession = ActiveMQArtemisMessagingService.this.clientSessionFactory.createSession(ActiveMQArtemisMessagingService.this.activeMQUsername, ActiveMQArtemisMessagingService.this.activeMQPassword, false, true, true, false, ActiveMQArtemisMessagingService.this.serverLocator.getAckBatchSize());
            ActiveMQArtemisMessagingService.this.ensureQueueExists(clientSession, queueName, name);
            LocalMessageQueue messageQueue = new LocalMessageQueue(ActiveMQArtemisMessagingService.this.queueExecutor, ActiveMQArtemisMessagingService.this.internalMessageDeserializer, queueName, name, clientSession, clientSession.createProducer(String.format(ActiveMQArtemisMessagingService.EA_ADDRESS_FORMAT, ActiveMQArtemisMessagingService.this.elasticActorsCluster)), messageHandler, ActiveMQArtemisMessagingService.this.useMessageHandler, ActiveMQArtemisMessagingService.this.useImmediateReceive);
            messageQueue.initialize();
            return messageQueue;
        }
    }
}

