package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import com.github.harbby.spark.sql.kafka.model.KafkaSourceOffset;
import com.github.harbby.spark.sql.kafka.model.TopicPartitionLeader;
import com.github.harbby.spark.sql.kafka.util.KafkaUtil;
import com.github.harbby.spark.sql.kafka.util.PropertiesUtil;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map$;

/* loaded from: input_file:com/github/harbby/spark/sql/kafka/KafkaDataSource08.class */
public class KafkaDataSource08 implements DataSourceV2, ContinuousReadSupport, DataSourceRegister {
    /** Client id handed to the short-lived SimpleConsumer that looks up partition leaders. */
    private static final String dummyClientId = "sylph-spark-kafka-consumer-partition-lookup";
    /** Logger shared by the data source and its nested reader. */
    private static final Logger logger = LoggerFactory.getLogger(KafkaDataSource08.class);
    /**
     * Fixed row schema returned by the reader: nullable binary {@code _key} and {@code _message},
     * followed by non-nullable {@code _topic} (string), {@code _partition} (int) and {@code _offset} (long).
     */
    private static final StructType schema = new StructType(new StructField[]{new StructField("_key", DataTypes.BinaryType, true, Metadata.empty()), new StructField("_message", DataTypes.BinaryType, true, Metadata.empty()), new StructField("_topic", DataTypes.StringType, false, Metadata.empty()), new StructField("_partition", DataTypes.IntegerType, false, Metadata.empty()), new StructField("_offset", DataTypes.LongType, false, Metadata.empty())});

    /* loaded from: input_file:com/github/harbby/spark/sql/kafka/KafkaDataSource08$KafkaContinuousReader08.class */
    public static class KafkaContinuousReader08 implements ContinuousReader {
        /** Options the reader was created with; stored as given by the constructor. */
        private final DataSourceOptions options;
        /** Kafka settings; read for socket settings and bootstrap.servers, and passed to each input partition. */
        private final Properties properties;
        /** Distinct topic names this reader consumes. */
        private final Set<String> topics;
        /** Cluster client used to look up starting offsets and to store consumer offsets. */
        private final KafkaCluster kafkaCluster;
        /** Consumer group whose offsets are read and written. */
        private final String groupId;
        /** Offset recorded by the most recent commit; an equal offset is not committed again. */
        private Offset lastCommit;
        /** Starting offset per topic partition; assigned by setStartOffset. */
        private Map<TopicAndPartition, Long> fromOffsets;

        /**
         * Creates a reader for the given topics and consumer group.
         *
         * @param dataSourceOptions options the source was created with
         * @param properties Kafka settings used for broker lookup and handed to the input partitions
         * @param strArr topic names; duplicates are collapsed into a set
         * @param str consumer group id
         * @param kafkaCluster cluster client used for offset lookup and offset commits
         */
        public KafkaContinuousReader08(DataSourceOptions dataSourceOptions, Properties properties, String[] strArr, String str, KafkaCluster kafkaCluster) {
            Set<String> distinctTopics = Arrays.stream(strArr).collect(Collectors.toSet());
            this.topics = distinctTopics;
            this.groupId = str;
            this.kafkaCluster = kafkaCluster;
            this.properties = properties;
            this.options = dataSourceOptions;
        }

        /**
         * Looks up partition leader information for the configured topics.
         *
         * <p>A randomly chosen entry of {@code bootstrap.servers} is used as the seed broker for a
         * temporary {@link SimpleConsumer}, which is always closed before this method returns.
         *
         * @return the topic partition leaders reported by {@code KafkaUtil.getBrokers}
         * @throws NullPointerException if {@code bootstrap.servers} is not set
         * @throws IllegalArgumentException if {@code bootstrap.servers} holds no usable broker address
         */
        private List<TopicPartitionLeader> getKafkaBrokers() {
            int socketTimeoutMs = PropertiesUtil.getInt(this.properties, "socket.timeout.ms", 30000);
            int receiveBufferBytes = PropertiesUtil.getInt(this.properties, "socket.receive.buffer.bytes", 65536);
            String bootstrapServers = Objects.requireNonNull(
                    this.properties.getProperty("bootstrap.servers"), "bootstrap.servers not setting");
            // Lists are commonly written as "a:9092, b:9092"; strip the padding and skip blank
            // entries so neither ends up inside a host name.
            String[] seedBrokers = Arrays.stream(bootstrapServers.split(","))
                    .map(String::trim)
                    .filter(address -> !address.isEmpty())
                    .toArray(String[]::new);
            if (seedBrokers.length == 0) {
                throw new IllegalArgumentException(
                        "bootstrap.servers contains no broker address: '" + bootstrapServers + "'");
            }
            URL seed = KafkaDataSource08.getHostnamePortUrl(seedBrokers[new Random().nextInt(seedBrokers.length)]);
            SimpleConsumer simpleConsumer = new SimpleConsumer(seed.getHost(), seed.getPort(), socketTimeoutMs, receiveBufferBytes, KafkaDataSource08.dummyClientId);
            try {
                return KafkaUtil.getBrokers(simpleConsumer, new ArrayList<>(this.topics), seedBrokers);
            } finally {
                // Release the lookup connection on success and on failure alike.
                simpleConsumer.close();
            }
        }

