package org.elasticsoftware.elasticactors.rabbitmq.cpt;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jodah.lyra.event.DefaultChannelListener;
import org.elasticsoftware.elasticactors.MessageDeliveryException;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.rabbitmq.ChannelListenerRegistry;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/RemoteMessageQueue.class */
/**
 * {@link MessageQueue} that forwards every offered message to a RabbitMQ exchange instead of
 * storing it locally.
 *
 * <p>Messages are published on {@code producerChannel} to {@code exchangeName}, using
 * {@code queueName} as the routing key. Publishing does not happen on the caller's thread:
 * {@link #offer(InternalMessage)} wraps the message in a {@link MessageSender} and hands it to the
 * {@code queueExecutor}.
 *
 * <p>The instance registers itself as a channel listener so it can track channel recovery. While
 * the {@code recovering} flag is set, {@link #offer(InternalMessage)} rejects new messages with a
 * {@link MessageDeliveryException}.
 */
public final class RemoteMessageQueue extends DefaultChannelListener implements MessageQueue {
    private static final Logger logger = LoggerFactory.getLogger(RemoteMessageQueue.class);

    /** Content type set on messages that carry an explicit expiration. */
    private static final String CONTENT_TYPE = "application/octet-stream";
    /** AMQP delivery mode for non-persistent (transient) messages. */
    private static final int DELIVERY_MODE_NON_PERSISTENT = 1;
    /** AMQP delivery mode for persistent messages. */
    private static final int DELIVERY_MODE_PERSISTENT = 2;

    private final Channel producerChannel;
    private final String exchangeName;
    private final String queueName;
    private final ChannelListenerRegistry channelListenerRegistry;
    private final ThreadBoundExecutor queueExecutor;
    /** Set when a publish hits a closed channel; cleared again in {@link #onRecovery(Channel)}. */
    private final AtomicBoolean recovering = new AtomicBoolean(false);

    /**
     * Task that publishes a single message on the producer channel.
     *
     * <p>NOTE(review): the key returned by {@link #getKey()} is the queue name; presumably the
     * {@link ThreadBoundExecutor} uses it to run all sends for one queue on the same thread —
     * confirm against the executor implementation.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/cpt/RemoteMessageQueue$MessageSender.class */
    public final class MessageSender implements ThreadBoundRunnable<String> {
        private final InternalMessage message;

        /**
         * @param internalMessage the message to publish when this task runs
         */
        public MessageSender(InternalMessage internalMessage) {
            this.message = internalMessage;
        }

        /**
         * Publishes the message (mandatory and immediate flags both {@code false}).
         *
         * <p>Failures are logged and not rethrown, so the message is dropped on failure.
         * NOTE(review): there is no retry or sender notification here — confirm that is intended.
         */
        public void run() {
            try {
                AMQP.BasicProperties props = RemoteMessageQueue.this.createProps(this.message);
                RemoteMessageQueue.this.producerChannel.basicPublish(
                        RemoteMessageQueue.this.exchangeName,
                        RemoteMessageQueue.this.queueName,
                        false,
                        false,
                        props,
                        this.message.toByteArray());
            } catch (IOException e) {
                RemoteMessageQueue.logger.error("IOException while publishing message", e);
            } catch (AlreadyClosedException channelClosed) {
                // The channel is gone: flag the queue so offer() starts rejecting messages until
                // onRecovery() clears the flag. The exception is used as a signal only, so no
                // stack trace is logged.
                // NOTE(review): if onRecovery() fires between the failed publish and this set(),
                // the flag may stay true — verify against the listener's callback ordering.
                RemoteMessageQueue.this.recovering.set(true);
                RemoteMessageQueue.logger.error("MessagingService is recovering");
            }
        }

        /**
         * Returns the key of this task, which is the name of the target queue.
         *
         * <p>This restores the method name that the decompiler mangled to {@code m16getKey}
         * ("renamed from: getKey, reason: merged with bridge method").
         *
         * @return the queue name
         */
        public String getKey() {
            return RemoteMessageQueue.this.queueName;
        }

        /**
         * Decompiler-generated alias of {@link #getKey()}, kept only for backward compatibility.
         *
         * @return the queue name
         * @deprecated use {@link #getKey()}
         */
        @Deprecated
        public String m16getKey() {
            return getKey();
        }
    }

