package io.latent.storm.rabbitmq;

import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import io.latent.storm.rabbitmq.Declarator;
import io.latent.storm.rabbitmq.Message;
import io.latent.storm.rabbitmq.MessageScheme;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/latent/storm/rabbitmq/RabbitMQSpout.class */
/**
 * Storm spout that reads messages from a {@link RabbitMQConsumer} and emits the tuples produced
 * by a {@link MessageScheme}.
 *
 * <p>Each emitted tuple is anchored with the message's delivery tag (as a {@link Long}) so that
 * {@link #ack(Object)} and {@link #fail(Object)} can forward the outcome to the consumer.
 * Messages that cannot be deserialized into a non-empty tuple are reported through the collector
 * and handed to {@link RabbitMQConsumer#deadLetter}.
 */
public class RabbitMQSpout extends BaseRichSpout {
    private final MessageScheme scheme;
    private final Declarator declarator;
    /** Stream to emit on; {@code null} selects the collector's default stream. */
    private final String streamId;

    // Runtime state: created in open(), therefore never serialized with the spout.
    private transient Logger logger;
    private transient RabbitMQConsumer consumer;
    private transient SpoutOutputCollector collector;
    private transient int prefetchCount;

    /** While {@code false}, {@link #nextTuple()} is a no-op. */
    private boolean active;

    /**
     * Creates a spout that emits on the default stream and uses a no-op declarator.
     *
     * @param scheme scheme used to turn message payloads into tuples
     */
    public RabbitMQSpout(Scheme scheme) {
        this(MessageScheme.Builder.from(scheme), new Declarator.NoOp(), null);
    }

    /**
     * Creates a spout that emits on the given stream and uses a no-op declarator.
     *
     * @param scheme scheme used to turn message payloads into tuples
     * @param str    stream id to emit on, or {@code null} for the default stream
     */
    public RabbitMQSpout(Scheme scheme, String str) {
        this(MessageScheme.Builder.from(scheme), new Declarator.NoOp(), str);
    }

    /**
     * Creates a spout that emits on the default stream.
     *
     * @param scheme     scheme used to turn message payloads into tuples
     * @param declarator declarator handed to the consumer
     */
    public RabbitMQSpout(Scheme scheme, Declarator declarator) {
        this(MessageScheme.Builder.from(scheme), declarator, null);
    }

    /**
     * Creates a spout that emits on the default stream.
     *
     * @param messageScheme scheme used to turn messages into tuples
     * @param declarator    declarator handed to the consumer
     */
    public RabbitMQSpout(MessageScheme messageScheme, Declarator declarator) {
        this(messageScheme, declarator, null);
    }

