package org.wso2.carbon.inbound.endpoint.protocol.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import org.apache.axiom.om.OMException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQConnectionConsumer.class */
public class RabbitMQConnectionConsumer {
    private static final Log log = LogFactory.getLog(RabbitMQConnectionConsumer.class);
    private RabbitMQConnectionFactory rabbitMQConnectionFactory;
    private Properties rabbitMQProperties;
    private static final int STATE_STOPPED = 0;
    private static final int STATE_STARTED = 1;
    private static final int STATE_PAUSED = 2;
    private static final int STATE_SHUTTING_DOWN = 3;
    private static final int STATE_FAILURE = 4;
    private static final int STATE_FAULTY = 5;
    private String inboundName;
    private QueueingConsumer queueingConsumer;
    private String queueName;
    private String routeKey;
    private String exchangeName;
    private RabbitMQInjectHandler injectHandler;
    private String consumerTagString;
    private volatile int workerState = 0;
    private Connection connection = null;
    private Channel channel = null;
    private boolean autoAck = false;
    private Hashtable<String, String> rabbitMQProps = new Hashtable<>();
    private volatile boolean connected = false;
    private volatile boolean idle = false;

    public RabbitMQConnectionConsumer(RabbitMQConnectionFactory rabbitMQConnectionFactory, Properties properties, RabbitMQInjectHandler rabbitMQInjectHandler) {
        this.rabbitMQConnectionFactory = rabbitMQConnectionFactory;
        this.rabbitMQProperties = properties;
        this.injectHandler = rabbitMQInjectHandler;
        for (String str : properties.stringPropertyNames()) {
            this.rabbitMQProps.put(str, properties.getProperty(str));
        }
    }

    public void execute() {
        try {
            this.workerState = 1;
            initConsumer();
            while (this.workerState == 1) {
                try {
                    startConsumer();
                } catch (IOException e) {
                    log.error("RabbitMQ Listener of the inbound " + this.inboundName + " was disconnected", e);
                    waitForConnection();
                } catch (OMException e2) {
                    log.error("Invalid Message Format while consuming the message", e2);
                } catch (ShutdownSignalException e3) {
                    if (!e3.isInitiatedByApplication()) {
                        log.error("RabbitMQ Listener of the inbound " + this.inboundName + " was disconnected", e3);
                        waitForConnection();
                    }
                }
            }
        } catch (IOException e4) {
            handleException("Error initializing consumer for inbound " + this.inboundName, e4);
        } finally {
            closeConnection();
            this.workerState = 0;
        }
    }

    private void waitForConnection() throws IOException {
        int retryInterval = this.rabbitMQConnectionFactory.getRetryInterval();
        int retryCount = this.rabbitMQConnectionFactory.getRetryCount();
        int i = 0;
        while (this.workerState == 1 && !this.connection.isOpen() && (retryCount == -1 || i < retryCount)) {
            i++;
            log.info("Attempting to reconnect to RabbitMQ Broker for the inbound " + this.inboundName + " in " + retryInterval + " ms");
            try {
                Thread.sleep(retryInterval);
            } catch (InterruptedException e) {
                log.error("Error while trying to reconnect to RabbitMQ Broker for the inbound " + this.inboundName, e);
            }
        }
        if (this.connection.isOpen()) {
            log.info("Successfully reconnected to RabbitMQ Broker for the inbound " + this.inboundName);
            initConsumer();
        } else {
            log.error("Could not reconnect to the RabbitMQ Broker for the inbound " + this.inboundName + ". Connection is closed.");
            this.workerState = 5;
        }
    }

