package pcosta.kafka.internal;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pcosta.kafka.internal.MessageReceiver;

/* loaded from: input_file:pcosta/kafka/internal/KafkaReceiver.class */
class KafkaReceiver<KEY, IN> {
    private static final int POLL_TIMEOUT = 1000;
    private Thread consumerThread;
    private KafkaConsumer<KEY, IN> consumer;
    private final String topic;
    private final Deserializer<KEY> keyDeserializer;
    private final Deserializer<IN> valueDeserializer;
    private final Map<String, Object> consumerProperties;
    private final int partition;
    private final MessageReceiver.MessageProcessor delegate;
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
    private static AtomicBoolean shuttingDown = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaReceiver(String str, Deserializer<KEY> deserializer, Deserializer<IN> deserializer2, MessageReceiver.MessageProcessor messageProcessor) {
        this.topic = str;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.delegate = messageProcessor;
        this.partition = messageProcessor.partition;
        this.consumerProperties = loadConsumerProps();
    }

    KafkaReceiver(String str, Deserializer<KEY> deserializer, Deserializer<IN> deserializer2, MessageReceiver.MessageProcessor messageProcessor, int i, Map<String, Object> map) {
        this.topic = str;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.delegate = messageProcessor;
        this.partition = i;
        this.consumerProperties = map;
    }

    private void createConsumer() {
        log.info("Initiating Kafka Receiver for Topic: {} Partition: {} Initial Offset: {}", new Object[]{this.topic, Integer.valueOf(this.partition), Long.valueOf(this.delegate.initialOffset)});
        this.consumer = getKafkaConsumer();
        this.consumerThread = new Thread(this::pollRecords, this.topic + "KafkaConsumer");
    }

    private void pollRecords() {
        do {
            try {
                try {
                    Iterator it = this.consumer.poll(1000L).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        try {
                            this.delegate.process((byte[]) consumerRecord.value(), this.topic, new StringMessageKey(consumerRecord.key()), consumerRecord.offset());
                        } catch (IllegalArgumentException e) {
                            log.error("Impossible to deliver message to processor: {}", e.getMessage(), e);
                            this.delegate.processError(new PlatformErrorImpl(e.getMessage(), e.getCause()));
                        }
                    }
                } finally {
                    this.consumer.close();
                }
            } catch (Exception e2) {
                log.error("SEVERE error pooling records: ", e2);
                this.consumer.close();
                return;
            }
        } while (!shuttingDown.get());
    }

    private KafkaConsumer<KEY, IN> getKafkaConsumer() {
        long j = this.delegate.initialOffset;
        try {
            KafkaConsumer<KEY, IN> kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
            TopicPartition topicPartition = new TopicPartition(this.topic, 0);
            List singletonList = Collections.singletonList(topicPartition);
            kafkaConsumer.assign(singletonList);
            OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
            if (j != -1 && (committed == null || committed.offset() < j)) {
                log.warn("Provided offset: {} is ahead of the last committed one: {}. Assuming LATEST", Long.valueOf(j), committed);
                this.delegate.processError(new PlatformErrorImpl("Invalid offset provided", new NoOffsetForPartitionException(topicPartition)));
                this.delegate.initialOffset = -1L;
            }
            if (j == -1) {
                kafkaConsumer.seekToEnd(singletonList);
                kafkaConsumer.poll(1000L);
            } else if (j == -3) {
                kafkaConsumer.seekToBeginning(singletonList);
            } else if (j != -2) {
                kafkaConsumer.seek(topicPartition, j + 1);
            }
            return kafkaConsumer;
        } catch (InvalidOffsetException e) {
            log.error("Invalid offset provided: {} resetting to Earliest Offset", Long.valueOf(j), e);
            this.delegate.processError(new PlatformErrorImpl(e.getMessage(), e.getCause()));
            this.delegate.initialOffset = -3L;
            return getKafkaConsumer();
        }
    }

    private Map<String, Object> loadConsumerProps() {
        Map<String, Object> loadConsumerProps = PropertiesReader.getInstance().loadConsumerProps();
        loadConsumerProps.put("client.id", this.topic.toLowerCase());
        loadConsumerProps.put("group.id", this.topic.toLowerCase() + "Consumer");
        loadConsumerProps.put("key.deserializer", this.keyDeserializer.getClass().getName());
        loadConsumerProps.put("value.deserializer", this.valueDeserializer.getClass().getName());
        return loadConsumerProps;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        createConsumer();
        shuttingDown.set(false);
        this.consumerThread.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        shuttingDown.set(true);
    }
}
