/*
 * Decompiled with CFR 0.152.
 */
package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import com.github.harbby.spark.sql.kafka.util.PropertiesUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input partition that reads one Kafka topic partition from a single broker through the
 * legacy {@code SimpleConsumer} API.
 *
 * <p>The partition itself only carries the topic partition, the start offset, the Kafka
 * properties and the broker; all network work happens in the reader created by
 * {@link #createPartitionReader()}.
 */
public class KafkaInputPartition08
implements InputPartition<InternalRow> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputPartition08.class);
    private final TopicAndPartition topicPartition;
    private final long startOffset;
    private final Properties kafkaParams;
    private final Broker broker;

    /**
     * @param topicPartition topic and partition to read
     * @param startOffset offset the reader resumes from; fetching starts at {@code startOffset + 1}
     * @param kafkaParams consumer settings (timeouts, fetch sizes, client id, reconnect limit)
     * @param broker broker the reader connects to
     */
    public KafkaInputPartition08(TopicAndPartition topicPartition, long startOffset, Properties kafkaParams, Broker broker) {
        this.topicPartition = topicPartition;
        this.startOffset = startOffset;
        this.kafkaParams = kafkaParams;
        this.broker = broker;
    }

    /**
     * Creates a new reader for this partition. The reader opens its broker connection
     * immediately but only starts fetching on the first call to {@code next()}.
     */
    public InputPartitionReader<InternalRow> createPartitionReader() {
        return new KafkaInputPartitionReader08(this.topicPartition, this.startOffset, this.kafkaParams, this.broker);
    }

    /**
     * Returns the host of the broker this partition reads from as the only preferred location.
     */
    public String[] preferredLocations() {
        return new String[]{this.broker.host()};
    }

    /**
     * Continuous reader backed by a background fetch thread.
     *
     * <p>The fetch thread pulls message sets from the broker and hands them over as batches
     * through a bounded queue; {@link #next()} and {@link #get()} consume those batches. A
     * failure in the fetch thread is stored and rethrown from {@link #next()}.
     */
    private static class KafkaInputPartitionReader08
    implements ContinuousInputPartitionReader<InternalRow> {
        /** Maximum number of fetched batches buffered between the fetch thread and the reader. */
        private static final int FETCH_QUEUE_CAPACITY = 32;
        /** How long {@link #next()} waits for a batch before re-checking failure and task state. */
        private static final long QUEUE_POLL_TIMEOUT_MS = 100L;
        /** Pause before a closed connection is re-established. */
        private static final long RECONNECT_BACKOFF_MS = 100L;

        private final TopicAndPartition topicPartition;
        private final Broker broker;
        private final int soTimeout;
        private final int minBytes;
        private final int maxWait;
        private final int fetchSize;
        private final int bufferSize;
        private final int recreateConsumerLimit;
        private final String clientId;
        private final long startOffset;
        // Replaced by the fetch thread on reconnect and read by close(), hence volatile.
        private volatile SimpleConsumer consumer;
        private long currentOffset;
        // Only touched by the fetch thread.
        private int recreateConsumerNum = 0;
        private Iterator<MessageAndOffset> fetchIterator;
        private final BlockingQueue<Iterator<MessageAndOffset>> fetchQueue = new ArrayBlockingQueue<>(FETCH_QUEUE_CAPACITY);
        private final Thread consumerFetchThread;
        private volatile IOException consumerIOException;
        // Set by close() so the fetch thread stops reconnecting and exits.
        private volatile boolean closed = false;
        private boolean isInitialized = false;

        /**
         * Reads the consumer settings from {@code kafkaParams} (falling back to defaults),
         * connects to the broker and prepares the fetch thread without starting it.
         */
        public KafkaInputPartitionReader08(TopicAndPartition topicPartition, long startOffset, Properties kafkaParams, Broker broker) {
            this.topicPartition = topicPartition;
            this.broker = broker;
            this.startOffset = startOffset;
            // Until a record has been returned, the reported offset must be the start offset
            // rather than the field default of 0.
            this.currentOffset = startOffset;
            this.soTimeout = PropertiesUtil.getInt(kafkaParams, "socket.timeout.ms", 30000);
            this.minBytes = PropertiesUtil.getInt(kafkaParams, "fetch.min.bytes", 1);
            this.maxWait = PropertiesUtil.getInt(kafkaParams, "fetch.wait.max.ms", 500);
            this.fetchSize = PropertiesUtil.getInt(kafkaParams, "fetch.message.max.bytes", 0x100000);
            this.bufferSize = PropertiesUtil.getInt(kafkaParams, "socket.receive.buffer.bytes", 65536);
            this.recreateConsumerLimit = PropertiesUtil.getInt(kafkaParams, "sylph.spark.simple-consumer-reconnectLimit", 3);
            String groupId = kafkaParams.getProperty("group.id", "sylph-spark-kafka-consumer-legacy-" + broker.id());
            this.clientId = kafkaParams.getProperty("client.id", groupId);
            this.consumer = new SimpleConsumer(broker.host(), broker.port(), this.soTimeout, this.bufferSize, this.clientId);
            this.consumerFetchThread = new Thread(this::fetchLoop);
            this.consumerFetchThread.setDaemon(true);
            this.consumerFetchThread.setName("consumer_Fetch_partition_" + topicPartition.partition());
        }

        /**
         * Body of the fetch thread: repeatedly fetches from the broker and queues non-empty
         * batches until the reader is closed, the thread is interrupted, or a fetch fails.
         * Any failure is stored in {@code consumerIOException} for {@link #next()} to rethrow.
         */
        private void fetchLoop() {
            long nextOffset = this.startOffset + 1L;
            while (!this.closed && !Thread.currentThread().isInterrupted()) {
                List<MessageAndOffset> batch = new ArrayList<>();
                try {
                    final long requestedOffset = nextOffset;
                    Iterator<MessageAndOffset> messageIterator = this.fetch(requestedOffset);
                    // Iterating decodes the message set and may throw, so it stays inside the try.
                    while (messageIterator.hasNext()) {
                        MessageAndOffset messageAndOffset = messageIterator.next();
                        if (messageAndOffset.offset() < requestedOffset) {
                            // A fetch may return messages older than the requested offset
                            // (compressed message sets); skip them to avoid duplicates.
                            continue;
                        }
                        batch.add(messageAndOffset);
                        nextOffset = messageAndOffset.offset() + 1L;
                    }
                }
                catch (Throwable e) {
                    if (!this.closed) {
                        logger.error("Kafka fetch thread for {} terminated", this.topicPartition, e);
                    }
                    this.consumerIOException = new IOException("consumer fetch failed", e);
                    return;
                }
                if (batch.isEmpty()) {
                    continue;
                }
                try {
                    this.fetchQueue.put(batch.iterator());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Fetches the messages of this topic partition starting at {@code nextOffset}.
         *
         * @param nextOffset first offset to request from the broker
         * @return iterator over the fetched messages, possibly empty
         * @throws IOException if the broker returns no response or an error code, or if the
         *     connection could not be re-established within the reconnect limit
         */
        private Iterator<MessageAndOffset> fetch(long nextOffset) throws IOException {
            FetchRequestBuilder frb = new FetchRequestBuilder();
            frb.clientId(this.clientId);
            frb.maxWait(this.maxWait);
            frb.minBytes(this.minBytes);
            frb.addFetch(this.topicPartition.topic(), this.topicPartition.partition(), nextOffset, this.fetchSize);
            FetchResponse fetchResponse = this.sendWithReconnect(frb.build());
            if (fetchResponse == null) {
                throw new IOException("Fetch from Kafka failed (request returned null)");
            }
            if (fetchResponse.hasError()) {
                // Without this check a broker-side error yields an empty message set and the
                // fetch thread would re-request the same offset in a tight loop.
                short errorCode = fetchResponse.errorCode(this.topicPartition.topic(), this.topicPartition.partition());
                throw new IOException("Fetch from Kafka failed for " + this.topicPartition + " at offset " + nextOffset + " (error code " + errorCode + ")");
            }
            this.recreateConsumerNum = 0;
            ByteBufferMessageSet messageSet = fetchResponse.messageSet(this.topicPartition.topic(), this.topicPartition.partition());
            return messageSet.iterator();
        }

        /**
         * Sends the request, re-creating the consumer and retrying whenever the channel turns
         * out to be closed. Every other exception is propagated unchanged.
         */
        private FetchResponse sendWithReconnect(FetchRequest fetchRequest) throws IOException {
            while (true) {
                try {
                    return this.consumer.fetch(fetchRequest);
                }
                catch (Exception e) {
                    // The consumer API does not declare checked exceptions, so the closed
                    // channel case has to be detected with instanceof.
                    if (!(e instanceof ClosedChannelException)) {
                        throw e;
                    }
                    this.reconnect(e);
                }
            }
        }

        /**
         * Closes the current consumer and replaces it with a fresh connection to the broker.
         *
         * @param cause the failure that triggered the reconnect
         * @throws IOException if the reader was closed, the reconnect limit is exceeded, or
         *     the thread is interrupted while backing off
         */
        private void reconnect(Exception cause) throws IOException {
            if (this.closed) {
                // close() interrupts the fetch thread and closes the channel; that is a
                // regular shutdown, not an error worth logging or retrying.
                throw new IOException("Reader closed while fetching from Kafka", cause);
            }
            logger.error("Fetch failed!", cause);
            try {
                this.consumer.close();
            }
            catch (Exception e1) {
                logger.error("consumer close failed", e1);
            }
            if (this.recreateConsumerNum++ > this.recreateConsumerLimit) {
                throw new IOException("Consumer recreate Number > " + this.recreateConsumerLimit, cause);
            }
            try {
                Thread.sleep(RECONNECT_BACKOFF_MS);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while reconnecting to Kafka", interrupted);
            }
            SimpleConsumer replacement = new SimpleConsumer(this.broker.host(), this.broker.port(), this.soTimeout, this.bufferSize, this.clientId);
            this.consumer = replacement;
            if (this.closed) {
                // close() may have run between the check above and the assignment; make sure
                // the new connection does not outlive the reader.
                replacement.close();
                throw new IOException("Reader closed while reconnecting to Kafka", cause);
            }
        }

        /**
         * Advances to the next record, starting the fetch thread on first use and waiting
         * until a record is available.
         *
         * @return {@code true} when {@link #get()} can be called; {@code false} when the task
         *     was interrupted or completed, or the calling thread was interrupted
         * @throws IOException if the fetch thread has failed
         */
        public boolean next() throws IOException {
            if (!this.isInitialized) {
                this.consumerFetchThread.start();
                this.isInitialized = true;
            }
            while (this.fetchIterator == null || !this.fetchIterator.hasNext()) {
                if (this.consumerIOException != null) {
                    throw this.consumerIOException;
                }
                TaskContext context = TaskContext.get();
                if (context != null && (context.isInterrupted() || context.isCompleted())) {
                    return false;
                }
                Iterator<MessageAndOffset> batch;
                try {
                    // Wait with a timeout so a fetch-thread failure or task cancellation is
                    // noticed even when no further batch will ever arrive.
                    batch = this.fetchQueue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    // Looping again with the interrupt flag set would spin; stop instead.
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (batch != null) {
                    this.fetchIterator = batch;
                }
            }
            return true;
        }

        /**
         * Converts the next fetched message into a row of
         * {@code (key bytes, value bytes, topic, partition, offset)} and records its offset
         * as the current offset. Key and value are {@code null} when the message has none.
         */
        public InternalRow get() {
            MessageAndOffset msg = this.fetchIterator.next();
            long offset = msg.offset();

            byte[] valueBytes = null;
            ByteBuffer payload = msg.message().payload();
            if (payload != null) {
                valueBytes = new byte[payload.remaining()];
                payload.get(valueBytes);
            }

            byte[] keyBytes = null;
            int keySize = msg.message().keySize();
            if (keySize >= 0) {
                ByteBuffer keyPayload = msg.message().key();
                keyBytes = new byte[keySize];
                keyPayload.get(keyBytes);
            }

            this.currentOffset = offset;
            return new GenericInternalRow(new Object[]{keyBytes, valueBytes, UTF8String.fromString(this.topicPartition.topic()), this.topicPartition.partition(), offset});
        }

        /**
         * Stops the fetch thread and closes the broker connection.
         */
        public void close() throws IOException {
            // Set the flag first so the fetch thread does not reconnect after the interrupt.
            this.closed = true;
            this.consumerFetchThread.interrupt();
            SimpleConsumer current = this.consumer;
            if (current != null) {
                current.close();
            }
        }

        /**
         * Returns the offset of the record most recently returned by {@link #get()}, or the
         * start offset if no record has been returned yet.
         */
        public PartitionOffset getOffset() {
            return new KafkaPartitionOffset(this.topicPartition, this.currentOffset);
        }
    }
}

