package org.elasticsoftware.elasticactors.rabbitmq.sc;

import com.google.common.base.Throwables;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.MicrometerMetricsCollector;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.RecoveryPolicy;
import net.jodah.lyra.event.ChannelListener;
import net.jodah.lyra.util.Duration;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.cluster.metrics.MicrometerConfiguration;
import org.elasticsoftware.elasticactors.messaging.MessageHandler;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.messaging.MessageQueueFactory;
import org.elasticsoftware.elasticactors.messaging.MessageQueueFactoryFactory;
import org.elasticsoftware.elasticactors.rabbitmq.ChannelListenerRegistry;
import org.elasticsoftware.elasticactors.rabbitmq.LoggingShutdownListener;
import org.elasticsoftware.elasticactors.rabbitmq.MessageAcker;
import org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService;
import org.elasticsoftware.elasticactors.rabbitmq.ack.AsyncMessageAcker;
import org.elasticsoftware.elasticactors.rabbitmq.ack.BufferingMessageAcker;
import org.elasticsoftware.elasticactors.rabbitmq.ack.DirectMessageAcker;
import org.elasticsoftware.elasticactors.rabbitmq.ack.WriteBehindMessageAcker;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/sc/SingleProducerRabbitMQMessagingService.class */
public final class SingleProducerRabbitMQMessagingService implements RabbitMQMessagingService, ChannelListenerRegistry, ChannelListener {
    private static final Logger logger = LoggerFactory.getLogger(SingleProducerRabbitMQMessagingService.class);
    private final String rabbitmqHosts;
    private final Integer rabbitmqPort;
    private static final String QUEUE_NAME_FORMAT = "%s/%s";
    private final String elasticActorsCluster;
    private static final String EA_EXCHANGE_FORMAT = "ea.%s";
    private final String exchangeName;
    private Connection clientConnection;
    private Channel consumerChannel;
    private Channel producerChannel;
    private final ThreadBoundExecutor queueExecutor;
    private final String username;
    private final String password;
    private final InternalMessageDeserializer internalMessageDeserializer;
    private final MessageAcker.Type ackType;
    private MessageAcker messageAcker;
    private final Integer prefetchCount;
    private final MicrometerConfiguration micrometerConfiguration;
    private final MicrometerConfiguration ackerMicrometerConfiguration;
    private final ConnectionFactory connectionFactory = new ConnectionFactory();
    private final ConcurrentMap<Channel, Set<ChannelListener>> channelListenerRegistry = new ConcurrentHashMap();
    private final LocalMessageQueueFactory localMessageQueueFactory = new LocalMessageQueueFactory();
    private final RemoteMessageQueueFactory remoteMessageQueueFactory = new RemoteMessageQueueFactory();
    private final RemoteActorSystemMessageQueueFactoryFactory remoteActorSystemMessageQueueFactoryFactory = new RemoteActorSystemMessageQueueFactoryFactory();

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/sc/SingleProducerRabbitMQMessagingService$LocalMessageQueueFactory.class */
    private final class LocalMessageQueueFactory implements MessageQueueFactory {
        private LocalMessageQueueFactory() {
        }