        /**
         * Combines per-partition offsets into one source offset.
         *
         * @param partitionOffsetArr offsets reported by the partitions; each must be a {@link KafkaPartitionOffset}
         * @return a {@link KafkaSourceOffset} mapping every topic partition to its offset
         */
        public Offset mergeOffsets(PartitionOffset[] partitionOffsetArr) {
            Map<TopicAndPartition, Long> offsetsByPartition = Arrays.stream(partitionOffsetArr)
                    .map(KafkaPartitionOffset.class::cast)
                    .collect(Collectors.toMap(KafkaPartitionOffset::getTopicPartition, KafkaPartitionOffset::getOffset));
            return new KafkaSourceOffset(offsetsByPartition);
        }

        /**
         * Rebuilds an offset from its serialized text form.
         *
         * @param str serialized offset
         * @return the offset produced by {@code KafkaSourceOffset.format}
         */
        public Offset deserializeOffset(String str) {
            Offset restored = KafkaSourceOffset.format(str);
            return restored;
        }

        /**
         * Chooses the offsets reading starts from.
         *
         * <p>A supplied {@link KafkaSourceOffset} is used directly; in every other case the starting
         * offsets are fetched from the Kafka cluster for this reader's topics and group.
         *
         * @param optional offset to resume from, if any
         */
        public void setStartOffset(Optional<Offset> optional) {
            boolean resumable = optional.isPresent() && optional.get() instanceof KafkaSourceOffset;
            if (!resumable) {
                this.fromOffsets = KafkaUtil.getFromOffset(this.kafkaCluster, this.topics, this.groupId);
                KafkaDataSource08.logger.info("setting StartOffset {}, Will use kafka cluster get startOffset", this.fromOffsets);
                return;
            }
            this.fromOffsets = KafkaSourceOffset.getPartitionOffsets(optional.get());
            KafkaDataSource08.logger.warn("setting StartOffset {}, Will use checkpoint get startOffset", optional);
        }

        /**
         * Returns the starting offsets chosen by {@code setStartOffset}, wrapped as a source offset.
         *
         * @return a new {@link KafkaSourceOffset} over the current starting offsets
         */
        public Offset getStartOffset() {
            Offset start = new KafkaSourceOffset(this.fromOffsets);
            return start;
        }

        /**
         * Stores the given offset as the consumer group's position in Kafka.
         *
         * <p>An offset equal to the last successfully committed one is skipped. When the cluster
         * reports a failure the problem is logged and the offset is not remembered, so a later
         * commit of the same offset is attempted again instead of being silently dropped.
         *
         * @param offset offset to commit; must be a {@link KafkaSourceOffset}
         */
        public void commit(Offset offset) {
            if (offset.equals(this.lastCommit)) {
                return;
            }
            Map<TopicAndPartition, Long> partitionToOffsets = ((KafkaSourceOffset) offset).getPartitionToOffsets();
            KafkaDataSource08.logger.info("committing offset to kafka, {}", partitionToOffsets);
            // setConsumerOffsets signals failure through a Left value rather than by throwing.
            scala.util.Either<?, ?> result = this.kafkaCluster.setConsumerOffsets(this.groupId, Map$.MODULE$.apply(((scala.collection.mutable.Map) JavaConverters.mapAsScalaMapConverter(partitionToOffsets).asScala()).toSeq()));
            if (result.isLeft()) {
                // Best effort: keep the stream running, but leave lastCommit untouched so the
                // same offset can be retried.
                KafkaDataSource08.logger.warn("failed to commit offset to kafka, {}, cause: {}", partitionToOffsets, result);
                return;
            }
            this.lastCommit = offset;
        }

        /** Does nothing; this reader has no stop-time work. */
        public void stop() {
        }

