/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.activemq;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.elasticsoftware.elasticactors.MessageDeliveryException;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.MessageHandlerEventListener;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.messaging.UUIDTools;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundEvent;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundExecutor;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message queue bound to a single ActiveMQ Artemis queue through the core client API.
 *
 * <p>Outgoing side ({@link #offer(InternalMessage)}):
 * <ul>
 *   <li>non-durable messages never touch the broker; they are handed straight to the
 *       configured message handler on the queue executor;</li>
 *   <li>durable messages are published through the {@link ClientProducer}, tagged with the
 *       routing key and a duplicate-detection id derived from the message id.</li>
 * </ul>
 *
 * <p>Incoming side, selected by the {@code useMessageHandler} constructor flag:
 * <ul>
 *   <li>{@code true}: this object is registered as the consumer's {@link MessageHandler} and
 *       {@link #onMessage(ClientMessage)} forwards every delivery to the queue executor;</li>
 *   <li>{@code false}: an {@link ActiveMQMessageProcessor} polls the consumer on the queue
 *       executor and re-submits itself after every poll until the queue is destroyed.</li>
 * </ul>
 *
 * <p>All work submitted to the executor uses the queue name as its key.
 */
public final class LocalMessageQueue
implements MessageQueue,
MessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(LocalMessageQueue.class);
    private final ThreadBoundExecutor queueExecutor;
    private final InternalMessageDeserializer internalMessageDeserializer;
    private final String queueName;
    private final String routingKey;
    private final ClientSession clientSession;
    private final ClientProducer producer;
    private final ClientConsumer consumer;
    private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
    // Never set to true anywhere in this class; offer() rejects messages while it is true.
    private final AtomicBoolean recovering = new AtomicBoolean(false);
    private final TransientAck transientAck = new TransientAck();
    private final ActiveMQMessageProcessor messageProcessor;
    // Released once the queue has stopped consuming; destroy() waits on it before closing.
    private final CountDownLatch destroyLatch = new CountDownLatch(1);
    private final boolean useMessageHandler;
    // volatile: cleared by DestroyQueue (run by queueExecutor) and read by receiveMessage(),
    // which initialize() also invokes from the caller's thread.
    private volatile boolean running = true;

    /**
     * Creates the queue and opens a consumer on {@code queueName}.
     *
     * @param queueExecutor executor on which all message handling, sending and acking is run
     * @param internalMessageDeserializer turns a received message body into an {@link InternalMessage}
     * @param queueName name of the broker queue to consume from; also the executor key
     * @param routingKey value written to the {@code routingKey} property of published messages
     * @param clientSession session used to create the consumer and outgoing messages
     * @param clientProducer producer used to publish durable messages
     * @param messageHandler handler that receives every message consumed or offered locally
     * @param useMessageHandler {@code true} to consume via a registered handler,
     *        {@code false} to poll the consumer
     * @param useImmediateReceive when polling, {@code true} uses {@code receiveImmediate()},
     *        {@code false} uses {@code receive(1)}
     * @throws ActiveMQException if the consumer cannot be created
     */
    LocalMessageQueue(ThreadBoundExecutor queueExecutor, InternalMessageDeserializer internalMessageDeserializer, String queueName, String routingKey, ClientSession clientSession, ClientProducer clientProducer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, boolean useMessageHandler, boolean useImmediateReceive) throws ActiveMQException {
        this.queueExecutor = queueExecutor;
        this.internalMessageDeserializer = internalMessageDeserializer;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.clientSession = clientSession;
        this.producer = clientProducer;
        this.useMessageHandler = useMessageHandler;
        this.consumer = clientSession.createConsumer(queueName);
        this.messageHandler = messageHandler;
        this.messageProcessor = new ActiveMQMessageProcessor(queueName, internalMessageDeserializer, messageHandler, useImmediateReceive);
    }

    /**
     * Queues a message for delivery.
     *
     * <p>Non-durable messages are dispatched directly to the local message handler; durable
     * messages are published to the broker. Both happen asynchronously on the queue executor,
     * so a publish failure is not reported to the caller of this method.
     *
     * @param message the message to deliver
     * @return always {@code true}
     * @throws MessageDeliveryException if the queue is flagged as recovering
     */
    public boolean offer(InternalMessage message) {
        if (this.recovering.get()) {
            throw new MessageDeliveryException("MessagingService is recovering", true);
        }
        if (!message.isDurable()) {
            // Transient messages skip the broker entirely.
            this.queueExecutor.execute((ThreadBoundEvent)new InternalMessageHandler(this.queueName, message, this.messageHandler, this.transientAck, logger));
            return true;
        }
        this.queueExecutor.execute((ThreadBoundEvent)new SendMessage(message));
        return true;
    }

    /**
     * Same as {@link #offer(InternalMessage)}.
     *
     * @param message the message to deliver
     * @return always {@code true}
     */
    public boolean add(InternalMessage message) {
        return this.offer(message);
    }

    /**
     * Polling is not supported by this queue; messages are pushed to the message handler.
     *
     * @return always {@code null}
     */
    public InternalMessage poll() {
        return null;
    }

    /**
     * @return the name of the broker queue this instance consumes from
     */
    public String getName() {
        return this.queueName;
    }

    /**
     * Starts consumption: either registers this object as the consumer's message handler or
     * schedules the first poll, then starts the client session.
     *
     * @throws Exception if registering the handler or starting the session fails
     */
    public synchronized void initialize() throws Exception {
        logger.info("Starting local message queue [{}->{}]", this.routingKey, this.queueName);
        if (this.useMessageHandler) {
            this.consumer.setMessageHandler(this);
        } else {
            this.receiveMessage();
        }
        this.clientSession.start();
    }

    /**
     * Stops the queue and releases its broker resources.
     *
     * <p>A {@link DestroyQueue} task is submitted to the executor and this method waits up to
     * three seconds for it to take effect. The consumer, producer and session are then closed
     * independently of each other, so a failure (or an interrupt while waiting) never prevents
     * the remaining resources from being closed. If the calling thread was interrupted while
     * waiting, its interrupt status is restored before returning.
     */
    public void destroy() {
        logger.info("Stopping local message queue [{}->{}]", this.routingKey, this.queueName);
        boolean interrupted = false;
        try {
            try {
                this.queueExecutor.execute((ThreadBoundEvent)new DestroyQueue(this.queueName));
                if (!this.destroyLatch.await(3L, TimeUnit.SECONDS)) {
                    logger.warn("Timed out waiting for queue {} to stop, closing it anyway", this.queueName);
                }
            } catch (InterruptedException e) {
                // Remember the interrupt but keep going: the resources below must still be closed.
                // The flag is restored only at the end so the close() calls run uninterrupted.
                interrupted = true;
                logger.warn("Exception while closing consumer for queue {}", this.queueName, e);
            }
            try {
                this.consumer.close();
            } catch (ActiveMQException e) {
                logger.warn("Exception while closing consumer for queue {}", this.queueName, e);
            }
            try {
                this.producer.close();
            } catch (ActiveMQException e) {
                logger.warn("Exception while closing producer for queue {}", this.queueName, e);
            }
            try {
                this.clientSession.close();
            } catch (ActiveMQException e) {
                logger.warn("Exception while closing session for queue {}", this.queueName, e);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Consumer callback used when the queue runs in message-handler mode. Copies the body out
     * of the client message and hands it to the queue executor for deserialization and
     * handling; the message is acknowledged once handling reports done or error.
     *
     * @param message the message delivered by the consumer
     */
    public void onMessage(ClientMessage message) {
        byte[] body = new byte[message.getBodySize()];
        message.getBodyBuffer().readBytes(body);
        this.queueExecutor.execute((ThreadBoundEvent)new ActiveMQMessageHandler(this.queueName, body, this.internalMessageDeserializer, this.messageHandler, new ActiveMQAck(message), logger));
    }

    /**
     * Schedules the next poll while the queue is running; once it has been stopped, releases
     * {@link #destroyLatch} instead so that {@link #destroy()} can proceed.
     */
    private void receiveMessage() {
        if (this.running) {
            this.queueExecutor.execute((ThreadBoundEvent)this.messageProcessor);
        } else {
            this.destroyLatch.countDown();
        }
    }

    /**
     * Executor task that marks the queue as stopped. In message-handler mode it releases the
     * destroy latch itself; in polling mode the latch is released by the next
     * {@link #receiveMessage()} call that observes the stopped flag.
     */
    private final class DestroyQueue
    implements ThreadBoundRunnable<String> {
        private final String queueName;

        private DestroyQueue(String queueName) {
            this.queueName = queueName;
        }

        public void run() {
            LocalMessageQueue.this.running = false;
            if (LocalMessageQueue.this.useMessageHandler) {
                LocalMessageQueue.this.destroyLatch.countDown();
            }
        }

        public String getKey() {
            return this.queueName;
        }
    }

    /**
     * Listener for non-durable messages, which have nothing to acknowledge: completion is a
     * no-op and errors are only logged.
     */
    private static final class TransientAck
    implements MessageHandlerEventListener {
        private TransientAck() {
        }

        public void onError(InternalMessage message, Throwable exception) {
            logger.error("Error handling transient message, payloadClass [{}]", message.getPayloadClass(), exception);
        }

        public void onDone(InternalMessage message) {
        }
    }

    /**
     * Executor task that passes an already-materialized {@link InternalMessage} to the message
     * handler. Exceptions thrown by the handler are logged and not propagated.
     */
    private static final class InternalMessageHandler
    implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessage message;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;

        private InternalMessageHandler(String queueName, InternalMessage message, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, MessageHandlerEventListener listener, Logger logger) {
            this.queueName = queueName;
            this.message = message;
            this.messageHandler = messageHandler;
            this.listener = listener;
            this.logger = logger;
        }

        public String getKey() {
            return this.queueName;
        }

        public void run() {
            try {
                this.messageHandler.handleMessage(this.message, this.listener);
            } catch (Exception e) {
                this.logger.error("Unexpected exception on #handleMessage", e);
            }
        }
    }

    /**
     * Executor task that individually acknowledges one consumed client message. Failures are
     * logged and not propagated.
     */
    private static final class AcknowledgeMessage
    implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final ClientMessage clientMessage;

        public AcknowledgeMessage(String queueName, ClientMessage clientMessage) {
            this.queueName = queueName;
            this.clientMessage = clientMessage;
        }

        public void run() {
            try {
                this.clientMessage.individualAcknowledge();
            } catch (ActiveMQException e) {
                logger.error("Exception while acking message", e);
            }
        }

        public String getKey() {
            return this.queueName;
        }
    }

    /**
     * Executor task that publishes one {@link InternalMessage} to the broker.
     */
    private final class SendMessage
    implements ThreadBoundRunnable<String> {
        private final InternalMessage message;

        public SendMessage(InternalMessage message) {
            this.message = message;
        }

        /**
         * Builds the client message (body, routing key, duplicate-detection id and, for a
         * non-negative timeout, an absolute expiration time) and sends it.
         *
         * @throws MessageDeliveryException if the producer fails to send the message
         */
        public void run() {
            ClientMessage clientMessage = LocalMessageQueue.this.clientSession.createMessage(this.message.isDurable());
            clientMessage.getBodyBuffer().writeBytes(this.message.toByteArray());
            clientMessage.putStringProperty("routingKey", LocalMessageQueue.this.routingKey);
            // The message id doubles as the duplicate-detection id.
            clientMessage.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, UUIDTools.toByteArray((UUID)this.message.getId()));
            if (this.message.getTimeout() >= 0) {
                // The timeout is relative; the expiration header is an absolute timestamp.
                clientMessage.setExpiration(System.currentTimeMillis() + (long)this.message.getTimeout());
            }
            try {
                LocalMessageQueue.this.producer.send((Message)clientMessage);
            } catch (ActiveMQException e) {
                throw new MessageDeliveryException("ActiveMQException while publishing message", (Throwable)e, false);
            }
        }

        public String getKey() {
            return LocalMessageQueue.this.queueName;
        }
    }

    /**
     * Listener for broker-delivered messages. The client message is acknowledged on the queue
     * executor both when handling completes and when it fails.
     */
    private final class ActiveMQAck
    implements MessageHandlerEventListener {
        private final ClientMessage clientMessage;

        private ActiveMQAck(ClientMessage clientMessage) {
            this.clientMessage = clientMessage;
        }

        public void onError(InternalMessage message, Throwable exception) {
            // A failed message is acknowledged exactly like a successful one.
            this.onDone(message);
        }

        public void onDone(InternalMessage message) {
            LocalMessageQueue.this.queueExecutor.execute((ThreadBoundEvent)new AcknowledgeMessage(LocalMessageQueue.this.queueName, this.clientMessage));
        }
    }

    /**
     * Executor task that deserializes a received message body and passes the result to the
     * message handler. Any exception from either step is logged and not propagated; in that
     * case the listener is not invoked by this class.
     */
    private static final class ActiveMQMessageHandler
    implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessageDeserializer internalMessageDeserializer;
        private final byte[] body;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final MessageHandlerEventListener listener;
        private final Logger logger;

        private ActiveMQMessageHandler(String queueName, byte[] body, InternalMessageDeserializer internalMessageDeserializer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, MessageHandlerEventListener listener, Logger logger) {
            this.queueName = queueName;
            this.internalMessageDeserializer = internalMessageDeserializer;
            this.body = body;
            this.messageHandler = messageHandler;
            this.listener = listener;
            this.logger = logger;
        }

        public String getKey() {
            return this.queueName;
        }

        public void run() {
            try {
                InternalMessage message = this.internalMessageDeserializer.deserialize(this.body);
                this.messageHandler.handleMessage(message, this.listener);
            } catch (Exception e) {
                this.logger.error("Unexpected exception on #handleMessage", e);
            }
        }
    }

    /**
     * Executor task used in polling mode. Each run performs one receive attempt, handles the
     * message if one arrived, and then always asks the enclosing queue to schedule the next
     * run (or to release the destroy latch once the queue has been stopped).
     */
    private final class ActiveMQMessageProcessor
    implements ThreadBoundRunnable<String> {
        private final String queueName;
        private final InternalMessageDeserializer internalMessageDeserializer;
        private final org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler;
        private final boolean receiveImmediate;

        private ActiveMQMessageProcessor(String queueName, InternalMessageDeserializer internalMessageDeserializer, org.elasticsoftware.elasticactors.messaging.MessageHandler messageHandler, boolean receiveImmediate) {
            this.queueName = queueName;
            this.internalMessageDeserializer = internalMessageDeserializer;
            this.messageHandler = messageHandler;
            this.receiveImmediate = receiveImmediate;
        }

        public String getKey() {
            return this.queueName;
        }

        public void run() {
            try {
                ClientMessage clientMessage = this.receiveImmediate
                        ? LocalMessageQueue.this.consumer.receiveImmediate()
                        : LocalMessageQueue.this.consumer.receive(1L);
                if (clientMessage != null) {
                    byte[] body = new byte[clientMessage.getBodySize()];
                    clientMessage.getBodyBuffer().readBytes(body);
                    InternalMessage message = this.internalMessageDeserializer.deserialize(body);
                    this.messageHandler.handleMessage(message, new ActiveMQAck(clientMessage));
                }
            } catch (ActiveMQException e) {
                logger.error("Unexpected exception on consumer.receive*", e);
            } catch (IOException e) {
                logger.error("Exception deserializing InteralMessage", e);
            } catch (Exception e) {
                logger.error("Unexpected exception in handleMessage", e);
            } finally {
                // Re-arm regardless of the outcome so one bad message cannot stop the polling loop.
                LocalMessageQueue.this.receiveMessage();
            }
        }
    }
}

