/*
 * Decompiled with CFR 0.152.
 */
package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.KafkaInputPartition08;
import com.github.harbby.spark.sql.kafka.KafkaOffsetCommitter;
import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import com.github.harbby.spark.sql.kafka.model.KafkaSourceOffset;
import com.github.harbby.spark.sql.kafka.model.TopicPartitionLeader;
import com.github.harbby.spark.sql.kafka.util.KafkaUtil;
import com.github.harbby.spark.sql.kafka.util.PropertiesUtil;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;

public class KafkaDataSource08
implements DataSourceV2,
ContinuousReadSupport,
DataSourceRegister {
    private static final Logger logger = LoggerFactory.getLogger(KafkaDataSource08.class);
    private static final String dummyClientId = "sylph-spark-kafka-consumer-partition-lookup";
    private static final StructType schema = new StructType(new StructField[]{new StructField("_key", DataTypes.BinaryType, true, Metadata.empty()), new StructField("_message", DataTypes.BinaryType, true, Metadata.empty()), new StructField("_topic", DataTypes.StringType, false, Metadata.empty()), new StructField("_partition", DataTypes.IntegerType, false, Metadata.empty()), new StructField("_offset", DataTypes.LongType, false, Metadata.empty())});

    public String shortName() {
        return "kafka08";
    }

    public ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
        Properties properties = new Properties();
        properties.putAll((java.util.Map<?, ?>)options.asMap());
        String[] topics = Objects.requireNonNull(properties.getProperty("topics"), "kafka_topic not setting").split(",");
        String groupId = Objects.requireNonNull(properties.getProperty("group.id"), "group.id not setting");
        Map map = (Map)Map$.MODULE$.apply(((scala.collection.mutable.Map)JavaConverters.mapAsScalaMapConverter((java.util.Map)options.asMap()).asScala()).toSeq());
        KafkaCluster kafkaCluster = new KafkaCluster(map);
        int commitInterval = PropertiesUtil.getInt(properties, "auto.commit.interval.ms", 90000);
        KafkaOffsetCommitter kafkaOffsetCommitter = new KafkaOffsetCommitter(kafkaCluster, properties.getProperty("group.id"), commitInterval);
        return new KafkaContinuousReader08(options, properties, topics, groupId, kafkaCluster, kafkaOffsetCommitter);
    }

    private static URL getHostnamePortUrl(String hostPort) {
        try {
            URL u = new URL("http://" + hostPort);
            if (u.getHost() == null) {
                throw new IllegalArgumentException("The given host:port ('" + hostPort + "') doesn't contain a valid host");
            }
            if (u.getPort() == -1) {
                throw new IllegalArgumentException("The given host:port ('" + hostPort + "') doesn't contain a valid port");
            }
            return u;
        }
        catch (MalformedURLException e) {
            throw new IllegalArgumentException("The given host:port ('" + hostPort + "') is invalid", e);
        }
    }

    public static class KafkaContinuousReader08
    implements ContinuousReader {
        private final List<TopicPartitionLeader> topicPartitionLeaders;
        private final DataSourceOptions options;
        private final Properties properties;
        private final KafkaOffsetCommitter kafkaOffsetCommitter;
        private final Set<String> topics;
        private final KafkaCluster kafkaCluster;
        private final String groupId;
        private java.util.Map<TopicAndPartition, Long> fromOffsets;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public KafkaContinuousReader08(DataSourceOptions options, Properties properties, String[] topics, String groupId, KafkaCluster kafkaCluster, KafkaOffsetCommitter kafkaOffsetCommitter) {
            this.options = options;
            this.properties = properties;
            this.kafkaCluster = kafkaCluster;
            this.topics = Arrays.stream(topics).collect(Collectors.toSet());
            this.groupId = groupId;
            int soTimeout = PropertiesUtil.getInt(properties, "socket.timeout.ms", 30000);
            int bufferSize = PropertiesUtil.getInt(properties, "socket.receive.buffer.bytes", 65536);
            String[] brokers = properties.getProperty("bootstrap.servers").split(",");
            URL url = KafkaDataSource08.getHostnamePortUrl(brokers[new Random().nextInt(brokers.length)]);
            try (SimpleConsumer consumer = new SimpleConsumer(url.getHost(), url.getPort(), soTimeout, bufferSize, KafkaDataSource08.dummyClientId);){
                this.topicPartitionLeaders = KafkaUtil.getBrokers(consumer, Arrays.stream(topics).collect(Collectors.toList()), brokers);
            }
            this.kafkaOffsetCommitter = kafkaOffsetCommitter;
            kafkaOffsetCommitter.setName("Kafka_Offset_Committer");
            kafkaOffsetCommitter.start();
        }

        public Offset mergeOffsets(PartitionOffset[] offsets) {
            java.util.Map<TopicAndPartition, Long> partitionToOffsets = Arrays.stream(offsets).map(x -> (KafkaPartitionOffset)x).collect(Collectors.toMap(k -> k.getTopicPartition(), v -> v.getOffset()));
            return new KafkaSourceOffset(partitionToOffsets);
        }

        public Offset deserializeOffset(String json) {
            return KafkaSourceOffset.format(json);
        }

        public void setStartOffset(Optional<Offset> start) {
            if (start.isPresent() && start.get() instanceof KafkaSourceOffset) {
                logger.warn("setting StartOffset {}, Will use checkpoint get startOffset", start);
                this.fromOffsets = KafkaSourceOffset.getPartitionOffsets(start.get());
            } else {
                logger.warn("setting StartOffset {}, Will use kafka cluster get startOffset", start);
                this.fromOffsets = KafkaUtil.getFromOffset(this.kafkaCluster, this.topics, this.groupId);
            }
        }

        public Offset getStartOffset() {
            return new KafkaSourceOffset(this.fromOffsets);
        }

        public void commit(Offset end) {
            java.util.Map<TopicAndPartition, Long> offsets = ((KafkaSourceOffset)end).getPartitionToOffsets();
            KafkaPartitionOffset[] partitionOffsets = (KafkaPartitionOffset[])offsets.entrySet().stream().map(x -> new KafkaPartitionOffset((TopicAndPartition)x.getKey(), (Long)x.getValue())).toArray(KafkaPartitionOffset[]::new);
            this.kafkaOffsetCommitter.addAll(partitionOffsets);
        }

        public void stop() {
            this.kafkaOffsetCommitter.close();
        }

        public StructType readSchema() {
            return schema;
        }

        public List<InputPartition<InternalRow>> planInputPartitions() {
            ArrayList<InputPartition<InternalRow>> partitions = new ArrayList<InputPartition<InternalRow>>(this.topicPartitionLeaders.size());
            for (TopicPartitionLeader topicPartitionLeader : this.topicPartitionLeaders) {
                Long formOffset = this.fromOffsets.get(topicPartitionLeader.getKtp());
                Objects.requireNonNull(formOffset, topicPartitionLeader.getKtp() + " not found formOffset");
                KafkaInputPartition08 inputPartition = new KafkaInputPartition08(topicPartitionLeader.getKtp(), formOffset, this.properties, topicPartitionLeader.getLeader());
                partitions.add(inputPartition);
            }
            return partitions;
        }
    }
}