        public MessageQueue create(String str, MessageHandler messageHandler) throws Exception {
            String format = String.format("%s/%s", SingleProducerRabbitMQMessagingService.this.elasticActorsCluster, str);
            SingleProducerRabbitMQMessagingService.this.ensureQueueExists(SingleProducerRabbitMQMessagingService.this.consumerChannel, format);
            LocalMessageQueue localMessageQueue = new LocalMessageQueue(SingleProducerRabbitMQMessagingService.this.queueExecutor, SingleProducerRabbitMQMessagingService.this, SingleProducerRabbitMQMessagingService.this.consumerChannel, SingleProducerRabbitMQMessagingService.this.producerChannel, SingleProducerRabbitMQMessagingService.this.exchangeName, format, messageHandler, SingleProducerRabbitMQMessagingService.this.internalMessageDeserializer, SingleProducerRabbitMQMessagingService.this.messageAcker);
            localMessageQueue.initialize();
            return localMessageQueue;
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/sc/SingleProducerRabbitMQMessagingService$RemoteActorSystemMessageQueueFactory.class */
    private final class RemoteActorSystemMessageQueueFactory implements MessageQueueFactory {
        private final String clusterName;

        private RemoteActorSystemMessageQueueFactory(String str) {
            this.clusterName = str;
        }

        public MessageQueue create(String str, MessageHandler messageHandler) throws Exception {
            String format = String.format("%s/%s", this.clusterName, str);
            SingleProducerRabbitMQMessagingService.this.ensureQueueExists(SingleProducerRabbitMQMessagingService.this.producerChannel, format);
            RemoteMessageQueue remoteMessageQueue = new RemoteMessageQueue(SingleProducerRabbitMQMessagingService.this, SingleProducerRabbitMQMessagingService.this.producerChannel, SingleProducerRabbitMQMessagingService.this.exchangeName, format);
            remoteMessageQueue.initialize();
            return remoteMessageQueue;
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/sc/SingleProducerRabbitMQMessagingService$RemoteActorSystemMessageQueueFactoryFactory.class */
    private final class RemoteActorSystemMessageQueueFactoryFactory implements MessageQueueFactoryFactory {
        private RemoteActorSystemMessageQueueFactoryFactory() {
        }

        public MessageQueueFactory create(String str) {
            return new RemoteActorSystemMessageQueueFactory(str);
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/sc/SingleProducerRabbitMQMessagingService$RemoteMessageQueueFactory.class */
    private final class RemoteMessageQueueFactory implements MessageQueueFactory {
        private RemoteMessageQueueFactory() {
        }

        public MessageQueue create(String str, MessageHandler messageHandler) throws Exception {
            String format = String.format("%s/%s", SingleProducerRabbitMQMessagingService.this.elasticActorsCluster, str);
            SingleProducerRabbitMQMessagingService.this.ensureQueueExists(SingleProducerRabbitMQMessagingService.this.producerChannel, format);
            return new RemoteMessageQueue(SingleProducerRabbitMQMessagingService.this, SingleProducerRabbitMQMessagingService.this.producerChannel, SingleProducerRabbitMQMessagingService.this.exchangeName, format);
        }
    }

    public SingleProducerRabbitMQMessagingService(String str, String str2, Integer num, String str3, String str4, MessageAcker.Type type, ThreadBoundExecutor threadBoundExecutor, InternalMessageDeserializer internalMessageDeserializer, Integer num2, @Nullable MicrometerConfiguration micrometerConfiguration, @Nullable MicrometerConfiguration micrometerConfiguration2) {
        this.rabbitmqHosts = str2;
        this.elasticActorsCluster = str;
        this.rabbitmqPort = num;
        this.queueExecutor = threadBoundExecutor;
        this.username = str3;
        this.password = str4;
        this.ackType = type;
        this.internalMessageDeserializer = internalMessageDeserializer;
        this.exchangeName = String.format("ea.%s", str);
        this.prefetchCount = num2;
        this.micrometerConfiguration = micrometerConfiguration;
        this.ackerMicrometerConfiguration = micrometerConfiguration2;
    }

    @PostConstruct
    public void start() throws IOException, TimeoutException {
        logger.info("Starting messaging service [{}]", getClass().getSimpleName());
        if (this.micrometerConfiguration != null) {
            this.connectionFactory.setMetricsCollector(new MicrometerMetricsCollector(this.micrometerConfiguration.getRegistry(), this.micrometerConfiguration.getMetricPrefix() + "rabbitmq", this.micrometerConfiguration.getTags()));
        }
        this.connectionFactory.setConnectionTimeout(1000);
        this.connectionFactory.setRequestedHeartbeat(4);
        this.connectionFactory.setAutomaticRecoveryEnabled(false);
        this.connectionFactory.setTopologyRecoveryEnabled(false);
        this.clientConnection = Connections.create(new ConnectionOptions(this.connectionFactory).withHosts(StringUtils.commaDelimitedListToStringArray(this.rabbitmqHosts)).withPort(this.rabbitmqPort.intValue()).withUsername(this.username).withPassword(this.password), new Config().withRecoveryPolicy(new RecoveryPolicy().withMaxAttempts(-1).withInterval(Duration.seconds(1L))).withChannelListeners(new ChannelListener[]{this}));
        this.consumerChannel = this.clientConnection.createChannel();
        this.consumerChannel.basicQos(this.prefetchCount.intValue());
        this.producerChannel = this.clientConnection.createChannel();
        this.consumerChannel.addShutdownListener(LoggingShutdownListener.INSTANCE);
        this.producerChannel.addShutdownListener(LoggingShutdownListener.INSTANCE);
        this.consumerChannel.exchangeDeclare(this.exchangeName, "direct", true);
        if (this.ackType == MessageAcker.Type.BUFFERED) {
            this.messageAcker = new BufferingMessageAcker(this.consumerChannel);
        } else if (this.ackType == MessageAcker.Type.WRITE_BEHIND) {
            this.messageAcker = new WriteBehindMessageAcker(this.consumerChannel);
        } else if (this.ackType == MessageAcker.Type.ASYNC) {
            this.messageAcker = new AsyncMessageAcker(this.consumerChannel, this.ackerMicrometerConfiguration);
        } else {
            this.messageAcker = new DirectMessageAcker(this.consumerChannel);
        }
        this.messageAcker.start();
    }

    @PreDestroy
    public void stop() {
        logger.info("Stopping messaging service");
        try {
            this.messageAcker.stop();
            this.producerChannel.close();
            this.consumerChannel.close();
            this.clientConnection.close();
        } catch (IOException | TimeoutException e) {
            logger.error("Failed to close all RabbitMQ Client resources", e);
        }
    }

    public void sendWireMessage(String str, byte[] bArr, PhysicalNode physicalNode) throws IOException {
        this.producerChannel.basicPublish(this.exchangeName, str, true, false, (AMQP.BasicProperties) null, bArr);
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public MessageQueueFactory getLocalMessageQueueFactory() {
        return this.localMessageQueueFactory;
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public MessageQueueFactory getRemoteMessageQueueFactory() {
        return this.remoteMessageQueueFactory;
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public MessageQueueFactoryFactory getRemoteActorSystemMessageQueueFactoryFactory() {
        return this.remoteActorSystemMessageQueueFactoryFactory;
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.ChannelListenerRegistry
    public void addChannelListener(Channel channel, ChannelListener channelListener) {
        Set<ChannelListener> set = this.channelListenerRegistry.get(channel);
        if (set == null) {
            set = Collections.newSetFromMap(new ConcurrentHashMap());
            if (this.channelListenerRegistry.putIfAbsent(channel, set) != null) {
                set = this.channelListenerRegistry.get(channel);
            }
        }
        set.add(channelListener);
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.ChannelListenerRegistry
    public void removeChannelListener(Channel channel, ChannelListener channelListener) {
        Set<ChannelListener> set = this.channelListenerRegistry.get(channel);
        if (set != null) {
            set.remove(channelListener);
        }
    }

    public void onCreate(Channel channel) {
        propagateChannelEvent(channel, channelListener -> {
            channelListener.onCreate(channel);
        }, "onCreate");
    }

    public void onCreateFailure(Throwable th) {
        logger.error("Channel creation failed, reason: " + System.lineSeparator() + Throwables.getStackTraceAsString(th));
    }

    public void onRecoveryStarted(Channel channel) {
        propagateChannelEvent(channel, channelListener -> {
            channelListener.onRecoveryStarted(channel);
        }, "onRecoveryStarted");
    }

    public void onRecovery(Channel channel) {
        propagateChannelEvent(channel, channelListener -> {
            channelListener.onRecovery(channel);
        }, "onRecovery");
    }

    public void onRecoveryCompleted(Channel channel) {
        propagateChannelEvent(channel, channelListener -> {
            channelListener.onRecoveryCompleted(channel);
        }, "onRecoveryCompleted");
    }

    public void onRecoveryFailure(Channel channel, Throwable th) {
        propagateChannelEvent(channel, channelListener -> {
            channelListener.onRecoveryFailure(channel, th);
        }, "onRecoveryFailure");
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public boolean isClientConnectionOpen() {
        return this.clientConnection != null && this.clientConnection.isOpen();
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public boolean areConsumerChannelsOpen() {
        return this.consumerChannel != null && this.consumerChannel.isOpen();
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.RabbitMQMessagingService
    public boolean areProducerChannelsOpen() {
        return this.producerChannel != null && this.producerChannel.isOpen();
    }

    private void propagateChannelEvent(Channel channel, Consumer<ChannelListener> consumer, String str) {
        Set<ChannelListener> set = this.channelListenerRegistry.get(channel);
        if (set != null) {
            for (ChannelListener channelListener : set) {
                try {
                    consumer.accept(channelListener);
                } catch (Exception e) {
                    logger.error("Exception while calling [{}] on ChannelListener [{}]", new Object[]{str, channelListener, e});
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ensureQueueExists(Channel channel, String str) throws IOException {
        channel.queueDeclare(str, true, false, false, (Map) null);
        channel.queueBind(str, this.exchangeName, str);
    }
}