        /**
         * Returns the fixed schema of the rows this reader produces.
         *
         * @return the schema constant shared with the enclosing data source
         */
        public StructType readSchema() {
            StructType rowSchema = KafkaDataSource08.schema;
            return rowSchema;
        }

        /**
         * Builds one input partition per Kafka topic partition.
         *
         * <p>Each partition starts at the offset recorded for it in the starting offsets and is
         * bound to the leader reported by the broker lookup.
         *
         * @return the input partitions, in the order the leaders were returned
         * @throws NullPointerException if a topic partition has no starting offset
         */
        public List<InputPartition<InternalRow>> planInputPartitions() {
            KafkaDataSource08.logger.info("getting kafka topic {}, partition info", this.topics);
            List<TopicPartitionLeader> leaders = getKafkaBrokers();
            List<InputPartition<InternalRow>> partitions = new ArrayList<>(leaders.size());
            for (TopicPartitionLeader leader : leaders) {
                // Fail fast when the lookup reports a partition the starting offsets do not cover.
                long startOffset = Objects.requireNonNull(
                        this.fromOffsets.get(leader.getKtp()), leader.getKtp() + " not found formOffset");
                partitions.add(new KafkaInputPartition08(leader.getKtp(), startOffset, this.properties, leader.getLeader()));
            }
            KafkaDataSource08.logger.info("creating partitions {}", partitions);
            return partitions;
        }
    }

    /**
     * Returns the alias under which this data source is registered.
     *
     * @return the string {@code "kafka08"}
     */
    public String shortName() {
        final String alias = "kafka08";
        return alias;
    }

    /**
     * Creates the continuous reader for the configured topics and consumer group.
     *
     * <p>The data source options are copied into Kafka properties. The {@code topics} option is a
     * comma-separated list; surrounding whitespace is stripped and blank entries are skipped.
     *
     * @param optional user-supplied schema; not used
     * @param str checkpoint location; not used
     * @param dataSourceOptions options carrying at least {@code topics} and {@code group.id}
     * @return a reader over the requested topics
     * @throws NullPointerException if {@code topics} or {@code group.id} is not set
     * @throws IllegalArgumentException if {@code topics} names no topic
     */
    public ContinuousReader createContinuousReader(Optional<StructType> optional, String str, DataSourceOptions dataSourceOptions) {
        Properties properties = new Properties();
        properties.putAll(dataSourceOptions.asMap());
        String topicsOption = Objects.requireNonNull(properties.getProperty("topics"), "kafka_topic not setting");
        // "a, b" must yield topics "a" and "b"; padded or empty names are never valid.
        String[] topicNames = Arrays.stream(topicsOption.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .toArray(String[]::new);
        if (topicNames.length == 0) {
            throw new IllegalArgumentException("topics option contains no topic name: '" + topicsOption + "'");
        }
        String groupId = Objects.requireNonNull(properties.getProperty("group.id"), "group.id not setting");
        KafkaCluster kafkaCluster = new KafkaCluster(Map$.MODULE$.apply(((scala.collection.mutable.Map) JavaConverters.mapAsScalaMapConverter(dataSourceOptions.asMap()).asScala()).toSeq()));
        // NOTE(review): the result is discarded; the call is kept in case getInt validates the
        // configured value — confirm and remove if it has no effect.
        PropertiesUtil.getInt(properties, "auto.commit.interval.ms", 90000);
        return new KafkaContinuousReader08(dataSourceOptions, properties, topicNames, groupId, kafkaCluster);
    }

    /**
     * Parses a {@code host:port} broker address.
     *
     * <p>The address is wrapped in an {@code http://} URL purely to reuse the URL parser.
     * Surrounding whitespace is ignored, so entries split out of a list written as
     * {@code "a:9092, b:9092"} are accepted.
     *
     * @param str broker address in {@code host:port} form
     * @return a URL whose host and port are those of the address
     * @throws IllegalArgumentException if the address has no host, has no port, or cannot be parsed
     */
    public static URL getHostnamePortUrl(String str) {
        String hostPort = str == null ? null : str.trim();
        try {
            URL url = new URL("http://" + hostPort);
            String host = url.getHost();
            // URL.getHost() yields an empty string, not null, when the host part is missing.
            if (host == null || host.isEmpty()) {
                throw new IllegalArgumentException("The given host:port ('" + str + "') doesn't contain a valid host");
            }
            if (url.getPort() == -1) {
                throw new IllegalArgumentException("The given host:port ('" + str + "') doesn't contain a valid port");
            }
            return url;
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("The given host:port ('" + str + "') is invalid", e);
        }
    }
}
