package io.bootique.kafka.client_0_8.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

/* loaded from: input_file:io/bootique/kafka/client_0_8/consumer/TopicConsumer.class */
public class TopicConsumer<K, V> implements AutoCloseable {
    private ConsumerConnector connector;
    private Decoder<K> keyDecoder;
    private Decoder<V> valueDecoder;
    private String topic;
    private int numberOfThreads;

    /* loaded from: input_file:io/bootique/kafka/client_0_8/consumer/TopicConsumer$Builder.class */
    public static class Builder<K, V> {
        private TopicConsumer<K, V> consumer;
        private String configName;
        private String group;

        private Builder(Decoder<K> decoder, Decoder<V> decoder2) {
            this.consumer = new TopicConsumer<>();
            ((TopicConsumer) this.consumer).keyDecoder = (Decoder) Objects.requireNonNull(decoder);
            ((TopicConsumer) this.consumer).valueDecoder = (Decoder) Objects.requireNonNull(decoder2);
            ((TopicConsumer) this.consumer).numberOfThreads = 2;
        }

        public Builder<K, V> group(String str) {
            this.group = str;
            return this;
        }

        public Builder<K, V> configName(String str) {
            this.configName = str;
            return this;
        }

        public Builder<K, V> topic(String str) {
            ((TopicConsumer) this.consumer).topic = str;
            return this;
        }

        public Builder<K, V> threads(int i) {
            ((TopicConsumer) this.consumer).numberOfThreads = i;
            return this;
        }

        public TopicConsumer<K, V> build(ConsumerFactory consumerFactory) {
            Objects.requireNonNull(this.group);
            Objects.requireNonNull(((TopicConsumer) this.consumer).topic);
            ConsumerConfig consumerConfig = new ConsumerConfig();
            consumerConfig.setGroup(this.group);
            ((TopicConsumer) this.consumer).connector = createConnector(consumerFactory, consumerConfig);
            return this.consumer;
        }

        private ConsumerConnector createConnector(ConsumerFactory consumerFactory, ConsumerConfig consumerConfig) {
            return this.configName != null ? consumerFactory.newConsumerConnector(this.configName, consumerConfig) : consumerFactory.newConsumerConnector(consumerConfig);
        }
    }

    private TopicConsumer() {
    }

    public static <K, V> Builder<K, V> builder(Decoder<K> decoder, Decoder<V> decoder2) {
        return new Builder<>(decoder, decoder2);
    }

    public CompletableFuture<Void> consumeAll(Executor executor, BiConsumer<K, V> biConsumer) {
        ArrayList arrayList = new ArrayList();
        getStreams().forEach(kafkaStream -> {
            arrayList.add(consumeStream(kafkaStream, biConsumer, executor));
        });
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()]));
        ConsumerConnector consumerConnector = this.connector;
        consumerConnector.getClass();
        return allOf.thenRun(consumerConnector::commitOffsets);
    }

    protected CompletableFuture<Object> consumeStream(KafkaStream<K, V> kafkaStream, BiConsumer<K, V> biConsumer, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ConsumerIterator it = kafkaStream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata) it.next();
                biConsumer.accept(messageAndMetadata.key(), messageAndMetadata.message());
            }
            return null;
        }, executor);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.connector.commitOffsets();
        this.connector.shutdown();
    }

    protected List<KafkaStream<K, V>> getStreams() {
        return (List) Objects.requireNonNull(this.connector.createMessageStreams(Collections.singletonMap(this.topic, Integer.valueOf(this.numberOfThreads)), this.keyDecoder, this.valueDecoder).get(this.topic));
    }
}
