package org.jzenith.kafka.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.reactivex.ContextScheduler;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.impl.AsyncResultSingle;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import lombok.NonNull;
import org.jzenith.core.AbstractPlugin;
import org.jzenith.core.util.CompletableFutureObserver;
import org.jzenith.kafka.model.AbstractMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jzenith/kafka/consumer/KafkaConsumerPlugin.class */
/**
 * Plugin that subscribes a Kafka consumer to a set of topics and routes every
 * consumed record through a {@link TopicHandlerDispatcher}.
 *
 * <p>Instances are created through {@link #withTopicHandler(String, TopicHandler)} and
 * extended with {@link #andTopicHandler(String, TopicHandler)}. When a dispatched record is
 * reported as successful its commit data is committed to Kafka; when it is reported as failed
 * the consumer is closed.
 */
public class KafkaConsumerPlugin extends AbstractPlugin {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerPlugin.class);

    /** Name of the shared Vert.x worker executor on which records are processed. */
    private static final String WORKER_EXECUTOR_NAME = "jzenith-kafka-executor";

    /** Registered handlers keyed by topic name; a topic may have several handlers. */
    private final Multimap<String, TopicHandler<AbstractMessage>> topicHandlers = HashMultimap.create();

    /** Not instantiable directly; use {@link #withTopicHandler(String, TopicHandler)}. */
    private KafkaConsumerPlugin() {
    }

    /**
     * Returns the Guice modules contributed by this plugin.
     *
     * @return a single-element list containing a new {@link KafkaConsumerModule}
     */
    protected List<Module> getModules() {
        return ImmutableList.of(new KafkaConsumerModule());
    }

    /**
     * Creates a new plugin with one topic handler registered.
     *
     * @param str          the topic name
     * @param topicHandler the handler to register for that topic
     * @return a new plugin instance holding the given registration
     * @throws NullPointerException if either argument is {@code null}
     */
    public static KafkaConsumerPlugin withTopicHandler(@NonNull String str, @NonNull TopicHandler<?> topicHandler) {
        Objects.requireNonNull(str, "topic is marked @NonNull but is null");
        Objects.requireNonNull(topicHandler, "handler is marked @NonNull but is null");
        return new KafkaConsumerPlugin().andTopicHandler(str, topicHandler);
    }

    /**
     * Registers an additional topic handler on this plugin.
     *
     * @param str          the topic name
     * @param topicHandler the handler to register for that topic
     * @return this plugin, to allow chaining
     * @throws NullPointerException if either argument is {@code null}
     */
    public KafkaConsumerPlugin andTopicHandler(@NonNull String str, @NonNull TopicHandler<?> topicHandler) {
        Objects.requireNonNull(str, "topic is marked @NonNull but is null");
        Objects.requireNonNull(topicHandler, "handler is marked @NonNull but is null");

        // The registry is typed on AbstractMessage while callers pass handlers for arbitrary
        // message types, so the wildcard has to be narrowed explicitly.
        // NOTE(review): presumably the dispatcher picks the concrete message type per handler - confirm.
        @SuppressWarnings("unchecked")
        final TopicHandler<AbstractMessage> typedHandler = (TopicHandler<AbstractMessage>) topicHandler;

        this.topicHandlers.put(str, typedHandler);
        return this;
    }

    /**
     * Subscribes the injected Kafka consumer to all registered topics and, once the
     * subscription has completed, starts the record-processing pipeline.
     *
     * @param injector the injector used to obtain the {@code KafkaConsumer<String, String>},
     *                 the {@link Vertx} instance and the {@link ObjectMapper}
     * @return a future that yields {@code "Done"} after the subscription completed and the
     *         processing pipeline has been subscribed
     * @throws NullPointerException if {@code injector} is {@code null}
     */
    protected CompletableFuture<String> start(@NonNull Injector injector) {
        Objects.requireNonNull(injector, "injector is marked @NonNull but is null");

        final KafkaConsumer<String, String> kafkaConsumer =
                injector.getInstance(Key.get(new TypeLiteral<KafkaConsumer<String, String>>() {
                }));
        final Vertx vertx = injector.getInstance(Vertx.class);

        final CompletableFutureObserver subscriptionObserver = new CompletableFutureObserver();
        kafkaConsumer.rxSubscribe(this.topicHandlers.keySet()).subscribe(subscriptionObserver.observer());

        final TopicHandlerDispatcher dispatcher =
                new TopicHandlerDispatcher(injector.getInstance(ObjectMapper.class), this.topicHandlers);

        // The pipeline is only wired up after the topic subscription has been acknowledged.
        return subscriptionObserver
                .thenApply(ignored -> consumerChain(vertx, dispatcher, kafkaConsumer))
                .thenApply(disposable -> "Done");
    }

    /**
     * Builds and subscribes the processing pipeline: consumed records are handed to the
     * dispatcher and each dispatcher result is then passed to
     * {@link #handleResult(DispatcherResult, KafkaConsumer)}.
     *
     * @param vertx                  the Vert.x instance that provides the worker executor
     * @param topicHandlerDispatcher the dispatcher that processes each record
     * @param kafkaConsumer          the consumer whose records are processed
     * @return the disposable of the subscribed pipeline
     */
    private Disposable consumerChain(Vertx vertx, TopicHandlerDispatcher topicHandlerDispatcher, KafkaConsumer<String, String> kafkaConsumer) {
        // NOTE(review): the boolean is presumably ContextScheduler's "ordered" flag - confirm.
        final ContextScheduler workerScheduler =
                new ContextScheduler(vertx.getDelegate().createSharedWorkerExecutor(WORKER_EXECUTOR_NAME), false);

        return kafkaConsumer.toObservable()
                .observeOn(workerScheduler)
                .flatMapSingle(topicHandlerDispatcher::handle)
                .flatMapSingle(dispatcherResult -> handleResult(dispatcherResult, kafkaConsumer))
                .subscribeOn(workerScheduler)
                .subscribe(
                        committedOffsets -> {
                            // Nothing to do per element; commit/close already happened in handleResult.
                        },
                        // Without an error consumer RxJava reports stream failures as
                        // OnErrorNotImplementedException through its global error handler.
                        error -> log.error("Kafka consumer chain terminated with an error", error));
    }

    /**
     * Reacts to the outcome of a dispatched record.
     *
     * @param dispatcherResult the outcome reported by the dispatcher
     * @param kafkaConsumer    the consumer to commit on or to close
     * @return on success, a single that performs the commit of the result's commit data and
     *         emits what the commit reports; on failure, a single that closes the consumer and
     *         then emits an empty map
     */
    private Single<Map<TopicPartition, OffsetAndMetadata>> handleResult(DispatcherResult dispatcherResult, KafkaConsumer<String, String> kafkaConsumer) {
        if (!dispatcherResult.isSuccessful()) {
            log.error("Error handling {}, will stop consuming", dispatcherResult.getOriginalPayload());
            return kafkaConsumer.rxClose().andThen(Single.just(ImmutableMap.of()));
        }

        // The commit is issued on the underlying (non-Rx) consumer and bridged back into a Single.
        return new AsyncResultSingle<Map<TopicPartition, OffsetAndMetadata>>(handler -> {
            kafkaConsumer.getDelegate().commit(dispatcherResult.getCommitData(), handler);
        });
    }
}
