/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.xoom.lattice.exchange.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.vlingo.xoom.lattice.exchange.ExchangeException;
import io.vlingo.xoom.lattice.exchange.MessageParameters;
import io.vlingo.xoom.lattice.exchange.rabbitmq.BrokerQueue;
import io.vlingo.xoom.lattice.exchange.rabbitmq.Message;
import io.vlingo.xoom.lattice.exchange.rabbitmq.MessageListener;
import java.io.IOException;

class MessageConsumer {
    private final boolean autoAcknowledged;
    private final BrokerQueue queue;
    private boolean closed;
    private String tag;

    static MessageConsumer autoAcknowledgedInstance(BrokerQueue queue) {
        return MessageConsumer.instance(queue, true);
    }

    static MessageConsumer instance(BrokerQueue queue) {
        return new MessageConsumer(queue, false);
    }

    static MessageConsumer instance(BrokerQueue queue, boolean autoAcknowledged) {
        return new MessageConsumer(queue, autoAcknowledged);
    }

    void close() {
        this.closed = true;
        this.queue.close();
    }

    boolean isClosed() {
        return this.closed;
    }

    void receiveAll(MessageListener messageListener) {
        this.receiveFor(messageListener);
    }

    String tag() {
        return this.tag;
    }

    protected MessageConsumer(BrokerQueue queue, boolean autoAcknowledged) {
        this.queue = queue;
        this.autoAcknowledged = autoAcknowledged;
        this.tag = "";
        this.equalizeDistribution();
    }

    protected BrokerQueue queue() {
        return this.queue;
    }

    private void equalizeDistribution() {
        try {
            this.queue.connection.channel().basicQos(1);
        }
        catch (IOException e) {
            throw new ExchangeException("Cannot equalize distribution.", (Throwable)e);
        }
    }

    private void receiveFor(MessageListener messageListener) {
        Channel channel = this.queue.connection.channel();
        try {
            this.tag = channel.basicConsume(this.queue.name(), this.autoAcknowledged, (Consumer)new DispatchingConsumer(channel, messageListener));
        }
        catch (IOException e) {
            throw new ExchangeException("Failed to initiate consume because: " + e.getMessage(), (Throwable)e);
        }
    }

    private class DispatchingConsumer
    extends DefaultConsumer {
        private final MessageListener messageListener;

        public DispatchingConsumer(Channel channel, MessageListener messageListener) {
            super(channel);
            this.messageListener = messageListener;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (!MessageConsumer.this.isClosed()) {
                MessageParameters parameters = MessageParameters.bare().deliveryId(Long.toString(envelope.getDeliveryTag())).exchangeName(envelope.getExchange()).tag(consumerTag).typeName(properties.getType()).redeliver(envelope.isRedeliver()).routing(new String[]{envelope.getRoutingKey()});
                this.handle(this.messageListener, new Message(body, parameters));
            }
            if (MessageConsumer.this.isClosed()) {
                MessageConsumer.this.queue().close();
            }
        }

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException aSignal) {
            MessageConsumer.this.close();
        }

        private void handle(MessageListener messageListener, Message message) {
            try {
                messageListener.handleMessage(message);
                this.ack(message);
            }
            catch (ExchangeException e) {
                this.nack(message, e.retry());
            }
            catch (Throwable t) {
                this.nack(message, false);
            }
        }

        private void ack(Message message) {
            try {
                if (!MessageConsumer.this.autoAcknowledged) {
                    this.getChannel().basicAck(Long.parseLong(message.messageParameters.deliveryId()), false);
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }

        private void nack(Message message, boolean retry) {
            try {
                if (!MessageConsumer.this.autoAcknowledged) {
                    this.getChannel().basicNack(Long.parseLong(message.messageParameters.deliveryId()), false, retry);
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