    /**
     * Creates the queue and registers it as a listener for {@code channel} in the given registry.
     *
     * @param channelListenerRegistry registry used to (un)register this queue as channel listener
     * @param threadBoundExecutor executor that runs the publish tasks
     * @param channel channel used for publishing
     * @param str name of the exchange to publish to
     * @param str2 name of the queue; used as routing key and as {@link #getName()}
     */
    public RemoteMessageQueue(ChannelListenerRegistry channelListenerRegistry, ThreadBoundExecutor threadBoundExecutor, Channel channel, String str, String str2) {
        this.queueExecutor = threadBoundExecutor;
        this.producerChannel = channel;
        this.exchangeName = str;
        this.queueName = str2;
        this.channelListenerRegistry = channelListenerRegistry;
        // Registered here (not in initialize()) to keep the original lifecycle; all fields are
        // already assigned at this point.
        this.channelListenerRegistry.addChannelListener(this.producerChannel, this);
    }

    /**
     * Schedules the message for publishing on the executor.
     *
     * @param internalMessage the message to publish
     * @return always {@code true}; acceptance only means the send task was handed to the executor
     * @throws MessageDeliveryException if the queue is currently flagged as recovering
     */
    public boolean offer(InternalMessage internalMessage) {
        if (this.recovering.get()) {
            throw new MessageDeliveryException("MessagingService is recovering", true);
        }
        this.queueExecutor.execute(new MessageSender(internalMessage));
        return true;
    }

    /**
     * Builds the AMQP properties for a message.
     *
     * <p>A negative timeout selects one of the predefined property sets
     * ({@link MessageProperties#PERSISTENT_BASIC} for durable messages,
     * {@link MessageProperties#BASIC} otherwise). For any other timeout, properties are built
     * with the timeout as the {@code expiration} value.
     * NOTE(review): RabbitMQ interprets {@code expiration} as milliseconds — confirm that
     * {@code getTimeout()} uses the same unit.
     *
     * @param internalMessage the message whose durability and timeout determine the properties
     * @return the properties to publish the message with
     */
    private AMQP.BasicProperties createProps(InternalMessage internalMessage) {
        boolean durable = internalMessage.isDurable();
        if (internalMessage.getTimeout() < 0) {
            return durable ? MessageProperties.PERSISTENT_BASIC : MessageProperties.BASIC;
        }
        return new AMQP.BasicProperties.Builder()
                .contentType(CONTENT_TYPE)
                .deliveryMode(durable ? DELIVERY_MODE_PERSISTENT : DELIVERY_MODE_NON_PERSISTENT)
                .priority(0)
                .expiration(String.valueOf(internalMessage.getTimeout()))
                .build();
    }

    /**
     * Same as {@link #offer(InternalMessage)}.
     *
     * @param internalMessage the message to publish
     * @return always {@code true}
     * @throws MessageDeliveryException if the queue is currently flagged as recovering
     */
    public boolean add(InternalMessage internalMessage) {
        return offer(internalMessage);
    }

    /**
     * Not supported: this queue only publishes.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public InternalMessage poll() {
        throw new UnsupportedOperationException("Remote queues cannot be polled");
    }

    /**
     * @return the queue name passed to the constructor
     */
    public String getName() {
        return this.queueName;
    }

    /**
     * Logs the start of the queue; no other work is done here.
     *
     * @throws Exception declared by the signature; this implementation throws nothing itself
     */
    public void initialize() throws Exception {
        logger.info("Starting remote message queue [{}->{}]", this.exchangeName, this.queueName);
    }

    /**
     * Logs the stop of the queue and unregisters it as listener of the producer channel.
     */
    public void destroy() {
        logger.info("Stopping remote message queue [{}->{}]", this.exchangeName, this.queueName);
        this.channelListenerRegistry.removeChannelListener(this.producerChannel, this);
    }

    /**
     * Clears the recovering flag so that {@link #offer(InternalMessage)} accepts messages again.
     * Logs only when the flag was actually set.
     *
     * @param channel the recovered channel (not used)
     */
    public void onRecovery(Channel channel) {
        if (this.recovering.compareAndSet(true, false)) {
            logger.info("RabbitMQ Channel recovered");
        }
    }

    /**
     * Logs a failed channel recovery together with its cause. The recovering flag is left as is.
     *
     * @param channel the channel that could not be recovered (not used)
     * @param th the cause of the failure
     */
    public void onRecoveryFailure(Channel channel, Throwable th) {
        // Pass the cause as the trailing argument so the stack trace ends up in the log.
        logger.error("RabbitMQ Channel recovery failed", th);
    }
}
