package org.elasticsoftware.elasticactors.rabbitmq.cpt;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jodah.lyra.event.ChannelListener;
import org.elasticsoftware.elasticactors.MessageDeliveryException;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.MessageHandler;
import org.elasticsoftware.elasticactors.messaging.MessageHandlerEventListener;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.rabbitmq.ChannelListenerRegistry;
import org.elasticsoftware.elasticactors.rabbitmq.MessageAcker;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue.class */
public final class LocalMessageQueue extends DefaultConsumer implements MessageQueue, ChannelListener {
    private static final Logger logger = LoggerFactory.getLogger(LocalMessageQueue.class);
    private final Channel consumerChannel;
    private final Channel producerChannel;
    private final String exchangeName;
    private final String queueName;
    private final MessageHandler messageHandler;
    private final TransientAck transientAck;
    private final ThreadBoundExecutor queueExecutor;
    private final CountDownLatch destroyLatch;
    private final InternalMessageDeserializer internalMessageDeserializer;
    private final AtomicBoolean recovering;
    private final ChannelListenerRegistry channelListenerRegistry;
    private final MessageAcker messageAcker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue$InternalMessageHandler.class */
    public static final class InternalMessageHandler implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessage message;
        private final MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;
        private final long startTime;

        private InternalMessageHandler(String str, InternalMessage internalMessage, MessageHandler messageHandler, MessageHandlerEventListener messageHandlerEventListener, Logger logger) {
            this.queueName = str;
            this.message = internalMessage;
            this.messageHandler = messageHandler;
            this.listener = messageHandlerEventListener;
            this.logger = logger;
            this.startTime = logger.isTraceEnabled() ? System.nanoTime() : 0L;
        }

        /* renamed from: getKey, reason: merged with bridge method [inline-methods] */
        public String m14getKey() {
            return this.queueName;
        }

