package io.vlingo.lattice.exchange.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.vlingo.lattice.exchange.ExchangeException;
import io.vlingo.lattice.exchange.MessageParameters;
import java.io.IOException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/vlingo/lattice/exchange/rabbitmq/MessageConsumer.class */
public class MessageConsumer {
    private final boolean autoAcknowledged;
    private final BrokerQueue queue;
    private boolean closed;
    private String tag = "";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vlingo/lattice/exchange/rabbitmq/MessageConsumer$DispatchingConsumer.class */
    public class DispatchingConsumer extends DefaultConsumer {
        private final MessageListener messageListener;

        public DispatchingConsumer(Channel channel, MessageListener messageListener) {
            super(channel);
            this.messageListener = messageListener;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            if (!MessageConsumer.this.isClosed()) {
                handle(this.messageListener, new Message(bArr, MessageParameters.bare().deliveryId(Long.toString(envelope.getDeliveryTag())).exchangeName(envelope.getExchange()).tag(str).redeliver(envelope.isRedeliver()).routing(new String[]{envelope.getRoutingKey()})));
            }
            if (MessageConsumer.this.isClosed()) {
                MessageConsumer.this.queue().close();
            }
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            MessageConsumer.this.close();
        }

        private void handle(MessageListener messageListener, Message message) {
            try {
                messageListener.handleMessage(message);
                ack(message);
            } catch (ExchangeException e) {
                nack(message, e.retry());
            } catch (Throwable th) {
                nack(message, false);
            }
        }

        private void ack(Message message) {
            try {
                if (!MessageConsumer.this.autoAcknowledged) {
                    getChannel().basicAck(Long.parseLong(message.messageParameters.deliveryId()), false);
                }
            } catch (IOException e) {
            }
        }

        private void nack(Message message, boolean z) {
            try {
                if (!MessageConsumer.this.autoAcknowledged) {
                    getChannel().basicNack(Long.parseLong(message.messageParameters.deliveryId()), false, z);
                }
            } catch (IOException e) {
            }
        }
    }

    static MessageConsumer autoAcknowledgedInstance(BrokerQueue brokerQueue) {
        return instance(brokerQueue, true);
    }

    static MessageConsumer instance(BrokerQueue brokerQueue) {
        return new MessageConsumer(brokerQueue, false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MessageConsumer instance(BrokerQueue brokerQueue, boolean z) {
        return new MessageConsumer(brokerQueue, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.closed = true;
        this.queue.close();
    }

    boolean isClosed() {
        return this.closed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receiveAll(MessageListener messageListener) {
        receiveFor(messageListener);
    }

    String tag() {
        return this.tag;
    }

    protected MessageConsumer(BrokerQueue brokerQueue, boolean z) {
        this.queue = brokerQueue;
        this.autoAcknowledged = z;
        equalizeDistribution();
    }

    protected BrokerQueue queue() {
        return this.queue;
    }

    private void equalizeDistribution() {
        try {
            this.queue.connection.channel().basicQos(1);
        } catch (IOException e) {
            throw new ExchangeException("Cannot equalize distribution.", e);
        }
    }

    private void receiveFor(MessageListener messageListener) {
        Channel channel = this.queue.connection.channel();
        try {
            this.tag = channel.basicConsume(this.queue.name(), this.autoAcknowledged, new DispatchingConsumer(channel, messageListener));
        } catch (IOException e) {
            throw new ExchangeException("Failed to initiate consume because: " + e.getMessage(), e);
        }
    }
}
