package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import com.github.harbby.spark.sql.kafka.util.PropertiesUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/harbby/spark/sql/kafka/KafkaInputPartition08.class */
public class KafkaInputPartition08 implements InputPartition<InternalRow> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputPartition08.class);
    private final TopicAndPartition topicPartition;
    private final long startOffset;
    private final Properties kafkaParams;
    private final Broker broker;

    /* loaded from: input_file:com/github/harbby/spark/sql/kafka/KafkaInputPartition08$KafkaInputPartitionReader08.class */
    private static class KafkaInputPartitionReader08 implements ContinuousInputPartitionReader<InternalRow> {
        private final TopicAndPartition topicPartition;
        private final Broker broker;
        private final int soTimeout;
        private final int minBytes;
        private final int maxWait;
        private final int fetchSize;
        private final int bufferSize;
        private final int recreateConsumerLimit;
        private final String clientId;
        private final long startOffset;
        private SimpleConsumer consumer;
        private long currentOffset;
        private Iterator<MessageAndOffset> fetchIterator;
        private final Thread consumerFetchThread;
        private volatile IOException consumerIOException;
        private int recreateConsumerNum = 0;
        private final BlockingQueue<Iterator<MessageAndOffset>> fetchQueue = new ArrayBlockingQueue(32);
        private boolean isInitialized = false;

        public KafkaInputPartitionReader08(TopicAndPartition topicAndPartition, long j, Properties properties, Broker broker) {
            this.topicPartition = topicAndPartition;
            this.broker = broker;
            this.startOffset = j;
            this.soTimeout = PropertiesUtil.getInt(properties, "socket.timeout.ms", 30000);
            this.minBytes = PropertiesUtil.getInt(properties, "fetch.min.bytes", 1);
            this.maxWait = PropertiesUtil.getInt(properties, "fetch.wait.max.ms", 500);
            this.fetchSize = PropertiesUtil.getInt(properties, "fetch.message.max.bytes", 1048576);
            this.bufferSize = PropertiesUtil.getInt(properties, "socket.receive.buffer.bytes", 65536);
            this.recreateConsumerLimit = PropertiesUtil.getInt(properties, "sylph.spark.simple-consumer-reconnectLimit", 3);
            this.clientId = properties.getProperty("client.id", properties.getProperty("group.id", "sylph-spark-kafka-consumer-legacy-" + broker.id()));
            this.consumer = new SimpleConsumer(broker.host(), broker.port(), this.soTimeout, this.bufferSize, this.clientId);
            this.consumerFetchThread = new Thread(() -> {
                long j2 = j + 1;
                while (true) {
                    try {
                        Iterator<MessageAndOffset> fetch = fetch(j2);
                        if (fetch != null) {
                            ArrayList arrayList = new ArrayList();
                            while (fetch.hasNext()) {
                                MessageAndOffset next = fetch.next();
                                arrayList.add(next);
                                j2 = next.offset() + 1;
                            }
                            try {
                                this.fetchQueue.put(arrayList.iterator());
                            } catch (InterruptedException e) {
                                return;
                            }
                        }
                    } catch (Throwable th) {
                        this.consumerIOException = new IOException("consumer fetch failed", th);
                        return;
                    }
                }
            });
        }

        private Iterator<MessageAndOffset> fetch(long j) throws IOException {
            FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
            fetchRequestBuilder.clientId(this.clientId);
            fetchRequestBuilder.maxWait(this.maxWait);
            fetchRequestBuilder.minBytes(this.minBytes);
            fetchRequestBuilder.addFetch(this.topicPartition.topic(), this.topicPartition.partition(), j, this.fetchSize);
            try {
                FetchResponse fetch = this.consumer.fetch(fetchRequestBuilder.build());
                if (fetch == null) {
                    throw new IOException("Fetch from Kafka failed (request returned null)");
                }
                this.recreateConsumerNum = 0;
                return fetch.messageSet(this.topicPartition.topic(), this.topicPartition.partition()).iterator();
            } catch (Exception e) {
                if (!(e instanceof ClosedChannelException)) {
                    throw e;
                }
                KafkaInputPartition08.logger.error("Fetch failed!", e);
                try {
                    this.consumer.close();
                } catch (Exception e2) {
                    KafkaInputPartition08.logger.error("consumer close failed", e2);
                }
                int i = this.recreateConsumerNum;
                this.recreateConsumerNum = i + 1;
                if (i > this.recreateConsumerLimit) {
                    throw new IOException("Consumer recreate Number > " + this.recreateConsumerLimit, e);
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
                this.consumer = new SimpleConsumer(this.broker.host(), this.broker.port(), this.soTimeout, this.bufferSize, this.clientId);
                return fetch(j);
            }
        }

        public boolean next() throws IOException {
            if (!this.isInitialized) {
                this.consumerFetchThread.setDaemon(true);
                this.consumerFetchThread.setName("consumer_Fetch_partition_" + this.topicPartition.partition());
                this.consumerFetchThread.start();
                this.isInitialized = true;
            }
            while (true) {
                if (this.fetchIterator != null && this.fetchIterator.hasNext()) {
                    return true;
                }
                if (this.consumerIOException != null) {
                    throw this.consumerIOException;
                }
                if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) {
                    return false;
                }
                try {
                    this.fetchIterator = this.fetchQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public InternalRow m2get() {
            byte[] bArr;
            MessageAndOffset next = this.fetchIterator.next();
            ByteBuffer payload = next.message().payload();
            long offset = next.offset();
            if (payload == null) {
                bArr = null;
            } else {
                bArr = new byte[payload.remaining()];
                payload.get(bArr);
            }
            byte[] bArr2 = null;
            int keySize = next.message().keySize();
            if (keySize >= 0) {
                bArr2 = new byte[keySize];
                next.message().key().get(bArr2);
            }
            this.currentOffset = offset;
            return new GenericInternalRow(new Object[]{bArr2, bArr, UTF8String.fromString(this.topicPartition.topic()), Integer.valueOf(this.topicPartition.partition()), Long.valueOf(offset)});
        }

        public void close() throws IOException {
            if (this.consumerFetchThread != null) {
                this.consumerFetchThread.interrupt();
            }
            if (this.consumer != null) {
                this.consumer.close();
            }
        }

        public PartitionOffset getOffset() {
            return new KafkaPartitionOffset(this.topicPartition, this.currentOffset);
        }
    }

    public KafkaInputPartition08(TopicAndPartition topicAndPartition, long j, Properties properties, Broker broker) {
        this.topicPartition = topicAndPartition;
        this.startOffset = j;
        this.kafkaParams = properties;
        this.broker = broker;
    }

    public InputPartitionReader<InternalRow> createPartitionReader() {
        return new KafkaInputPartitionReader08(this.topicPartition, this.startOffset, this.kafkaParams, this.broker);
    }

    public String[] preferredLocations() {
        return new String[]{this.broker.host()};
    }
}
