package org.elasticsoftware.elasticactors.activemq;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.elasticsoftware.elasticactors.MessageDeliveryException;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.MessageHandlerEventListener;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.messaging.UUIDTools;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue.class */
/**
 * {@link MessageQueue} backed by an ActiveMQ Artemis client session.
 *
 * <p>Durable messages offered to this queue are published through the {@link ClientProducer};
 * non-durable messages are handed directly to the elasticactors message handler. Messages coming
 * back from the broker are either pushed to {@link #onMessage(ClientMessage)} or polled by a
 * reusable processor task, depending on {@code useMessageHandler}. All work is submitted to the
 * {@link ThreadBoundExecutor} using the queue name as the task key.
 */
public final class LocalMessageQueue implements MessageQueue, MessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(LocalMessageQueue.class);
    /** Executor that every task of this queue is submitted to. */
    private final ThreadBoundExecutor queueExecutor;
    /** Turns the raw body bytes of a broker message into an {@link InternalMessage}. */
    private final InternalMessageDeserializer internalMessageDeserializer;
    /** Name of this queue; used to create the consumer and as the key of every submitted task. */
    private final String queueName;
    /** Value written to the {@code "routingKey"} property of each published message. */
    private final String routingKey;
    /** Session used to create the consumer and outgoing messages; started in {@code initialize()}. */
    private final ClientSession clientSession;
    /** Producer used to publish durable messages. */
    private final ClientProducer producer;
    /** Consumer created from {@code clientSession} for {@code queueName} in the constructor. */
    private final ClientConsumer consumer;
    /** The elasticactors handler that receives the deserialized messages. */
    private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
    /** Polling task that is re-submitted by {@code receiveMessage()} while the queue is running. */
    private final ActiveMQMessageProcessor messageProcessor;
    /** {@code true}: broker pushes via {@code onMessage}; {@code false}: poll with {@code messageProcessor}. */
    private final boolean useMessageHandler;
    /** When set, {@code offer} rejects messages. Nothing in this class sets it to {@code true}. */
    private final AtomicBoolean recovering = new AtomicBoolean(false);
    /** Shared listener passed along with non-durable messages, which need no broker ack. */
    private final TransientAck transientAck = new TransientAck();
    /** Counted down once the queue has stopped; {@code destroy()} waits on it before closing. */
    private final CountDownLatch destroyLatch = new CountDownLatch(1);
    /**
     * Cleared by {@code DestroyQueue}, checked by {@code receiveMessage()}.
     * NOTE(review): not volatile; presumably safe because both tasks use the same executor key
     * and therefore the same thread — confirm against ThreadBoundExecutor.
     */
    private boolean running = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$AcknowledgeMessage.class */
    public static final class AcknowledgeMessage implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final ClientMessage clientMessage;

        public AcknowledgeMessage(String str, ClientMessage clientMessage) {
            this.queueName = str;
            this.clientMessage = clientMessage;
        }

        public void run() {
            try {
                this.clientMessage.individualAcknowledge();
            } catch (ActiveMQException e) {
                LocalMessageQueue.logger.error("Exception while acking message", e);
            }
        }

        /* renamed from: getKey, reason: merged with bridge method [inline-methods] */
        public String m2getKey() {
            return this.queueName;
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$ActiveMQAck.class */
    /**
     * Listener attached to messages that came from the broker. Whether handling succeeded or
     * failed, it schedules an {@link AcknowledgeMessage} task on the queue executor.
     */
    private final class ActiveMQAck implements MessageHandlerEventListener {
        private final ClientMessage clientMessage;

        private ActiveMQAck(ClientMessage clientMessage) {
            this.clientMessage = clientMessage;
        }

        /** Schedules the acknowledgement of the broker message on the queue executor. */
        public void onDone(InternalMessage internalMessage) {
            final AcknowledgeMessage ackTask = new AcknowledgeMessage(queueName, clientMessage);
            queueExecutor.execute(ackTask);
        }

        /** Treats a failed message exactly like a handled one: it is acknowledged as well. */
        public void onError(InternalMessage internalMessage, Throwable th) {
            onDone(internalMessage);
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$ActiveMQMessageHandler.class */
    /**
     * Executor task that deserializes the body of a pushed broker message and passes the result
     * to the elasticactors message handler together with its listener.
     */
    private static final class ActiveMQMessageHandler implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final byte[] body;
        private final InternalMessageDeserializer internalMessageDeserializer;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;

        /**
         * @param queueName                   name of the owning queue, returned as the task key
         * @param body                        raw bytes of the broker message body
         * @param internalMessageDeserializer deserializer applied to {@code body}
         * @param messageHandler              handler that receives the deserialized message
         * @param listener                    listener passed to the handler along with the message
         * @param logger                      logger used to report failures
         */
        private ActiveMQMessageHandler(String queueName, byte[] body, InternalMessageDeserializer internalMessageDeserializer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, MessageHandlerEventListener listener, Logger logger) {
            this.queueName = queueName;
            this.body = body;
            this.internalMessageDeserializer = internalMessageDeserializer;
            this.messageHandler = messageHandler;
            this.listener = listener;
            this.logger = logger;
        }

        /** Deserializes and dispatches the message; any exception is logged, never rethrown. */
        public void run() {
            try {
                final InternalMessage internalMessage = this.internalMessageDeserializer.deserialize(this.body);
                this.messageHandler.handleMessage(internalMessage, this.listener);
            } catch (Exception failure) {
                this.logger.error("Unexpected exception on #handleMessage", failure);
            }
        }

        /**
         * Returns the queue name as the task key.
         * The method name comes from the decompiler, which notes it was renamed from getKey.
         */
        public String m3getKey() {
            return this.queueName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$ActiveMQMessageProcessor.class */
    public final class ActiveMQMessageProcessor implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessageDeserializer internalMessageDeserializer;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final boolean receiveImmediate;

        private ActiveMQMessageProcessor(String str, InternalMessageDeserializer internalMessageDeserializer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, boolean z) {
            this.queueName = str;
            this.internalMessageDeserializer = internalMessageDeserializer;
            this.messageHandler = messageHandler;
            this.receiveImmediate = z;
        }

        /* renamed from: getKey, reason: merged with bridge method [inline-methods] */
        public String m4getKey() {
            return this.queueName;
        }

        public void run() {
            try {
                try {
                    try {
                        ClientMessage receiveImmediate = this.receiveImmediate ? LocalMessageQueue.this.consumer.receiveImmediate() : LocalMessageQueue.this.consumer.receive(1L);
                        if (receiveImmediate != null) {
                            byte[] bArr = new byte[receiveImmediate.getBodySize()];
                            receiveImmediate.getBodyBuffer().readBytes(bArr);
                            this.messageHandler.handleMessage(this.internalMessageDeserializer.deserialize(bArr), new ActiveMQAck(receiveImmediate));
                        }
                        LocalMessageQueue.this.receiveMessage();
                    } catch (Exception e) {
                        LocalMessageQueue.logger.error("Unexpected exception in handleMessage", e);
                        LocalMessageQueue.this.receiveMessage();
                    }
                } catch (ActiveMQException e2) {
                    LocalMessageQueue.logger.error("Unexpected exception on consumer.receive*", e2);
                    LocalMessageQueue.this.receiveMessage();
                } catch (IOException e3) {
                    LocalMessageQueue.logger.error("Exception deserializing InteralMessage", e3);
                    LocalMessageQueue.this.receiveMessage();
                }
            } catch (Throwable th) {
                LocalMessageQueue.this.receiveMessage();
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$DestroyQueue.class */
    /**
     * Executor task that marks the enclosing queue as stopped. In push mode it also releases the
     * destroy latch itself; in polling mode the latch is released by {@code receiveMessage()}.
     */
    private final class DestroyQueue implements ThreadBoundRunnable<String> {
        private final String queueName;

        private DestroyQueue(String queueName) {
            this.queueName = queueName;
        }

        /**
         * Returns the queue name as the task key.
         * The method name comes from the decompiler, which notes it was renamed from getKey.
         */
        public String m5getKey() {
            return queueName;
        }

        /** Clears the running flag and, when the push handler is in use, counts down the latch. */
        public void run() {
            final LocalMessageQueue queue = LocalMessageQueue.this;
            queue.running = false;
            if (!queue.useMessageHandler) {
                return;
            }
            queue.destroyLatch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$InternalMessageHandler.class */
    /**
     * Executor task that passes an already materialised {@link InternalMessage} to the
     * elasticactors message handler, bypassing the broker.
     */
    public static final class InternalMessageHandler implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessage message;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;

        /**
         * @param queueName      name of the owning queue, returned as the task key
         * @param message        the message to dispatch
         * @param messageHandler handler that receives the message
         * @param listener       listener passed to the handler along with the message
         * @param logger         logger used to report failures
         */
        private InternalMessageHandler(String queueName, InternalMessage message, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, MessageHandlerEventListener listener, Logger logger) {
            this.queueName = queueName;
            this.message = message;
            this.messageHandler = messageHandler;
            this.listener = listener;
            this.logger = logger;
        }

        /** Dispatches the message; any exception is logged, never rethrown. */
        public void run() {
            try {
                messageHandler.handleMessage(message, listener);
            } catch (Exception failure) {
                logger.error("Unexpected exception on #handleMessage", failure);
            }
        }

        /**
         * Returns the queue name as the task key.
         * The method name comes from the decompiler, which notes it was renamed from getKey.
         */
        public String m6getKey() {
            return queueName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$SendMessage.class */
    /**
     * Executor task that publishes one {@link InternalMessage} to the broker through the
     * enclosing queue's producer.
     */
    public final class SendMessage implements ThreadBoundRunnable<String> {
        private final InternalMessage message;

        /**
         * @param internalMessage the message to publish
         */
        public SendMessage(InternalMessage internalMessage) {
            this.message = internalMessage;
        }

        /**
         * Builds the broker message and sends it.
         *
         * <p>The body is the serialized message, the {@code "routingKey"} property carries the
         * queue's routing key and the duplicate-detection header carries the message id. An
         * expiration is set only when the message timeout is not negative.
         *
         * @throws MessageDeliveryException if the producer fails to send the message
         */
        public void run() {
            ClientMessage clientMessage = LocalMessageQueue.this.clientSession.createMessage(this.message.isDurable());
            clientMessage.getBodyBuffer().writeBytes(this.message.toByteArray());
            clientMessage.putStringProperty("routingKey", LocalMessageQueue.this.routingKey);
            clientMessage.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, UUIDTools.toByteArray(this.message.getId()));
            if (this.message.getTimeout() >= 0) {
                // Expiration is an absolute timestamp, so add the relative timeout to "now".
                clientMessage.setExpiration(System.currentTimeMillis() + this.message.getTimeout());
            }
            try {
                LocalMessageQueue.this.producer.send(clientMessage);
            } catch (ActiveMQException e) {
                // The message names the exception type that is actually caught here.
                throw new MessageDeliveryException("ActiveMQException while publishing message", e, false);
            }
        }

        /**
         * Returns the enclosing queue's name as the task key.
         * The method name comes from the decompiler, which notes it was renamed from getKey.
         */
        public String m7getKey() {
            return LocalMessageQueue.this.queueName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/activemq/LocalMessageQueue$TransientAck.class */
    public final class TransientAck implements MessageHandlerEventListener {
        private TransientAck() {
        }

        public void onError(InternalMessage internalMessage, Throwable th) {
            LocalMessageQueue.logger.error("Error handling transient message, payloadClass [{}]", internalMessage.getPayloadClass(), th);
        }

        public void onDone(InternalMessage internalMessage) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the queue and its broker consumer. Nothing is received until {@code initialize()}
     * is called.
     *
     * @param queueExecutor               executor that runs all tasks of this queue
     * @param internalMessageDeserializer deserializer for received message bodies
     * @param queueName                   queue name; the consumer is created for this name
     * @param routingKey                  routing key stamped on published messages
     * @param clientSession               session used to create the consumer and messages
     * @param producer                    producer used to publish durable messages
     * @param messageHandler              handler that receives incoming messages
     * @param useMessageHandler           {@code true} to receive by push, {@code false} to poll
     * @param receiveImmediate            polling variant handed to the message processor
     * @throws ActiveMQException if the consumer cannot be created
     */
    public LocalMessageQueue(ThreadBoundExecutor queueExecutor, InternalMessageDeserializer internalMessageDeserializer, String queueName, String routingKey, ClientSession clientSession, ClientProducer producer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, boolean useMessageHandler, boolean receiveImmediate) throws ActiveMQException {
        // Plain collaborators first.
        this.queueExecutor = queueExecutor;
        this.internalMessageDeserializer = internalMessageDeserializer;
        this.messageHandler = messageHandler;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.useMessageHandler = useMessageHandler;

        // Broker-side resources.
        this.clientSession = clientSession;
        this.producer = producer;
        this.consumer = clientSession.createConsumer(queueName);

        this.messageProcessor = new ActiveMQMessageProcessor(queueName, internalMessageDeserializer, messageHandler, receiveImmediate);
    }

    /**
     * Accepts a message for delivery. Durable messages are scheduled for publication to the
     * broker; all others are scheduled for direct local handling.
     *
     * @param internalMessage the message to deliver
     * @return always {@code true}
     * @throws MessageDeliveryException if the queue is flagged as recovering
     */
    public boolean offer(InternalMessage internalMessage) {
        if (recovering.get()) {
            throw new MessageDeliveryException("MessagingService is recovering", true);
        }
        final ThreadBoundRunnable<String> task;
        if (internalMessage.isDurable()) {
            task = new SendMessage(internalMessage);
        } else {
            task = new InternalMessageHandler(queueName, internalMessage, messageHandler, transientAck, logger);
        }
        queueExecutor.execute(task);
        return true;
    }

    /**
     * Same as {@link #offer(InternalMessage)}; simply delegates to it.
     *
     * @param internalMessage the message to deliver
     * @return whatever {@code offer} returns
     */
    public boolean add(InternalMessage internalMessage) {
        return offer(internalMessage);
    }

    /**
     * Not supported by this queue in any meaningful way.
     *
     * @return always {@code null}
     */
    public InternalMessage poll() {
        return null;
    }

    /**
     * @return the name this queue was created with
     */
    public String getName() {
        return this.queueName;
    }

    /**
     * Starts receiving: either schedules the first poll or registers this queue as the consumer's
     * push handler, and then starts the client session.
     *
     * @throws Exception if registering the handler or starting the session fails
     */
    public synchronized void initialize() throws Exception {
        if (!useMessageHandler) {
            receiveMessage();
        } else {
            consumer.setMessageHandler(this);
        }
        clientSession.start();
    }

    /**
     * Stops the queue and releases its broker resources.
     *
     * <p>A {@code DestroyQueue} task is scheduled and this method waits up to three seconds for
     * the queue to report that it has stopped. The consumer, producer and session are then closed
     * independently of each other, so a failure (or an interrupt during the wait) no longer
     * prevents the remaining resources from being closed. Failures are logged, not thrown.
     */
    public void destroy() {
        try {
            this.queueExecutor.execute(new DestroyQueue(this.queueName));
            if (!this.destroyLatch.await(3L, TimeUnit.SECONDS)) {
                logger.warn("Timed out waiting for queue [{}] to stop, closing anyway", this.queueName);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for queue to stop", e);
        }
        try {
            this.consumer.close();
        } catch (ActiveMQException e) {
            logger.warn("Exception while closing consumer", e);
        }
        try {
            this.producer.close();
        } catch (ActiveMQException e) {
            logger.warn("Exception while closing producer", e);
        }
        try {
            this.clientSession.close();
        } catch (ActiveMQException e) {
            logger.warn("Exception while closing session", e);
        }
    }

    /**
     * Push callback invoked with a message from the consumer. Copies the body out of the message
     * and schedules its deserialization and handling on the queue executor; the message is
     * acknowledged through the attached listener once handling finishes.
     *
     * @param clientMessage the message delivered by the broker
     */
    public void onMessage(ClientMessage clientMessage) {
        final int bodySize = clientMessage.getBodySize();
        final byte[] body = new byte[bodySize];
        clientMessage.getBodyBuffer().readBytes(body);

        final ActiveMQAck ack = new ActiveMQAck(clientMessage);
        queueExecutor.execute(new ActiveMQMessageHandler(queueName, body, internalMessageDeserializer, messageHandler, ack, logger));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules the next poll while the queue is running; once it has been stopped, releases the
     * destroy latch instead so that {@code destroy()} can proceed.
     */
    public void receiveMessage() {
        if (!running) {
            destroyLatch.countDown();
            return;
        }
        queueExecutor.execute(messageProcessor);
    }
}
