/*
 * Decompiled with CFR 0.152.
 */
package io.bootique.kafka.client_0_8.consumer;

import io.bootique.kafka.client_0_8.consumer.ConsumerConfig;
import io.bootique.kafka.client_0_8.consumer.ConsumerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

/**
 * Consumes a single Kafka topic through a {@link ConsumerConnector}, dispatching every
 * decoded key/value pair to a caller-supplied handler.
 *
 * <p>Instances are created via {@link #builder(Decoder, Decoder)}. The consumer owns its
 * connector: {@link #close()} commits offsets and shuts the connector down.
 *
 * @param <K> decoded message key type
 * @param <V> decoded message value type
 */
public class TopicConsumer<K, V> implements AutoCloseable {

    /** Number of streams requested for the topic when the builder is not told otherwise. */
    private static final int DEFAULT_NUMBER_OF_THREADS = 2;

    private final ConsumerConnector connector;
    private final Decoder<K> keyDecoder;
    private final Decoder<V> valueDecoder;
    private final String topic;
    private final int numberOfThreads;

    private TopicConsumer(
            ConsumerConnector connector,
            Decoder<K> keyDecoder,
            Decoder<V> valueDecoder,
            String topic,
            int numberOfThreads) {
        this.connector = connector;
        this.keyDecoder = keyDecoder;
        this.valueDecoder = valueDecoder;
        this.topic = topic;
        this.numberOfThreads = numberOfThreads;
    }

    /**
     * Starts building a consumer that decodes messages with the given decoders.
     *
     * @param keyDecoder   decoder for message keys, must not be null
     * @param valueDecoder decoder for message values, must not be null
     * @param <K>          decoded key type
     * @param <V>          decoded value type
     * @return a new builder
     * @throws NullPointerException if either decoder is null
     */
    public static <K, V> Builder<K, V> builder(Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
        return new Builder<>(keyDecoder, valueDecoder);
    }

    /**
     * Creates the message streams for the topic and submits one consuming task per stream
     * to the executor.
     *
     * @param executor executor that runs the per-stream tasks, must not be null
     * @param handler  callback invoked with the key and value of every message, must not be null
     * @return a future that completes after every stream task has finished normally and
     *         offsets have been committed; per {@link CompletableFuture#allOf} semantics it
     *         completes exceptionally (without committing) if any stream task fails
     * @throws NullPointerException if {@code executor} or {@code handler} is null
     */
    public CompletableFuture<Void> consumeAll(Executor executor, BiConsumer<K, V> handler) {
        // Fail fast, before any streams are created on the connector.
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(handler, "handler");

        List<CompletableFuture<Object>> results = new ArrayList<>();
        for (KafkaStream<K, V> stream : getStreams()) {
            results.add(consumeStream(stream, handler, executor));
        }

        CompletableFuture<?>[] resultsArray = results.toArray(new CompletableFuture[0]);
        return CompletableFuture.allOf(resultsArray).thenRun(() -> connector.commitOffsets());
    }

    /**
     * Submits a task that iterates the stream and passes each message's key and value to
     * the handler. The task ends when the stream's iterator is exhausted or throws.
     *
     * @param s        stream to consume
     * @param handler  callback invoked for every message
     * @param executor executor that runs the task
     * @return a future completed with {@code null} once the task returns
     */
    protected CompletableFuture<Object> consumeStream(KafkaStream<K, V> s, BiConsumer<K, V> handler, Executor executor) {
        Supplier<Object> task = () -> {
            for (MessageAndMetadata<K, V> m : s) {
                handler.accept(m.key(), m.message());
            }
            return null;
        };
        return CompletableFuture.supplyAsync(task, executor);
    }

    /**
     * Commits offsets and shuts down the underlying connector. The connector is shut down
     * even when the offset commit throws; the commit failure is still propagated.
     */
    @Override
    public void close() {
        try {
            connector.commitOffsets();
        } finally {
            // Always release the connector, otherwise a failed commit would leak it.
            connector.shutdown();
        }
    }

    /**
     * Requests {@code numberOfThreads} message streams for the configured topic from the
     * connector.
     *
     * @return the streams the connector returned for the topic
     * @throws NullPointerException if the connector returned no entry for the topic
     */
    protected List<KafkaStream<K, V>> getStreams() {
        Map<String, Integer> topicCountMap = Collections.singletonMap(topic, numberOfThreads);
        Map<String, List<KafkaStream<K, V>>> streamsByTopic =
                connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        return Objects.requireNonNull(streamsByTopic.get(topic), "No streams returned for topic: " + topic);
    }

    /**
     * Collects the settings for a {@link TopicConsumer}. Every call to
     * {@link #build(ConsumerFactory)} creates a new consumer with its own connector, so
     * changing the builder afterwards does not affect consumers that were already built.
     *
     * @param <K> decoded message key type
     * @param <V> decoded message value type
     */
    public static class Builder<K, V> {
        private final Decoder<K> keyDecoder;
        private final Decoder<V> valueDecoder;
        private String topic;
        private int numberOfThreads = DEFAULT_NUMBER_OF_THREADS;
        private String configName;
        private String group;

        private Builder(Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
            this.keyDecoder = Objects.requireNonNull(keyDecoder, "keyDecoder");
            this.valueDecoder = Objects.requireNonNull(valueDecoder, "valueDecoder");
        }

        /**
         * Sets the consumer group; required before {@link #build(ConsumerFactory)}.
         *
         * @param group consumer group passed to the connector configuration
         * @return this builder
         */
        public Builder<K, V> group(String group) {
            this.group = group;
            return this;
        }

        /**
         * Sets the optional configuration name handed to the {@link ConsumerFactory}.
         * When left unset, the factory method without a name is used.
         *
         * @param configName configuration name, may be null
         * @return this builder
         */
        public Builder<K, V> configName(String configName) {
            this.configName = configName;
            return this;
        }

        /**
         * Sets the topic to consume; required before {@link #build(ConsumerFactory)}.
         *
         * @param topic topic name
         * @return this builder
         */
        public Builder<K, V> topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Sets how many streams are requested for the topic (defaults to 2). The value
         * is passed to the connector as is; it is not validated here.
         *
         * @param threads number of streams to request
         * @return this builder
         */
        public Builder<K, V> threads(int threads) {
            this.numberOfThreads = threads;
            return this;
        }

        /**
         * Creates a connector through the factory and returns a new consumer bound to it.
         *
         * @param factory factory used to create the connector, must not be null
         * @return a new consumer
         * @throws NullPointerException if the group, the topic, or the factory is null
         */
        public TopicConsumer<K, V> build(ConsumerFactory factory) {
            Objects.requireNonNull(group, "group");
            Objects.requireNonNull(topic, "topic");
            Objects.requireNonNull(factory, "factory");

            ConsumerConfig configOverride = new ConsumerConfig();
            configOverride.setGroup(group);

            ConsumerConnector connector = createConnector(factory, configOverride);
            return new TopicConsumer<>(connector, keyDecoder, valueDecoder, topic, numberOfThreads);
        }

        /** Picks the named or unnamed factory method depending on whether a config name was set. */
        private ConsumerConnector createConnector(ConsumerFactory factory, ConsumerConfig configOverride) {
            return configName != null
                    ? factory.newConsumerConnector(configName, configOverride)
                    : factory.newConsumerConnector(configOverride);
        }
    }
}

