package com.abiquo.commons.amqp.consumer;

import com.abiquo.commons.amqp.AMQPConfiguration;
import com.abiquo.commons.amqp.AMQPProperties;
import com.abiquo.commons.amqp.ChannelHandler;
import com.abiquo.commons.amqp.consumer.retry.DelayedRetryStrategy;
import com.abiquo.commons.amqp.util.ConsumerUtils;
import com.abiquo.commons.amqp.util.JSONUtils;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/abiquo/commons/amqp/consumer/AMQPConsumer.class */
public class AMQPConsumer<T extends Serializable> extends ChannelHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AMQPConsumer.class);
    private AMQPConfiguration configuration;
    private QueueSubscriber<AMQPConsumer<T>> subscriber;
    protected AMQPCallback<T> callback;
    private Class<? extends RetryStrategy> strategyClass;
    private Class<T> messageClass;

    public static <S extends Serializable> AMQPConsumer<S> of(AMQPConfiguration aMQPConfiguration, Class<S> cls, AMQPCallback<S> aMQPCallback) {
        return new AMQPConsumer<>(aMQPConfiguration, cls, aMQPCallback);
    }

    public AMQPConsumer(AMQPConfiguration aMQPConfiguration, Class<T> cls, AMQPCallback<T> aMQPCallback, Class<? extends RetryStrategy> cls2) {
        this.configuration = aMQPConfiguration;
        this.messageClass = cls;
        this.callback = aMQPCallback;
        this.strategyClass = cls2;
    }

    public AMQPConsumer(AMQPConfiguration aMQPConfiguration, Class<T> cls, AMQPCallback<T> aMQPCallback) {
        this(aMQPConfiguration, cls, aMQPCallback, DelayedRetryStrategy.class);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void consume(Envelope envelope, byte[] bArr) throws IOException {
        Serializable serializable = (Serializable) JSONUtils.deserialize(bArr, this.messageClass);
        if (serializable == null) {
            getChannel().basicReject(envelope.getDeliveryTag(), false);
        } else {
            this.callback.process(serializable);
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        }
    }

    public void start() {
        try {
            startConsumer();
        } catch (Exception e) {
            LOG.error("Unable to connect to {}", AMQPProperties.getBrokerHost());
            reconnectAsync();
        }
    }

    private void startConsumer() throws IOException {
        openChannelAndConnection();
        getChannel().basicQos(getPrefetchCount());
        this.configuration.declareExchanges(getChannel());
        this.configuration.declareQueues(getChannel());
        this.subscriber = new QueueSubscriber<>(getChannel(), this);
        getChannel().basicConsume(this.configuration.getQueue(), false, this.subscriber);
    }

    public void stop() throws IOException {
        stopConsumer();
    }

    private void stopConsumer() throws IOException {
        getChannel().basicCancel(this.subscriber.getConsumerTag());
        closeChannelAndConnection();
    }

    public int getPrefetchCount() {
        return 1;
    }

    public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
        LOG.error("Connection lost to {}", AMQPProperties.getBrokerHost());
        reconnect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconnect() {
        try {
            RetryStrategy newInstance = this.strategyClass.newInstance();
            while (newInstance.shouldRetry()) {
                LOG.debug("Try to reconnect to {}", AMQPProperties.getBrokerHost());
                try {
                    startConsumer();
                    LOG.debug("And we are back!");
                    return;
                } catch (Exception e) {
                }
            }
        } catch (Exception e2) {
            LOG.debug("Unable to instance new retry strategy");
        }
        LOG.debug("Unable to reconnect to {}", AMQPProperties.getBrokerHost());
    }

    private void reconnectAsync() {
        ConsumerUtils.reconnectionExecutor.submit(new Runnable() { // from class: com.abiquo.commons.amqp.consumer.AMQPConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                AMQPConsumer.this.reconnect();
            }
        });
    }
}
