package org.reactivecommons.async.kafka.communications.topology;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.reactivecommons.async.kafka.communications.exceptions.TopicNotFoundException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/reactivecommons/async/kafka/communications/topology/TopologyCreator.class */
/**
 * Creates Kafka topics through an {@link AdminClient} and keeps an in-memory registry of the
 * topic names it knows about, so that {@link #checkTopic(String)} can reject unknown topics.
 *
 * <p>When topic checking is enabled, the registry is seeded once, at construction time, with the
 * names returned by the broker; every topic handled by {@link #createTopics(List)} is added to it
 * afterwards.
 */
public class TopologyCreator {
    /** Timeout, in milliseconds, applied to the topic-listing request issued by {@link #getTopics()}. */
    public static final int TIMEOUT_MS = 60000;

    private final AdminClient adminClient;
    private final KafkaCustomizations customizations;
    private final boolean checkTopics;
    /** Registry of known topic names; the value is always {@code true}. */
    private final Map<String, Boolean> existingTopics;

    /**
     * Builds the creator and, when {@code checkTopics} is {@code true}, loads the current topic
     * names from the broker (a blocking call bounded by {@link #TIMEOUT_MS}).
     *
     * @param adminClient         client used to list and create topics
     * @param kafkaCustomizations per-topic settings consulted by {@link #createTopics(List)}
     * @param checkTopics         whether {@link #checkTopic(String)} should enforce topic existence
     */
    public TopologyCreator(AdminClient adminClient, KafkaCustomizations kafkaCustomizations, boolean checkTopics) {
        this.adminClient = adminClient;
        this.customizations = kafkaCustomizations;
        this.checkTopics = checkTopics;
        // The registry must be loaded AFTER adminClient/checkTopics are assigned: a field
        // initializer would run first and always observe checkTopics == false.
        // It is copied into a mutable concurrent map because getTopics() may return the immutable
        // Map.of(), while createTopics() needs to register names later on.
        this.existingTopics = new ConcurrentHashMap<>(getTopics());
    }

    /**
     * Lists the topic names currently reported by the admin client.
     *
     * <p>Blocks until the listing completes. A failure of the listing call is propagated with its
     * original exception type; if the calling thread is interrupted, its interrupt flag is
     * restored before the {@link InterruptedException} is propagated.
     *
     * @return an empty immutable map when topic checking is disabled; otherwise a map whose keys
     *         are the topic names and whose values are all {@code true}
     */
    public Map<String, Boolean> getTopics() {
        if (!this.checkTopics) {
            return Map.of();
        }
        try {
            Set<String> names = this.adminClient
                    .listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS))
                    .names()
                    .get();
            return names.stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw TopologyCreator.<RuntimeException>propagate(e);
        } catch (ExecutionException e) {
            throw TopologyCreator.<RuntimeException>propagate(e);
        }
    }

    /**
     * Creates every topic in {@code list} and records each one in the local registry.
     *
     * @param list names of the topics to create
     * @return a {@link Mono} that completes once all topics have been processed
     */
    public Mono<Void> createTopics(List<String> list) {
        return Flux.fromIterable(list)
                .map(this::customizationFor)
                .map(this::toNewTopic)
                .flatMap(this::createTopic)
                .doOnNext(created -> this.existingTopics.put(created.name(), true))
                .then();
    }

    /**
     * Requests creation of a single topic. A {@link TopicExistsException} is treated as success,
     * so an already existing topic is emitted just like a newly created one.
     *
     * @param newTopic definition of the topic to create
     * @return a {@link Mono} emitting {@code newTopic} once it is created or found to exist
     */
    protected Mono<NewTopic> createTopic(NewTopic newTopic) {
        return Mono.fromFuture(this.adminClient.createTopics(List.of(newTopic)).all().toCompletionStage().toCompletableFuture())
                .thenReturn(newTopic)
                .onErrorResume(TopicExistsException.class, alreadyExists -> Mono.just(newTopic));
    }

    /**
     * Converts a {@link TopicCustomization} into the admin client's {@link NewTopic}, attaching
     * the extra configuration only when one is present.
     *
     * @param topicCustomization settings of the topic
     * @return the equivalent {@link NewTopic}
     */
    protected NewTopic toNewTopic(TopicCustomization topicCustomization) {
        NewTopic newTopic = new NewTopic(
                topicCustomization.getTopic(),
                topicCustomization.getPartitions(),
                topicCustomization.getReplicationFactor());
        if (topicCustomization.getConfig() == null) {
            return newTopic;
        }
        return newTopic.configs(topicCustomization.getConfig());
    }

    /**
     * Verifies that {@code str} is a known topic. Does nothing when topic checking is disabled.
     *
     * @param str topic name to verify
     * @throws TopicNotFoundException if checking is enabled and the topic is not in the registry
     */
    public void checkTopic(String str) {
        if (this.checkTopics && !this.existingTopics.containsKey(str)) {
            throw new TopicNotFoundException("Topic not found: " + str + ". Please create it before send a message.");
        }
    }

    /**
     * Picks the settings for {@code topic}: the configured customization when one is registered
     * under that name, otherwise a default built with partitions and replication factor of -1.
     * NOTE(review): -1 presumably defers both values to the broker defaults — confirm.
     */
    private TopicCustomization customizationFor(String topic) {
        if (this.customizations.getTopics().containsKey(topic)) {
            return this.customizations.getTopics().get(topic);
        }
        return TopicCustomization.builder()
                .partitions(-1)
                .replicationFactor((short) -1)
                .topic(topic)
                .build();
    }

    /**
     * Rethrows {@code failure} unchanged, without wrapping, so callers keep observing the original
     * exception type even though {@link #getTopics()} declares no checked exceptions.
     */
    @SuppressWarnings("unchecked") // Intentional: erasure lets the checked exception pass through as-is.
    private static <E extends Throwable> RuntimeException propagate(Throwable failure) throws E {
        throw (E) failure;
    }
}
