package ru.kiryam.storm.rabbitmq;

import backtype.storm.topology.ReportedFailedException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.kiryam.storm.rabbitmq.Declarator;
import ru.kiryam.storm.rabbitmq.Message;
import ru.kiryam.storm.rabbitmq.config.ConnectionConfig;

/* loaded from: input_file:ru/kiryam/storm/rabbitmq/RabbitMQProducer.class */
public class RabbitMQProducer implements Serializable {
    private final Declarator declarator;
    private transient Logger logger;
    private transient ConnectionConfig connectionConfig;
    private transient Connection connection;
    private transient Channel channel;
    private boolean blocked;

    public RabbitMQProducer() {
        this(new Declarator.NoOp());
    }

    public RabbitMQProducer(Declarator declarator) {
        this.blocked = false;
        this.declarator = declarator;
    }

    public void send(Message message) {
        if (message == Message.NONE) {
            return;
        }
        sendMessageWhenNotBlocked((Message.MessageForSending) message);
    }

    private void sendMessageWhenNotBlocked(Message.MessageForSending messageForSending) {
        while (this.blocked) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        sendMessageActual(messageForSending);
    }

    private void sendMessageActual(Message.MessageForSending messageForSending) {
        reinitIfNecessary();
        if (this.channel == null) {
            throw new ReportedFailedException("No connection to RabbitMQ");
        }
        try {
            this.channel.basicPublish(messageForSending.getExchangeName(), messageForSending.getRoutingKey(), new AMQP.BasicProperties.Builder().contentType(messageForSending.getContentType()).contentEncoding(messageForSending.getContentEncoding()).deliveryMode(Integer.valueOf(messageForSending.isPersistent() ? 2 : 1)).headers(messageForSending.getHeaders()).build(), messageForSending.getBody());
        } catch (IOException e) {
            this.logger.error("io exception while attempting to send message", e);
            reset();
            throw new ReportedFailedException(e);
        } catch (Exception e2) {
            this.logger.warn("Unexpected error while sending message. Backing off for a bit before trying again (to allow time for recovery)", e2);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e3) {
            }
        } catch (AlreadyClosedException e4) {
            this.logger.error("already closed exception while attempting to send message", e4);
            reset();
            throw new ReportedFailedException(e4);
        }
    }

    public void open(Map map) {
        this.logger = LoggerFactory.getLogger(RabbitMQProducer.class);
        this.connectionConfig = ConnectionConfig.getFromStormConfig(map);
        internalOpen();
    }

    private void internalOpen() {
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            this.declarator.execute(this.channel);
        } catch (Exception e) {
            this.logger.error("could not open connection on rabbitmq", e);
            reset();
        }
    }

    public void close() {
        try {
            if (this.channel != null && this.channel.isOpen()) {
                this.channel.close();
            }
        } catch (Exception e) {
            this.logger.debug("error closing channel", e);
        }
        try {
            this.logger.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        } catch (Exception e2) {
            this.logger.debug("error closing connection", e2);
        }
        this.channel = null;
        this.connection = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reset() {
        this.channel = null;
    }

    private void reinitIfNecessary() {
        if (this.channel == null) {
            close();
            internalOpen();
        }
    }

    private Connection createConnection() throws IOException {
        ConnectionFactory asConnectionFactory = this.connectionConfig.asConnectionFactory();
        Connection newConnection = this.connectionConfig.getHighAvailabilityHosts().isEmpty() ? asConnectionFactory.newConnection() : asConnectionFactory.newConnection(this.connectionConfig.getHighAvailabilityHosts().toAddresses());
        newConnection.addShutdownListener(new ShutdownListener() { // from class: ru.kiryam.storm.rabbitmq.RabbitMQProducer.1
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                RabbitMQProducer.this.logger.error("shutdown signal received", shutdownSignalException);
                RabbitMQProducer.this.reset();
            }
        });
        newConnection.addBlockedListener(new BlockedListener() { // from class: ru.kiryam.storm.rabbitmq.RabbitMQProducer.2
            public void handleBlocked(String str) throws IOException {
                RabbitMQProducer.this.blocked = true;
                RabbitMQProducer.this.logger.warn(String.format("Got blocked by rabbitmq with reason = %s", str));
            }

            public void handleUnblocked() throws IOException {
                RabbitMQProducer.this.blocked = false;
                RabbitMQProducer.this.logger.warn(String.format("Got unblocked by rabbitmq", new Object[0]));
            }
        });
        this.logger.info("connected to rabbitmq: " + newConnection);
        return newConnection;
    }
}
