package org.voltdb.stream.api.kafka;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.stream.api.extension.VoltStreamSourceConfigurator;
import org.voltdb.stream.execution.Configuration;
import org.voltdb.stream.execution.Property;

/* loaded from: input_file:org/voltdb/stream/api/kafka/KafkaStreamSourceConfigurator.class */
public class KafkaStreamSourceConfigurator<K, V> implements VoltStreamSourceConfigurator<KafkaRequest<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger("KAFKA");
    private final String DEFAULT_CONFIG_PATH = "kafka.source";
    private final Class<? extends Deserializer<K>> keyDeserializer;
    private final Class<? extends Deserializer<V>> valueDeserializer;
    private final Properties properties;
    private String groupId;
    private Set<String> topicNames;
    private String bootstrapServers;
    private KafkaStartingOffset startingOffset;
    private Duration pollTimeout;
    private Duration maxCommitTimeout;
    private int macCommitRetries;
    private KafkaSourceExceptionHandler exceptionHandler;

    public static KafkaStreamSourceConfigurator<ByteBuffer, ByteBuffer> aConsumer() {
        return new KafkaStreamSourceConfigurator<>(ByteBufferDeserializer.class, ByteBufferDeserializer.class);
    }

    private static <KEY, VALUE> KafkaStreamSourceConfigurator<KEY, VALUE> overrideKey(KafkaStreamSourceConfigurator<?, VALUE> kafkaStreamSourceConfigurator, Class<? extends Deserializer<KEY>> cls) {
        return new KafkaStreamSourceConfigurator<>(cls, ((KafkaStreamSourceConfigurator) kafkaStreamSourceConfigurator).valueDeserializer, kafkaStreamSourceConfigurator);
    }

    private static <KEY, VALUE> KafkaStreamSourceConfigurator<KEY, VALUE> overrideValue(KafkaStreamSourceConfigurator<KEY, ?> kafkaStreamSourceConfigurator, Class<? extends Deserializer<VALUE>> cls) {
        return new KafkaStreamSourceConfigurator<>(((KafkaStreamSourceConfigurator) kafkaStreamSourceConfigurator).keyDeserializer, cls, kafkaStreamSourceConfigurator);
    }

    private static <KEY, VALUE> KafkaStreamSourceConfigurator<KEY, VALUE> overrideValue(KafkaStreamSourceConfigurator<KEY, ?> kafkaStreamSourceConfigurator, Class<? extends Deserializer<?>> cls, Class<VALUE> cls2) {
        return new KafkaStreamSourceConfigurator<>(((KafkaStreamSourceConfigurator) kafkaStreamSourceConfigurator).keyDeserializer, cls, cls2, kafkaStreamSourceConfigurator);
    }

    private KafkaStreamSourceConfigurator(Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2) {
        this.DEFAULT_CONFIG_PATH = "kafka.source";
        this.properties = new Properties();
        this.startingOffset = KafkaStartingOffset.EARLIEST;
        this.pollTimeout = Duration.ofMillis(250L);
        this.maxCommitTimeout = Duration.ofSeconds(10L);
        this.macCommitRetries = 3;
        this.exceptionHandler = (kafkaConsumer, executionContext, exc) -> {
            LOG.warn("Kafka source caught exception", exc);
        };
        this.keyDeserializer = cls;
        this.valueDeserializer = cls2;
        autoConfigureBuilder();
    }

    private KafkaStreamSourceConfigurator(Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2, KafkaStreamSourceConfigurator<?, ?> kafkaStreamSourceConfigurator) {
        this.DEFAULT_CONFIG_PATH = "kafka.source";
        this.properties = new Properties();
        this.startingOffset = KafkaStartingOffset.EARLIEST;
        this.pollTimeout = Duration.ofMillis(250L);
        this.maxCommitTimeout = Duration.ofSeconds(10L);
        this.macCommitRetries = 3;
        this.exceptionHandler = (kafkaConsumer, executionContext, exc) -> {
            LOG.warn("Kafka source caught exception", exc);
        };
        this.properties.putAll(kafkaStreamSourceConfigurator.getProperties());
        this.groupId = kafkaStreamSourceConfigurator.groupId;
        this.topicNames = kafkaStreamSourceConfigurator.topicNames;
        this.bootstrapServers = kafkaStreamSourceConfigurator.bootstrapServers;
        this.startingOffset = kafkaStreamSourceConfigurator.startingOffset;
        this.keyDeserializer = cls;
        this.valueDeserializer = cls2;
        this.pollTimeout = kafkaStreamSourceConfigurator.pollTimeout;
    }

    private KafkaStreamSourceConfigurator(Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<?>> cls2, Class<V> cls3, KafkaStreamSourceConfigurator<?, ?> kafkaStreamSourceConfigurator) {
        this(cls, cls2, kafkaStreamSourceConfigurator);
    }

    public KafkaStreamSourceConfigurator<K, V> withGroupId(String str) {
        this.groupId = Property.extractSafe(str);
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withTopicNames(String str) {
        this.topicNames = Property.extractSafeList(str);
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withSchemaRegistryUrl(String str) {
        return withProperty("schema.registry.url", Property.extractSafe(str));
    }

    public KafkaStreamSourceConfigurator<K, V> withPollTimeout(Duration duration) {
        this.pollTimeout = duration;
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withMaxCommitRetries(int i) {
        this.macCommitRetries = i;
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withMaxCommitTimeout(Duration duration) {
        this.maxCommitTimeout = duration;
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withProperty(String str, String str2) {
        this.properties.setProperty(str, str2);
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withProperty(String str, int i) {
        return withProperty(str, String.valueOf(i));
    }

    public KafkaStreamSourceConfigurator<K, V> withProperty(String str, long j) {
        return withProperty(str, String.valueOf(j));
    }

    public KafkaStreamSourceConfigurator<K, V> withProperty(String str, boolean z) {
        return withProperty(str, String.valueOf(z));
    }

    public KafkaStreamSourceConfigurator<K, V> withBootstrapServers(String str) {
        this.bootstrapServers = Property.extractSafe(str);
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withSSL(KafkaStreamSslConfiguration kafkaStreamSslConfiguration) {
        this.properties.putAll(kafkaStreamSslConfiguration.getProperties());
        return this;
    }

    public <KEY> KafkaStreamSourceConfigurator<KEY, V> withKeyDeserializer(Class<? extends Deserializer<KEY>> cls) {
        return overrideKey(this, cls);
    }

    public <VALUE> KafkaStreamSourceConfigurator<K, VALUE> withValueDeserializer(Class<? extends Deserializer<VALUE>> cls) {
        return overrideValue(this, cls);
    }

    public <VALUE> KafkaStreamSourceConfigurator<K, VALUE> withValueDeserializer(Class<? extends Deserializer<?>> cls, Class<VALUE> cls2) {
        return overrideValue(this, cls, cls2);
    }

    public KafkaStreamSourceConfigurator<K, V> withStartingOffset(KafkaStartingOffset kafkaStartingOffset) {
        this.startingOffset = kafkaStartingOffset;
        return this;
    }

    public KafkaStreamSourceConfigurator<K, V> withExceptionHandler(KafkaSourceExceptionHandler kafkaSourceExceptionHandler) {
        this.exceptionHandler = kafkaSourceExceptionHandler;
        return this;
    }

    public String getGroupId() {
        return this.groupId;
    }

    Set<String> getTopicNames() {
        return this.topicNames;
    }

    String getBootstrapServers() {
        return this.bootstrapServers;
    }

    String getKeyDeserializer() {
        return this.keyDeserializer.getName();
    }

    String getValueDeserializer() {
        return this.valueDeserializer.getName();
    }

    KafkaStartingOffset getStartingOffset() {
        return this.startingOffset;
    }

    Properties getProperties() {
        return this.properties;
    }

    Duration getPollTimeout() {
        return this.pollTimeout;
    }

    public int getMaxCommitRetries() {
        return this.macCommitRetries;
    }

    public Duration getMaxCommitTimeout() {
        return this.maxCommitTimeout;
    }

    public KafkaSourceExceptionHandler getExceptionHandler() {
        return this.exceptionHandler;
    }

    public KafkaStreamSourceConfigurator<K, V> configure(String str) {
        if (!getConfiguration().findByPath(str, new String[0]).hasValue()) {
            throw new IllegalArgumentException("No configuration found for path: " + str + ", check your helm configuration");
        }
        configureBuilder(str);
        return this;
    }

    private void autoConfigureBuilder() {
        if (getConfiguration().findByPath("kafka.source", new String[0]).hasValue()) {
            configureBuilder("kafka.source");
        }
    }

    private void configureBuilder(String str) {
        Configuration configuration = getConfiguration();
        this.groupId = configuration.findByPath(str, new String[]{"groupId"}).asString();
        this.topicNames = (Set) Arrays.stream(configuration.findByPath(str, new String[]{"topicNames"}).asString().split(",")).collect(Collectors.toSet());
        this.bootstrapServers = configuration.findByPath(str, new String[]{"bootstrapServers"}).asString();
        Configuration.ConfigurationPart findByPath = configuration.findByPath(str, new String[]{"startingOffset"});
        if (findByPath.hasValue()) {
            this.startingOffset = KafkaStartingOffset.valueOf(findByPath.asString());
        }
        Configuration.ConfigurationPart findByPath2 = configuration.findByPath(str, new String[]{"pollTimeout"});
        if (findByPath2.hasValue()) {
            this.pollTimeout = Duration.parse(findByPath2.asString());
        }
        Configuration.ConfigurationPart findByPath3 = configuration.findByPath(str, new String[]{"maxCommitTimeout"});
        if (findByPath3.hasValue()) {
            this.maxCommitTimeout = Duration.parse(findByPath3.asString());
        }
        Configuration.ConfigurationPart findByPath4 = configuration.findByPath(str, new String[]{"maxCommitRetries"});
        if (findByPath4.hasValue()) {
            this.macCommitRetries = findByPath4.asInt();
        }
        Configuration.ConfigurationPart findByPath5 = configuration.findByPath(str, new String[]{"schemaRegistry"});
        if (findByPath5.hasValue()) {
            withSchemaRegistryUrl(findByPath5.asString());
        }
        Configuration.ConfigurationPart findByPath6 = configuration.findByPath(str, new String[]{"properties"});
        if (findByPath6.hasValue()) {
            this.properties.putAll(findByPath6.asMap(String.class, String.class));
        }
    }
}