    /**
     * Creates a spout that emits on the given stream.
     *
     * @param scheme     scheme used to turn message payloads into tuples
     * @param declarator declarator handed to the consumer
     * @param str        stream id to emit on, or {@code null} for the default stream
     */
    public RabbitMQSpout(Scheme scheme, Declarator declarator, String str) {
        this(MessageScheme.Builder.from(scheme), declarator, str);
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * @param messageScheme scheme used to turn messages into tuples
     * @param declarator    declarator handed to the consumer
     * @param str           stream id to emit on, or {@code null} for the default stream
     */
    public RabbitMQSpout(MessageScheme messageScheme, Declarator declarator, String str) {
        this.scheme = messageScheme;
        this.declarator = declarator;
        this.streamId = str;
    }

    /**
     * Builds the consumer from the Storm configuration, opens the scheme and the consumer, and
     * marks the spout active.
     *
     * @param map                  Storm configuration the {@link ConsumerConfig} is read from
     * @param topologyContext      context passed on to the scheme
     * @param spoutOutputCollector collector used for emitting tuples and reporting errors
     */
    public void open(Map map, TopologyContext topologyContext, final SpoutOutputCollector spoutOutputCollector) {
        ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(map);
        // Consumer-side errors are surfaced through the collector handed to this spout.
        ErrorReporter reporter = new ErrorReporter() {
            @Override
            public void reportError(Throwable th) {
                spoutOutputCollector.reportError(th);
            }
        };
        this.consumer = loadConsumer(this.declarator, reporter, consumerConfig);
        this.scheme.open(map, topologyContext);
        this.consumer.open();
        this.prefetchCount = consumerConfig.getPrefetchCount();
        this.logger = LoggerFactory.getLogger(RabbitMQSpout.class);
        this.collector = spoutOutputCollector;
        this.active = true;
    }

    /**
     * Creates the consumer used by this spout. Subclasses may override this to supply a
     * different consumer.
     *
     * @param declarator     declarator passed to the consumer
     * @param errorReporter  sink for errors raised by the consumer
     * @param consumerConfig source of connection settings, prefetch count, queue name and
     *                       requeue-on-fail flag
     * @return a new, not yet opened consumer
     */
    protected RabbitMQConsumer loadConsumer(Declarator declarator, ErrorReporter errorReporter, ConsumerConfig consumerConfig) {
        return new RabbitMQConsumer(
                consumerConfig.getConnectionConfig(),
                consumerConfig.getPrefetchCount(),
                consumerConfig.getQueueName(),
                consumerConfig.isRequeueOnFail(),
                declarator,
                errorReporter);
    }

    /**
     * Closes the consumer, then the scheme, then the superclass. Each later step still runs when
     * an earlier one throws, and a consumer that was never created is skipped.
     */
    public void close() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
        } finally {
            try {
                this.scheme.close();
            } finally {
                super.close();
            }
        }
    }

    /**
     * Emits up to {@code prefetchCount} tuples. Stops early when the consumer returns
     * {@link Message#NONE}. Messages that yield no tuple do not count towards the limit.
     */
    public void nextTuple() {
        if (!this.active) {
            return;
        }
        int emitted = 0;
        while (emitted < this.prefetchCount) {
            Message message = this.consumer.nextMessage();
            if (message == Message.NONE) {
                break;
            }
            List<Object> tuple = extractTuple(message);
            if (!tuple.isEmpty()) {
                emit(tuple, message, this.collector);
                emitted++;
            }
        }
    }

    /**
     * Emits a tuple anchored with the message's delivery tag, on the configured stream if one
     * was given and on the default stream otherwise.
     *
     * @param list                 tuple values to emit
     * @param message              message the tuple was extracted from
     * @param spoutOutputCollector collector to emit through
     * @return whatever the collector's {@code emit} call returns
     */
    protected List<Integer> emit(List<Object> list, Message message, SpoutOutputCollector spoutOutputCollector) {
        // Boxed as Long because ack()/fail() only act on Long message ids.
        Long messageId = Long.valueOf(getDeliveryTag(message));
        if (this.streamId == null) {
            return spoutOutputCollector.emit(list, messageId);
        }
        return spoutOutputCollector.emit(this.streamId, list, messageId);
    }

    /**
     * Deserializes a message into a tuple.
     *
     * <p>If the scheme throws, or returns {@code null} or an empty list, the failure is logged
     * and reported to the collector exactly once, the message is passed to the consumer's
     * {@code deadLetter}, and an empty list is returned.
     *
     * @param message message to deserialize
     * @return the non-empty tuple, or an empty list when the message could not be deserialized
     */
    private List<Object> extractTuple(Message message) {
        long deliveryTag = getDeliveryTag(message);
        List<Object> tuple = null;
        Exception failure = null;
        try {
            tuple = this.scheme.deserialize(message);
        } catch (Exception e) {
            failure = e;
        }
        if (tuple != null && !tuple.isEmpty()) {
            return tuple;
        }

        String errorMsg = "Deserialization error for msgId " + deliveryTag;
        if (failure != null) {
            this.logger.warn(errorMsg, failure);
            this.collector.reportError(failure);
        } else {
            this.logger.warn(errorMsg);
            this.collector.reportError(new Exception(errorMsg));
        }
        // Both failure modes get the undeliverable message out of the way.
        this.consumer.deadLetter(Long.valueOf(deliveryTag));
        return Collections.emptyList();
    }

    /**
     * Forwards an ack to the consumer when the message id is a {@link Long}; other ids are
     * ignored.
     *
     * @param obj message id of the acknowledged tuple
     */
    public void ack(Object obj) {
        if (obj instanceof Long) {
            this.consumer.ack((Long) obj);
        }
    }

    /**
     * Forwards a failure to the consumer when the message id is a {@link Long}; other ids are
     * ignored.
     *
     * @param obj message id of the failed tuple
     */
    public void fail(Object obj) {
        if (obj instanceof Long) {
            this.consumer.fail((Long) obj);
        }
    }

    /**
     * Declares the scheme's output fields on the configured stream, or on the default stream
     * when no stream id was given.
     *
     * @param outputFieldsDeclarer declarer to register the fields with
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (this.streamId == null) {
            outputFieldsDeclarer.declare(this.scheme.getOutputFields());
        } else {
            outputFieldsDeclarer.declareStream(this.streamId, this.scheme.getOutputFields());
        }
    }

    /** Stops {@link #nextTuple()} from pulling messages until {@link #activate()} is called. */
    public void deactivate() {
        super.deactivate();
        this.active = false;
    }

    /** Re-enables message pulling in {@link #nextTuple()}. */
    public void activate() {
        super.activate();
        this.active = true;
    }

    /**
     * Returns the delivery tag of a delivered message.
     *
     * @param message message to inspect; it is cast to {@link Message.DeliveredMessage}
     * @return the message's delivery tag
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public long getDeliveryTag(Message message) {
        return ((Message.DeliveredMessage) message).getDeliveryTag();
    }
}