        public void run() {
            try {
                try {
                    this.messageHandler.handleMessage(this.message, this.listener);
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("(local) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{this.message.getPayloadClass(), this.message.getId(), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime)), this.queueName});
                    }
                } catch (Exception e) {
                    this.logger.error("Unexpected exception on #handleMessage", e);
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("(local) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{this.message.getPayloadClass(), this.message.getId(), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime)), this.queueName});
                    }
                }
            } catch (Throwable th) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("(local) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{this.message.getPayloadClass(), this.message.getId(), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime)), this.queueName});
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue$MessageSender.class */
    public final class MessageSender implements ThreadBoundRunnable<String> {
        private final InternalMessage message;

        public MessageSender(InternalMessage internalMessage) {
            this.message = internalMessage;
        }

        public void run() {
            try {
                LocalMessageQueue.this.producerChannel.basicPublish(LocalMessageQueue.this.exchangeName, LocalMessageQueue.this.queueName, false, false, LocalMessageQueue.this.createProps(this.message), this.message.toByteArray());
            } catch (IOException e) {
                LocalMessageQueue.logger.error("IOException while publishing message", e);
            } catch (AlreadyClosedException e2) {
                LocalMessageQueue.this.recovering.set(true);
                LocalMessageQueue.logger.error("MessagingService is recovering");
            }
        }

        /* renamed from: getKey, reason: merged with bridge method [inline-methods] */
        public String m15getKey() {
            return LocalMessageQueue.this.queueName;
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue$RabbitMQAck.class */
    private final class RabbitMQAck implements MessageHandlerEventListener {
        private final Envelope envelope;

        private RabbitMQAck(Envelope envelope) {
            this.envelope = envelope;
        }

        public void onError(InternalMessage internalMessage, Throwable th) {
            onDone(internalMessage);
        }

        public void onDone(InternalMessage internalMessage) {
            LocalMessageQueue.this.messageAcker.ack(this.envelope.getDeliveryTag());
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue$RabbitMQMessageHandler.class */
    private static final class RabbitMQMessageHandler implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessageDeserializer internalMessageDeserializer;
        private final byte[] body;
        private final MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;
        private final long startTime;

        private RabbitMQMessageHandler(String str, byte[] bArr, InternalMessageDeserializer internalMessageDeserializer, MessageHandler messageHandler, MessageHandlerEventListener messageHandlerEventListener, Logger logger) {
            this.queueName = str;
            this.internalMessageDeserializer = internalMessageDeserializer;
            this.body = bArr;
            this.messageHandler = messageHandler;
            this.listener = messageHandlerEventListener;
            this.logger = logger;
            this.startTime = logger.isTraceEnabled() ? System.nanoTime() : 0L;
        }

        /* renamed from: getKey, reason: merged with bridge method [inline-methods] */
        public String m16getKey() {
            return this.queueName;
        }

        public void run() {
            InternalMessage internalMessage = null;
            try {
                try {
                    internalMessage = this.internalMessageDeserializer.deserialize(this.body);
                    this.messageHandler.handleMessage(internalMessage, this.listener);
                    if (this.logger.isTraceEnabled()) {
                        long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime);
                        if (internalMessage != null) {
                            this.logger.trace("(rabbit) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{internalMessage.getPayloadClass(), internalMessage.getId(), Long.valueOf(micros), this.queueName});
                        }
                    }
                } catch (Exception e) {
                    this.logger.error("Unexpected exception on #handleMessage", e);
                    if (this.logger.isTraceEnabled()) {
                        long micros2 = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime);
                        if (internalMessage != null) {
                            this.logger.trace("(rabbit) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{internalMessage.getPayloadClass(), internalMessage.getId(), Long.valueOf(micros2), this.queueName});
                        }
                    }
                }
            } catch (Throwable th) {
                if (this.logger.isTraceEnabled()) {
                    long micros3 = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTime);
                    if (internalMessage != null) {
                        this.logger.trace("(rabbit) Message of type [{}] with id [{}] took {} microsecs to execute on queue [{}]", new Object[]{internalMessage.getPayloadClass(), internalMessage.getId(), Long.valueOf(micros3), this.queueName});
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/LocalMessageQueue$TransientAck.class */
    public final class TransientAck implements MessageHandlerEventListener {
        private TransientAck() {
        }

        public void onError(InternalMessage internalMessage, Throwable th) {
            LocalMessageQueue.logger.error("Error handling transient message, payloadClass [{}]", internalMessage.getPayloadClass(), th);
        }

        public void onDone(InternalMessage internalMessage) {
        }
    }

    public LocalMessageQueue(ThreadBoundExecutor threadBoundExecutor, ChannelListenerRegistry channelListenerRegistry, Channel channel, Channel channel2, String str, String str2, MessageHandler messageHandler, InternalMessageDeserializer internalMessageDeserializer, MessageAcker messageAcker) {
        super(channel);
        this.transientAck = new TransientAck();
        this.destroyLatch = new CountDownLatch(1);
        this.recovering = new AtomicBoolean(false);
        this.queueExecutor = threadBoundExecutor;
        this.consumerChannel = channel;
        this.producerChannel = channel2;
        this.exchangeName = str;
        this.queueName = str2;
        this.messageHandler = messageHandler;
        this.internalMessageDeserializer = internalMessageDeserializer;
        this.messageAcker = messageAcker;
        this.channelListenerRegistry = channelListenerRegistry;
        this.channelListenerRegistry.addChannelListener(this.producerChannel, this);
    }

    public boolean offer(InternalMessage internalMessage) {
        if (this.recovering.get()) {
            throw new MessageDeliveryException("MessagingService is recovering", true);
        }
        if (internalMessage.isDurable()) {
            this.queueExecutor.execute(new MessageSender(internalMessage));
            return true;
        }
        this.queueExecutor.execute(new InternalMessageHandler(this.queueName, internalMessage, this.messageHandler, this.transientAck, logger));
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public AMQP.BasicProperties createProps(InternalMessage internalMessage) {
        return internalMessage.getTimeout() < 0 ? internalMessage.isDurable() ? MessageProperties.PERSISTENT_BASIC : MessageProperties.BASIC : internalMessage.isDurable() ? new AMQP.BasicProperties.Builder().contentType("application/octet-stream").deliveryMode(2).priority(0).expiration(String.valueOf(internalMessage.getTimeout())).build() : new AMQP.BasicProperties.Builder().contentType("application/octet-stream").deliveryMode(1).priority(0).expiration(String.valueOf(internalMessage.getTimeout())).build();
    }

    public boolean add(InternalMessage internalMessage) {
        return offer(internalMessage);
    }

    public InternalMessage poll() {
        return null;
    }

    public String getName() {
        return this.queueName;
    }

    public void initialize() throws Exception {
        logger.info("Starting local message queue [{}->{}]", this.exchangeName, this.queueName);
        this.consumerChannel.basicConsume(this.queueName, false, this);
    }

    public void destroy() {
        try {
            try {
                logger.info("Stopping local message queue [{}->{}]", this.exchangeName, this.queueName);
                this.consumerChannel.basicCancel(getConsumerTag());
                this.destroyLatch.await(4L, TimeUnit.SECONDS);
                this.channelListenerRegistry.removeChannelListener(this.producerChannel, this);
            } catch (IOException e) {
                logger.error("IOException while cancelling consumer for queue {}", this.queueName, e);
                this.channelListenerRegistry.removeChannelListener(this.producerChannel, this);
            } catch (InterruptedException e2) {
                this.channelListenerRegistry.removeChannelListener(this.producerChannel, this);
            }
        } catch (Throwable th) {
            this.channelListenerRegistry.removeChannelListener(this.producerChannel, this);
            throw th;
        }
    }

    public void handleCancelOk(String str) {
        this.destroyLatch.countDown();
    }

    public void handleCancel(String str) throws IOException {
        logger.error("Unexpectedly cancelled: consumerTag = {}", str);
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
    }

    public void handleRecoverOk(String str) {
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        try {
            this.messageAcker.deliver(envelope.getDeliveryTag());
            this.queueExecutor.execute(new RabbitMQMessageHandler(this.queueName, bArr, this.internalMessageDeserializer, this.messageHandler, new RabbitMQAck(envelope), logger));
        } catch (Exception e) {
            logger.error("Unexpected Exception on handleDelivery.. Acking the message so it will not clog up the system", e);
            this.messageAcker.ack(envelope.getDeliveryTag());
        }
    }

    public void onCreate(Channel channel) {
    }

    public void onCreateFailure(Throwable th) {
    }

    public void onRecovery(Channel channel) {
    }

    public void onRecoveryStarted(Channel channel) {
    }

    public void onRecoveryCompleted(Channel channel) {
        if (this.recovering.compareAndSet(true, false)) {
            logger.info("RabbitMQ Channel recovered");
        }
    }

    public void onRecoveryFailure(Channel channel, Throwable th) {
        logger.error("RabbitMQ Channel recovery failed");
    }
}
