package org.apache.flink.streaming.connectors.kafka.internal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.class */
/**
 * Fetcher that reads records through the Kafka 0.9 {@link KafkaConsumer} API.
 *
 * <p>{@link #runFetchLoop()} starts a dedicated daemon thread that executes {@link #run()} and
 * blocks until that thread ends. All calls on the {@link KafkaConsumer} other than
 * {@link KafkaConsumer#wakeup()} are made from that thread. Offsets to commit are handed over
 * from other threads via {@link #commitSpecificOffsetsToKafka(Map)} through an
 * {@link AtomicReference} and committed asynchronously from within the fetch loop.
 *
 * @param <T> type of the elements produced by the deserialization schema and emitted downstream
 */
public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);

    /** Turns raw key/value bytes into elements and signals end-of-stream. */
    private final KeyedDeserializationSchema<T> deserializer;

    /** Supplies the task name (for the thread name) and the metric group. */
    private final RuntimeContext runtimeContext;

    /** Configuration handed to the {@link KafkaConsumer} constructor. */
    private final Properties kafkaProperties;

    /** Timeout passed to every {@link KafkaConsumer#poll(long)} call. */
    private final long pollTimeout;

    /** Hand-over slot for the next offsets to commit; {@code null} when nothing is pending. */
    private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;

    /** Callback that clears {@link #commitInProgress} once an asynchronous commit finishes. */
    private final OffsetCommitCallback offsetCommitCallback;

    /** Set by the fetch thread once the consumer is assigned and positioned; read by other threads for wakeup. */
    private volatile KafkaConsumer<byte[], byte[]> consumer;

    /** Forwards errors from the fetch thread to the thread that called {@link #runFetchLoop()}. */
    private volatile ExceptionProxy errorHandler;

    /** Cleared on cancel, on end-of-stream and on failure; ends the fetch loop. */
    private volatile boolean running;

    /** True between issuing an asynchronous commit and its completion callback. */
    private volatile boolean commitInProgress;

    /** Commit callback that resets the in-progress flag and logs (but does not propagate) failures. */
    private class CommitCallback implements OffsetCommitCallback {
        private CommitCallback() {
        }

        @Override // org.apache.kafka.clients.consumer.OffsetCommitCallback
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            Kafka09Fetcher.this.commitInProgress = false;
            if (exc != null) {
                Kafka09Fetcher.LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", exc);
            }
        }
    }

    /**
     * Creates the fetcher. No Kafka connection is opened here; the consumer is created in
     * {@link #run()}.
     *
     * <p>Note that {@code properties} is modified in place: {@code enable.auto.commit} is set to
     * {@code true} when checkpointing is disabled and to {@code false} when it is enabled.
     *
     * @param sourceContext context forwarded to the superclass
     * @param list partitions forwarded to the superclass
     * @param serializedValue serialized periodic watermark assigner, forwarded to the superclass
     * @param serializedValue2 serialized punctuated watermark assigner, forwarded to the superclass
     * @param streamingRuntimeContext runtime context of the task; also forwarded to the superclass
     * @param keyedDeserializationSchema schema used to deserialize each record
     * @param properties Kafka consumer configuration (mutated, see above)
     * @param j poll timeout passed to {@link KafkaConsumer#poll(long)}
     * @param z forwarded to the superclass; presumably the "use metrics" switch - confirm against AbstractFetcher
     * @throws Exception if the superclass constructor fails
     */
    public Kafka09Fetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties, long j, boolean z) throws Exception {
        super(sourceContext, list, serializedValue, serializedValue2, streamingRuntimeContext, z);
        this.running = true;
        this.deserializer = keyedDeserializationSchema;
        this.runtimeContext = streamingRuntimeContext;
        this.kafkaProperties = properties;
        this.pollTimeout = j;
        this.nextOffsetsToCommit = new AtomicReference<>();
        this.offsetCommitCallback = new CommitCallback();
        // Kafka-side auto commit is enabled exactly when checkpointing is disabled.
        boolean autoCommit = !streamingRuntimeContext.isCheckpointingEnabled();
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(autoCommit));
    }

    /**
     * Starts the fetch thread and blocks until it terminates (or the calling thread is
     * interrupted), then rethrows any error the fetch thread reported.
     *
     * @throws Exception the error reported by the fetch thread, if any
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public void runFetchLoop() throws Exception {
        this.errorHandler = new ExceptionProxy(Thread.currentThread());
        Thread fetchThread = new Thread(this, "Kafka 0.9 Fetcher for " + this.runtimeContext.getTaskNameWithSubtasks());
        fetchThread.setDaemon(true);
        fetchThread.start();
        try {
            fetchThread.join();
        } catch (InterruptedException e) {
            // Only restore the interruption flag; a pending error is surfaced right below.
            Thread.currentThread().interrupt();
        }
        this.errorHandler.checkAndThrowException();
    }

    /** Stops the fetch loop and wakes the consumer up so that a blocking poll returns. */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public void cancel() {
        this.running = false;
        wakeupConsumer();
    }

    /**
     * Body of the fetch thread: creates the consumer, assigns and positions the subscribed
     * partitions, then polls and emits records until {@link #running} is cleared. The consumer
     * is closed exactly once when the loop ends, normally or exceptionally.
     */
    @Override // java.lang.Runnable
    public void run() {
        final KafkaConsumer<byte[], byte[]> kafkaConsumer;
        try {
            kafkaConsumer = new KafkaConsumer<>(this.kafkaProperties);
        } catch (Throwable t) {
            // Nothing to close yet; just report and give up.
            this.running = false;
            this.errorHandler.reportError(t);
            return;
        }

        // From here on the consumer exists and must be closed in the finally block.
        try {
            kafkaConsumer.assign(convertKafkaPartitions(subscribedPartitions()));
            if (this.useMetrics) {
                registerMetrics(kafkaConsumer);
            }
            seekToDefinedOffsets(kafkaConsumer);

            // Publish the consumer only now, so other threads can wake it up.
            this.consumer = kafkaConsumer;

            while (this.running) {
                Map<TopicPartition, OffsetAndMetadata> toCommit = this.nextOffsetsToCommit.getAndSet(null);
                // NOTE(review): when a commit is still in flight, the offsets taken out of the
                // slot above are dropped, not re-queued. Kept as in the original.
                if (toCommit != null && !this.commitInProgress) {
                    this.commitInProgress = true;
                    kafkaConsumer.commitAsync(toCommit, this.offsetCommitCallback);
                }

                final ConsumerRecords<byte[], byte[]> records;
                try {
                    records = kafkaConsumer.poll(this.pollTimeout);
                } catch (WakeupException e) {
                    // Woken up by cancel() or by a new commit request; re-check state.
                    continue;
                }
                emitRecords(records);
            }
        } catch (Throwable t) {
            if (this.running) {
                this.running = false;
                this.errorHandler.reportError(t);
            } else {
                LOG.debug("Stopped ConsumerThread threw exception", t);
            }
        } finally {
            // Single close point: closing in the catch block as well would close twice on failure.
            try {
                kafkaConsumer.close();
            } catch (Throwable t) {
                LOG.warn("Error while closing Kafka 0.9 consumer", t);
            }
        }
    }

    /** Registers the offset state gauge and one gauge per Kafka consumer metric under the "KafkaConsumer" group. */
    private void registerMetrics(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        MetricGroup kafkaGroup = this.runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
        addOffsetStateGauge(kafkaGroup);
        Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
        if (metrics == null) {
            LOG.info("Consumer implementation does not support metrics");
            return;
        }
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            kafkaGroup.gauge(entry.getKey().name(), new KafkaMetricWrapper(entry.getValue()));
        }
    }

    /** Positions the consumer one past the stored offset for every partition that has an offset defined. */
    private void seekToDefinedOffsets(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
            if (partition.isOffsetDefined()) {
                // Presumably the stored offset is that of the last emitted record, hence the
                // +1 to resume after it - confirm against AbstractFetcher.
                kafkaConsumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
            }
        }
    }

    /**
     * Deserializes and emits the polled records partition by partition. When the schema signals
     * end-of-stream, {@link #running} is cleared and the rest of that partition's batch is
     * skipped; the remaining partitions of the same poll are still processed.
     */
    private void emitRecords(ConsumerRecords<byte[], byte[]> records) throws Exception {
        for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
            for (ConsumerRecord<byte[], byte[]> record : records.records(partition.getKafkaPartitionHandle())) {
                T value = this.deserializer.deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset());
                if (this.deserializer.isEndOfStream(value)) {
                    this.running = false;
                    break;
                }
                emitRecord(value, partition, record.offset());
            }
        }
    }

    /** Wakes up the consumer if the fetch thread has already published it. */
    private void wakeupConsumer() {
        // Read the volatile field once so the null check and the call see the same reference.
        KafkaConsumer<byte[], byte[]> current = this.consumer;
        if (current != null) {
            current.wakeup();
        }
    }

    /**
     * Builds the Kafka-specific partition handle for a Flink partition descriptor.
     *
     * @param kafkaTopicPartition topic and partition number to convert
     * @return a {@link TopicPartition} with the same topic and partition number
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition) {
        return new TopicPartition(kafkaTopicPartition.getTopic(), kafkaTopicPartition.getPartition());
    }

    /**
     * Queues offsets for an asynchronous commit by the fetch thread and wakes the consumer up.
     *
     * <p>For every subscribed partition present in {@code map}, the offset plus one is queued
     * and also recorded as the partition's committed offset. Offsets still waiting from an
     * earlier call are replaced, with a warning.
     *
     * @param map offsets per partition; partitions without an entry are skipped
     * @throws Exception declared by the overridden method; not thrown by this implementation directly
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> map) throws Exception {
        KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
        for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
            Long lastProcessedOffset = map.get(partition.getKafkaTopicPartition());
            if (lastProcessedOffset != null) {
                long offsetToCommit = lastProcessedOffset.longValue() + 1;
                offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
                partition.setCommittedOffset(offsetToCommit);
            }
        }
        if (this.nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
            LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.");
        }
        // Interrupt a blocking poll so the fetch thread picks the offsets up promptly.
        wakeupConsumer();
    }

    /**
     * Extracts the Kafka partition handles from the given partition states.
     *
     * @param kafkaTopicPartitionStateArr partition states to convert
     * @return a new list holding each state's handle, in array order
     */
    public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] kafkaTopicPartitionStateArr) {
        List<TopicPartition> handles = new ArrayList<>(kafkaTopicPartitionStateArr.length);
        for (KafkaTopicPartitionState<TopicPartition> partition : kafkaTopicPartitionStateArr) {
            handles.add(partition.getKafkaPartitionHandle());
        }
        return handles;
    }
}
