package com.github.cafdataprocessing.utilities.queuehelper;

import com.google.common.base.Strings;
import com.hpe.caf.api.Codec;
import com.hpe.caf.api.CodecException;
import com.hpe.caf.api.worker.TaskMessage;
import com.hpe.caf.codec.JsonCodec;
import com.hpe.caf.configs.RabbitConfiguration;
import com.hpe.caf.util.rabbitmq.DefaultRabbitConsumer;
import com.hpe.caf.util.rabbitmq.RabbitUtil;
import com.hpe.caf.worker.queue.rabbit.RabbitWorkerQueueConfiguration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/cafdataprocessing/utilities/queuehelper/QueueManager.class */
public class QueueManager implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) QueueManager.class);
    private static QueueManager instance;
    private final DefaultRabbitConsumer consumer;
    private final Channel publisherChannel;
    private final Channel consumerChannel;
    private final Thread consumerThread;
    private final RabbitServices services = RabbitServices.getInstance();
    private final List<String> consumerTags = new ArrayList();
    private final MessageHandler messageHandler = new MessageHandler();
    private final Codec codec = new JsonCodec();
    private final Connection connection = createRabbitConnection(RabbitServices.getInstance().getRabbitQueueConfiguration());
    private final String publishQueue = this.services.getRabbitProperties().getPublishQueue();
    private final List<String> consumeQueues = this.services.getRabbitProperties().getConsumeQueueNames();

    public static QueueManager getInstance() throws Exception {
        if (instance == null) {
            instance = new QueueManager();
        }
        return instance;
    }

    private QueueManager() throws Exception {
        if (Strings.isNullOrEmpty(this.publishQueue)) {
            LOGGER.debug("No RabbitMQ queue to publish to passed. Check your RabbitMQ properties if this is unexpected.");
            this.publisherChannel = null;
        } else {
            this.publisherChannel = this.connection.createChannel();
            RabbitUtil.declareWorkerQueue(this.publisherChannel, this.publishQueue);
        }
        if (this.consumeQueues == null || this.consumeQueues.isEmpty()) {
            LOGGER.debug("No RabbitMQ queues to consume message from set. Check your RabbitMQ properties if this is unexpected.");
            this.consumerChannel = null;
            this.consumer = null;
            this.consumerThread = null;
            return;
        }
        this.consumerChannel = this.connection.createChannel();
        Iterator<String> it = this.consumeQueues.iterator();
        while (it.hasNext()) {
            RabbitUtil.declareWorkerQueue(this.consumerChannel, it.next());
        }
        this.consumer = new DefaultRabbitConsumer(new LinkedBlockingQueue(), new QueueConsumerImpl(this.consumerChannel, this.messageHandler, this.codec));
        Iterator<String> it2 = this.consumeQueues.iterator();
        while (it2.hasNext()) {
            this.consumerTags.add(this.consumerChannel.basicConsume(it2.next(), true, this.consumer));
        }
        this.consumerThread = new Thread(this.consumer);
        this.consumerThread.start();
    }

    public void publishMessage(TaskMessage taskMessage) throws CodecException, IOException {
        if (Strings.isNullOrEmpty(this.publishQueue)) {
            LOGGER.warn("Attempted to publish message when publish queue is not set. Message not published.");
            return;
        }
        LOGGER.info("Publishing task message " + taskMessage.getTaskId());
        this.publisherChannel.basicPublish("", this.publishQueue, MessageProperties.TEXT_PLAIN, this.codec.serialise(taskMessage));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.consumerTags != null) {
            for (String str : this.consumerTags) {
                if (str != null) {
                    this.consumerChannel.basicCancel(str);
                }
            }
        }
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.consumerChannel != null) {
            try {
                this.consumerChannel.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (this.publisherChannel != null) {
            try {
                this.publisherChannel.close();
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e3) {
                e3.printStackTrace();
            }
        }
    }

    public static Connection createRabbitConnection(RabbitWorkerQueueConfiguration rabbitWorkerQueueConfiguration) throws IOException, TimeoutException {
        RabbitConfiguration rabbitConfiguration = rabbitWorkerQueueConfiguration.getRabbitConfiguration();
        return RabbitUtil.createRabbitConnection(RabbitUtil.createLyraConnectionOptions(rabbitConfiguration.getRabbitHost(), rabbitConfiguration.getRabbitPort(), rabbitConfiguration.getRabbitUser(), rabbitConfiguration.getRabbitPassword()), RabbitUtil.createLyraConfig(rabbitConfiguration.getBackoffInterval(), rabbitConfiguration.getMaxBackoffInterval(), rabbitConfiguration.getMaxAttempts()));
    }

    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }
}