    private void startConsumer() throws ShutdownSignalException, IOException {
        this.connection = getConnection();
        if (this.channel == null || !this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            log.debug("Channel is not open. Creating a new channel for inbound " + this.inboundName);
        }
        String property = this.rabbitMQProperties.getProperty(RabbitMQConstants.CONSUMER_QOS);
        if (property != null && !property.isEmpty()) {
            this.channel.basicQos(Integer.parseInt(property));
        }
        if (this.queueingConsumer == null) {
            this.workerState = 0;
            return;
        }
        while (isActive()) {
            try {
                if (!this.channel.isOpen()) {
                    this.channel = this.queueingConsumer.getChannel();
                }
                this.channel.txSelect();
                boolean z = false;
                try {
                    RabbitMQMessage consumerDelivery = getConsumerDelivery(this.queueingConsumer);
                    if (consumerDelivery != null) {
                        this.idle = false;
                        try {
                            z = this.injectHandler.invoke(consumerDelivery, this.inboundName);
                            if (z) {
                                try {
                                    if (!this.autoAck) {
                                        this.channel.basicAck(consumerDelivery.getDeliveryTag(), false);
                                    }
                                    this.channel.txCommit();
                                } catch (IOException e) {
                                    log.error("Error while committing transaction", e);
                                }
                            } else {
                                try {
                                    this.channel.txRollback();
                                    this.channel.basicRecover();
                                } catch (IOException e2) {
                                    log.error("Error while trying to roll back transaction", e2);
                                }
                            }
                        } catch (Throwable th) {
                            if (z) {
                                try {
                                    if (!this.autoAck) {
                                        this.channel.basicAck(consumerDelivery.getDeliveryTag(), false);
                                    }
                                    this.channel.txCommit();
                                } catch (IOException e3) {
                                    log.error("Error while committing transaction", e3);
                                }
                            } else {
                                try {
                                    this.channel.txRollback();
                                    this.channel.basicRecover();
                                } catch (IOException e4) {
                                    log.error("Error while trying to roll back transaction", e4);
                                }
                            }
                            throw th;
                        }
                    } else {
                        this.idle = true;
                    }
                } catch (InterruptedException e5) {
                    log.error("Error while consuming message", e5);
                }
            } catch (IOException e6) {
                log.error("Error while starting transaction", e6);
            }
        }
    }

