/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.direct.io.kafka.Utils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerFactory<K, V> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerFactory.class);
    private final URI uri;
    @Nullable
    private final String topic;
    @Nullable
    private final String topicPattern;
    private final Properties props;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    KafkaConsumerFactory(URI uri, Properties props, Serde<K> keySerde, Serde<V> valueSerde) {
        this.uri = uri;
        this.props = props;
        this.topic = Utils.topic(uri);
        this.topicPattern = Utils.topicPattern(uri);
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    public KafkaConsumer<K, V> create(String name, Position position, @Nullable ConsumerRebalanceListener listener) {
        log.debug("Creating named consumer with name {} and listener {}", (Object)name, (Object)listener);
        Properties cloned = this.clone(this.props);
        cloned.put("bootstrap.servers", this.uri.getAuthority());
        cloned.put("group.id", name);
        cloned.put("auto.commit.interval.ms", (Object)0);
        cloned.put("enable.auto.commit", (Object)false);
        KafkaConsumerFactory.updateAutoOffsetReset(position, cloned, false);
        cloned.put("key.deserializer", this.keySerde.deserializer().getClass());
        cloned.put("value.deserializer", this.valueSerde.deserializer().getClass());
        KafkaConsumer ret = new KafkaConsumer(cloned);
        if (this.topic == null) {
            Pattern pattern = Pattern.compile(Objects.requireNonNull(this.topicPattern));
            if (listener == null) {
                ret.subscribe(pattern);
            } else {
                ret.subscribe(pattern, listener);
            }
        } else if (listener == null) {
            ret.subscribe(Collections.singletonList(this.topic));
        } else {
            ret.subscribe(Collections.singletonList(this.topic), listener);
        }
        return ret;
    }

    public KafkaConsumer<K, V> create(String name) {
        return this.create(name, Position.NEWEST, null);
    }

    public KafkaConsumer<K, V> create(Position position, Collection<Partition> partitions) {
        log.debug("Creating unnamed consumer for partitions {}", partitions);
        Properties cloned = this.clone(this.props);
        cloned.put("bootstrap.servers", this.uri.getAuthority());
        cloned.put("enable.auto.commit", (Object)false);
        cloned.put("key.deserializer", this.keySerde.deserializer().getClass());
        cloned.put("value.deserializer", this.valueSerde.deserializer().getClass());
        KafkaConsumerFactory.updateAutoOffsetReset(position, cloned, true);
        KafkaConsumer ret = new KafkaConsumer(cloned);
        List<TopicPartition> topicPartitions = partitions.stream().map(p -> new TopicPartition(this.topic, p.getId())).collect(Collectors.toList());
        ret.assign(topicPartitions);
        return ret;
    }

    public KafkaConsumer<K, V> create() {
        log.debug("Creating unnamed consumer for all partitions of topic {}", (Object)this.topic);
        Properties cloned = this.clone(this.props);
        cloned.put("bootstrap.servers", this.uri.getAuthority());
        cloned.put("enable.auto.commit", (Object)false);
        cloned.put("key.deserializer", this.keySerde.deserializer().getClass());
        cloned.put("value.deserializer", this.valueSerde.deserializer().getClass());
        KafkaConsumer ret = new KafkaConsumer(cloned);
        List<TopicPartition> partitions = ret.partitionsFor(this.topic).stream().map(p -> new TopicPartition(this.topic, p.partition())).collect(Collectors.toList());
        ret.assign(partitions);
        return ret;
    }

    private Properties clone(Properties props) {
        Properties ret = new Properties();
        props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)ret::put));
        return ret;
    }

    @VisibleForTesting
    static void updateAutoOffsetReset(Position position, Properties props, boolean setToNoneOnCurrent) {
        if (!props.containsKey("auto.offset.reset")) {
            if (position == Position.OLDEST) {
                props.put("auto.offset.reset", "earliest");
            } else if (setToNoneOnCurrent && position == Position.CURRENT) {
                props.put("auto.offset.reset", "none");
            } else {
                props.put("auto.offset.reset", "latest");
            }
        }
    }
}

