package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.ingest.WorkerTuple;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:com/mware/core/model/workQueue/RabbitMQWorkQueueSpout.class */
/**
 * A {@link WorkerSpout} that pulls work items from a single RabbitMQ queue.
 *
 * <p>Messages are consumed with manual acknowledgement: every tuple handed out by
 * {@link #nextTuple()} carries the RabbitMQ delivery tag as its message id and must later be
 * passed to {@link #ack(WorkerTuple)} or {@link #fail(WorkerTuple)}.
 *
 * <p>{@link #setConfiguration(Configuration)} has to be called before {@link #open()}, because
 * {@code open()} reads the connection settings and prefetch count from the configuration.
 */
public class RabbitMQWorkQueueSpout extends WorkerSpout {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWorkQueueSpout.class);

    /** Prefetch count used when {@link Configuration#RABBITMQ_PREFETCH_COUNT} is not configured. */
    public static final int DEFAULT_RABBITMQ_PREFETCH_COUNT = 10;

    /** How long {@link #nextTuple()} waits for a delivery, in milliseconds. */
    private static final long NEXT_DELIVERY_TIMEOUT_MS = 100L;

    private final String queueName;
    private Channel channel;
    private QueueingConsumer consumer;
    private Connection connection;
    private Configuration configuration;

    /**
     * Creates a spout bound to the given queue. No connection is made until {@link #open()}.
     *
     * @param str name of the RabbitMQ queue to consume from
     */
    public RabbitMQWorkQueueSpout(String str) {
        this.queueName = str;
    }

    /**
     * Connects to RabbitMQ, makes sure the queue exists and starts consuming from it with
     * manual acknowledgement.
     *
     * <p>If any step fails, whatever was already opened is released again before the
     * exception is propagated, so a failed {@code open()} does not leak a connection.
     *
     * @throws BcException if an {@link IOException} occurs while setting up the consumer
     */
    @Override
    public void open() {
        try {
            this.connection = RabbitMQUtils.openConnection(this.configuration);
            this.channel = RabbitMQUtils.openChannel(this.connection);
            RabbitMQWorkQueueRepository.createQueue(this.channel, this.queueName);
            this.consumer = new QueueingConsumer(this.channel);
            int prefetchCount = this.configuration.getInt(
                    Configuration.RABBITMQ_PREFETCH_COUNT, DEFAULT_RABBITMQ_PREFETCH_COUNT);
            this.channel.basicQos(prefetchCount, false);
            // autoAck = false: deliveries are settled explicitly through ack()/fail().
            this.channel.basicConsume(this.queueName, false, this.consumer);
        } catch (IOException e) {
            // Do not leave a half-initialised connection/channel behind.
            releaseResources();
            this.consumer = null;
            this.channel = null;
            this.connection = null;
            throw new BcException("Could not startup RabbitMQ", e);
        }
    }

    /**
     * Closes the channel and the connection. Failures are logged, not thrown.
     *
     * <p>Safe to call when {@link #open()} was never called or did not complete.
     */
    @Override
    public void close() {
        super.close();
        releaseResources();
    }

    /**
     * Closes the channel and then the connection, skipping whichever is {@code null}.
     * The connection is closed even when closing the channel throws.
     */
    private void releaseResources() {
        try {
            try {
                if (this.channel != null) {
                    LOGGER.debug("Closing RabbitMQ channel");
                    this.channel.close();
                }
            } finally {
                if (this.connection != null) {
                    LOGGER.debug("Closing RabbitMQ connection");
                    this.connection.close();
                }
            }
        } catch (IOException | TimeoutException e) {
            LOGGER.error("Could not close RabbitMQ connection and channel", e);
        }
    }

    /**
     * Waits briefly for the next delivery and wraps it in a {@link WorkerTuple}.
     *
     * @return a tuple whose message id is the delivery tag and whose data is the message body,
     *     or {@code null} if nothing arrived within the wait period
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public WorkerTuple nextTuple() throws InterruptedException {
        QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(NEXT_DELIVERY_TIMEOUT_MS);
        if (delivery == null) {
            return null;
        }
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        return new WorkerTuple(Long.valueOf(deliveryTag), delivery.getBody());
    }

    /**
     * Acknowledges the single delivery the tuple was created from. An {@link IOException}
     * from the broker is logged and otherwise ignored.
     *
     * @param workerTuple tuple previously returned by {@link #nextTuple()}
     */
    @Override
    public void ack(WorkerTuple workerTuple) {
        long deliveryTag = (Long) workerTuple.getMessageId();
        try {
            this.channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            LOGGER.error("Could not ack: %d", deliveryTag, e);
        }
    }

    /**
     * Negatively acknowledges the single delivery the tuple was created from, with
     * {@code requeue} set to {@code false}. An {@link IOException} from the broker is logged
     * and otherwise ignored.
     *
     * @param workerTuple tuple previously returned by {@link #nextTuple()}
     */
    @Override
    public void fail(WorkerTuple workerTuple) {
        long deliveryTag = (Long) workerTuple.getMessageId();
        try {
            this.channel.basicNack(deliveryTag, false, false);
        } catch (IOException e) {
            LOGGER.error("Could not nack: %d", deliveryTag, e);
        }
    }

    /**
     * Returns the consumer created by {@link #open()}.
     *
     * @return the consumer, or {@code null} if the spout is not open
     */
    protected QueueingConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * Injects the configuration used by {@link #open()}.
     *
     * @param configuration application configuration
     */
    @Inject
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
}