    private void initConsumer() throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Initializing consumer for inbound " + this.inboundName);
        }
        this.connection = getConnection();
        this.channel = this.connection.createChannel();
        this.queueName = this.rabbitMQProperties.getProperty(RabbitMQConstants.QUEUE_NAME);
        this.routeKey = this.rabbitMQProperties.getProperty(RabbitMQConstants.QUEUE_ROUTING_KEY);
        this.exchangeName = this.rabbitMQProperties.getProperty(RabbitMQConstants.EXCHANGE_NAME);
        String property = this.rabbitMQProperties.getProperty(RabbitMQConstants.QUEUE_AUTO_ACK);
        if (property != null) {
            try {
                this.autoAck = Boolean.parseBoolean(property);
            } catch (Exception e) {
                log.debug("Format error in rabbitmq.queue.auto.ack parameter");
            }
        }
        if (StringUtils.isEmpty(this.queueName)) {
            this.queueName = this.inboundName;
            log.info("No queue name is specified for " + this.inboundName + ". inbound factory name will be used as queue name");
        }
        if (this.routeKey == null) {
            log.info("No routing key specified. Using queue name as the routing key.");
            this.routeKey = this.queueName;
        }
        if (!StringUtils.isEmpty(this.queueName)) {
            RabbitMQUtils.declareQueue(this.connection, this.queueName, this.rabbitMQProps);
        }
        if (!StringUtils.isEmpty(this.exchangeName)) {
            RabbitMQUtils.declareExchange(this.connection, this.exchangeName, this.rabbitMQProps);
            if (!this.channel.isOpen()) {
                this.channel = this.connection.createChannel();
                if (log.isDebugEnabled()) {
                    log.debug("Channel is not open. Creating a new channel for inbound " + this.inboundName);
                }
            }
            this.channel.queueBind(this.queueName, this.exchangeName, this.routeKey);
            log.debug("Bind queue '" + this.queueName + "' to exchange '" + this.exchangeName + "' with route key '" + this.routeKey + "'");
        }
        if (!this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            log.debug("Channel is not open. Creating a new channel for inbound " + this.inboundName);
        }
        String property2 = this.rabbitMQProperties.getProperty(RabbitMQConstants.CONSUMER_QOS);
        if (property2 != null && !property2.isEmpty()) {
            this.channel.basicQos(Integer.parseInt(property2));
        }
        this.queueingConsumer = new QueueingConsumer(this.channel);
        this.consumerTagString = this.rabbitMQProperties.getProperty(RabbitMQConstants.CONSUMER_TAG);
        if (this.consumerTagString != null) {
            this.channel.basicConsume(this.queueName, this.autoAck, this.consumerTagString, this.queueingConsumer);
            log.debug("Start consuming queue '" + this.queueName + "' with consumer tag '" + this.consumerTagString + "' for inbound " + this.inboundName);
        } else {
            this.consumerTagString = this.channel.basicConsume(this.queueName, this.autoAck, this.queueingConsumer);
            log.debug("Start consuming queue '" + this.queueName + "' with consumer tag '" + this.consumerTagString + "' for inbound " + this.inboundName);
        }
    }

    private RabbitMQMessage getConsumerDelivery(QueueingConsumer queueingConsumer) throws InterruptedException, ShutdownSignalException {
        RabbitMQMessage rabbitMQMessage = new RabbitMQMessage();
        try {
            log.debug("Waiting for next delivery from queue for inbound " + this.inboundName);
            QueueingConsumer.Delivery nextDelivery = queueingConsumer.nextDelivery();
            if (nextDelivery == null) {
                log.debug("Queue delivery item is null for inbound " + this.inboundName);
                return null;
            }
            AMQP.BasicProperties properties = nextDelivery.getProperties();
            Map<String, Object> headers = properties.getHeaders();
            rabbitMQMessage.setBody(nextDelivery.getBody());
            rabbitMQMessage.setDeliveryTag(nextDelivery.getEnvelope().getDeliveryTag());
            rabbitMQMessage.setReplyTo(properties.getReplyTo());
            rabbitMQMessage.setMessageId(properties.getMessageId());
            String contentType = properties.getContentType();
            if (contentType == null) {
                contentType = this.rabbitMQProperties.getProperty(RabbitMQConstants.CONTENT_TYPE);
            }
            rabbitMQMessage.setContentType(contentType);
            rabbitMQMessage.setContentEncoding(properties.getContentEncoding());
            rabbitMQMessage.setCorrelationId(properties.getCorrelationId());
            if (headers != null) {
                rabbitMQMessage.setHeaders(headers);
                if (headers.get(RabbitMQConstants.SOAP_ACTION) != null) {
                    rabbitMQMessage.setSoapAction(headers.get(RabbitMQConstants.SOAP_ACTION).toString());
                }
            }
            return rabbitMQMessage;
        } catch (ConsumerCancelledException e) {
            return null;
        } catch (InterruptedException e2) {
            return null;
        } catch (ShutdownSignalException e3) {
            return null;
        }
    }

    private void closeConnection() {
        if (this.connection == null || !this.connection.isOpen()) {
            return;
        }
        try {
            this.connection.close();
            log.info("RabbitMQ connection closed for inbound " + this.inboundName);
        } catch (IOException e) {
            log.error("Error while closing RabbitMQ connection for inbound " + this.inboundName, e);
        } finally {
            this.connection = null;
        }
    }

    private Connection createConnection() throws IOException {
        Connection connection = null;
        try {
            connection = this.rabbitMQConnectionFactory.createConnection();
            log.info("RabbitMQ connection created for inbound " + this.inboundName);
        } catch (Exception e) {
            handleException("Error while creating RabbitMQ connection for inbound " + this.inboundName, e);
        }
        return connection;
    }

    private Connection getConnection() throws IOException {
        if (this.connection == null) {
            this.connection = createConnection();
            setConnected(true);
        }
        return this.connection;
    }

    private boolean isActive() {
        return this.workerState == 1;
    }

    public void setConnected(boolean z) {
        this.connected = z;
    }

    public String getInboundName() {
        return this.inboundName;
    }

    public void setInboundName(String str) {
        this.inboundName = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void requestShutdown() {
        this.workerState = 3;
        closeConnection();
    }

    private void handleException(String str, Exception exc) {
        log.error(str, exc);
        throw new RabbitMQException(str, exc);
    }
}
