/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.testing.kafka.common;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Utils {
    /** Logger shared by the static helpers of this class. */
    private static final Logger log = LoggerFactory.getLogger(Utils.class);
    /**
     * Name of the probe topic that {@code createTopic} creates, that
     * {@code awaitExpectedBrokerCountInClusterViaTopic} describes and that {@code deleteTopic} removes.
     */
    private static final String CONSISTENCY_TEST = "__org_kroxylicious_testing_consistencyTest";

    /** Not instantiable: every member of this class is static. */
    private Utils() {
    }

    /**
     * Moves single-replica partitions of Kafka's internal topics away from one node and waits until
     * no reassignment of those topics is in progress any more.
     *
     * The topics considered are {@code __consumer_offsets}, {@code __transaction_state} and
     * {@code __cluster_metadata}; topics that do not exist are skipped by {@code describeKnownTopics}.
     * A partition is moved when it has fewer than two replicas and one of them lives on
     * {@code fromNodeId}; its new replica list is exactly {@code [toNodeId]}.
     *
     * Both phases poll through {@link #awaitCondition(int, TimeUnit)}, which is configured to ignore
     * exceptions, so a failing attempt is expected to be retried on the next poll rather than
     * aborting the wait. Each phase gets its own {@code timeout}.
     *
     * @param connectionConfig admin client configuration used to connect to the cluster
     * @param fromNodeId       id of the node whose partitions should be moved away
     * @param toNodeId         id of the node that receives the partitions
     * @param timeout          maximum time to wait, per phase
     * @param timeUnit         unit of {@code timeout}
     */
    public static void awaitReassignmentOfKafkaInternalTopicsIfNecessary(Map<String, Object> connectionConfig, int fromNodeId, int toNodeId, int timeout, TimeUnit timeUnit) {
        List<String> kafkaInternalTopics = List.of("__consumer_offsets", "__transaction_state", "__cluster_metadata");
        try (Admin admin = Admin.create(connectionConfig)) {
            // Phase 1: work out which partitions must move and submit the reassignment.
            // The lambda only ever returns true; it is polled so that a thrown exception leads to a retry.
            Utils.awaitCondition(timeout, timeUnit).until(() -> {
                Map<String, TopicDescription> topicDescriptions = Utils.describeKnownTopics(kafkaInternalTopics, admin);
                Map<TopicPartition, Optional<NewPartitionReassignment>> movements = new HashMap<>();
                Optional<NewPartitionReassignment> toNodeReassignment = Optional.of(new NewPartitionReassignment(List.of(toNodeId)));
                topicDescriptions.forEach((name, description) -> description.partitions().stream()
                        .filter(p -> p.replicas().stream().anyMatch(n -> n.id() == fromNodeId) && p.replicas().size() < 2)
                        .forEach(tpi -> movements.put(new TopicPartition(name, tpi.partition()), toNodeReassignment)));
                if (movements.isEmpty()) {
                    log.debug("No kafka internal topic partitions need re-assigning from node {}", fromNodeId);
                    return true;
                }
                log.debug("Kafka internal topic partitions to re-assign: {}", movements);
                admin.alterPartitionReassignments(movements).all().get();
                return true;
            });
            // Phase 2: wait until the cluster reports no ongoing reassignment for the internal topics.
            Utils.awaitCondition(timeout, timeUnit).until(() -> {
                Set<TopicPartition> ongoingKafkaInternalReassignments = admin.listPartitionReassignments().reassignments().get().keySet().stream()
                        .filter(tp -> kafkaInternalTopics.contains(tp.topic()))
                        .collect(Collectors.toSet());
                if (!ongoingKafkaInternalReassignments.isEmpty()) {
                    log.debug("Kafka internal topic partitions re-assigment in-progress: {}", ongoingKafkaInternalReassignments);
                    return false;
                }
                log.debug("Kafka internal topic partitions re-assigment complete.");
                return true;
            });
        }
    }

    /**
     * Waits until the cluster hosts replicas on the expected number of brokers, using a probe topic.
     *
     * The probe topic ({@code CONSISTENCY_TEST}) is created through {@code createTopic} with a
     * replication factor of {@code expectedBrokerCount}. The method then polls a describe-topics call
     * until {@code checkReplicaDistribution} reports the expected number of distinct replicas, and at
     * that point asks {@code deleteTopic} to remove the probe topic again.
     *
     * A fail-fast check is registered that rethrows the creation failure as soon as the creation
     * stage has completed exceptionally.
     *
     * @param connectionConfig    admin client configuration used to connect to the cluster
     * @param timeout             maximum time to wait
     * @param timeUnit            unit of {@code timeout}
     * @param expectedBrokerCount number of brokers expected to hold a replica of the probe topic
     */
    public static void awaitExpectedBrokerCountInClusterViaTopic(Map<String, Object> connectionConfig, int timeout, TimeUnit timeUnit, Integer expectedBrokerCount) {
        try (Admin admin = Admin.create(connectionConfig)) {
            log.debug("Creating topic: {} via {}", CONSISTENCY_TEST, connectionConfig.get("bootstrap.servers"));
            CompletionStage<Void> createTopicStage = Utils.createTopic(expectedBrokerCount, admin);
            log.debug("Waiting for {} to be replicated to {} brokers", CONSISTENCY_TEST, expectedBrokerCount);
            Utils.awaitCondition(timeout, timeUnit).failFast(() -> {
                CompletableFuture<Void> creation = createTopicStage.toCompletableFuture();
                if (creation.isCompletedExceptionally()) {
                    // get() on a failed future throws, which is what surfaces the creation failure.
                    creation.get();
                }
            }).until(() -> {
                log.debug("Calling describe topic");
                CompletableFuture<Boolean> promise = new CompletableFuture<>();
                admin.describeTopics(Set.of(CONSISTENCY_TEST)).allTopicNames().whenComplete((topicDescriptions, throwable) -> {
                    if (throwable != null) {
                        // Accept the failure both wrapped in a CompletionException and bare,
                        // mirroring the unwrapping done by isRetryable.
                        Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null ? throwable.getCause() : throwable;
                        if (cause instanceof UnknownTopicOrPartitionException) {
                            log.debug("Cluster quorum test topic ({}) doesn't exist yet", CONSISTENCY_TEST);
                        }
                        else {
                            log.warn("Unexpected failure describing topic: {} due to {}", CONSISTENCY_TEST, throwable.getMessage(), throwable);
                        }
                        promise.complete(false);
                    }
                    else {
                        log.debug("Current topicDescriptions: {}", topicDescriptions);
                        Utils.checkReplicaDistribution(expectedBrokerCount, promise, topicDescriptions);
                    }
                }).get(1L, TimeUnit.SECONDS);
                // The callback above has completed the promise by now; false is only a defensive default.
                boolean isQuorate = promise.getNow(false);
                if (isQuorate) {
                    Utils.deleteTopic(admin);
                }
                return isQuorate;
            });
        }
    }

    /**
     * Deletes the probe topic ({@code CONSISTENCY_TEST}) on a best-effort basis, waiting up to ten
     * seconds for the deletion to complete.
     *
     * A cluster that has topic deletion disabled is tolerated: the
     * {@link TopicDeletionDisabledException} is logged as a warning and swallowed, whether it is
     * thrown directly or arrives as the cause of an {@link ExecutionException}. Rethrowing the wrapped
     * form would make a polling caller fail on every attempt for a condition that cannot change.
     *
     * @param admin admin client used to issue the deletion
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the deletion does not complete within ten seconds
     * @throws ExecutionException   if the deletion fails for any reason other than deletion being disabled
     */
    private static void deleteTopic(Admin admin) throws InterruptedException, TimeoutException, ExecutionException {
        try {
            admin.deleteTopics(Set.of(CONSISTENCY_TEST)).all().get(10L, TimeUnit.SECONDS);
        }
        catch (ExecutionException ee) {
            Throwable cause = ee.getCause();
            if (cause instanceof TopicDeletionDisabledException) {
                log.warn("Failed to delete {}. Caught: {} ", CONSISTENCY_TEST, cause.getMessage(), cause);
                return;
            }
            throw ee;
        }
        catch (TopicDeletionDisabledException ke) {
            log.warn("caught {} deleting {}", ke.getMessage(), CONSISTENCY_TEST, ke);
        }
    }

    /**
     * Completes {@code promise} with whether the probe topic's replicas span the expected number of brokers.
     *
     * The promise is completed with {@code false} when the probe topic is absent from
     * {@code topicDescriptions} or when the number of distinct, non-null replicas across all of its
     * partitions differs from {@code expectedBrokerCount}; otherwise it is completed with {@code true}.
     *
     * @param expectedBrokerCount number of distinct replicas required
     * @param promise             future that receives the verdict
     * @param topicDescriptions   topic descriptions keyed by topic name
     */
    private static void checkReplicaDistribution(Integer expectedBrokerCount, CompletableFuture<Boolean> promise, Map<String, TopicDescription> topicDescriptions) {
        TopicDescription probeTopic = topicDescriptions.get(CONSISTENCY_TEST);
        if (probeTopic == null) {
            promise.complete(false);
            return;
        }
        long replicaCount = probeTopic.partitions().stream()
                .flatMap(partition -> partition.replicas().stream())
                .filter(replica -> replica != null)
                .distinct()
                .count();
        boolean asExpected = replicaCount == (long) expectedBrokerCount.intValue();
        if (asExpected) {
            log.debug("Expected number of replicas found.");
        }
        else {
            log.debug("Unexpected number of replicas found expected: {} got: {}", expectedBrokerCount, replicaCount);
        }
        promise.complete(asExpected);
    }

    /**
     * Creates the probe topic ({@code CONSISTENCY_TEST}) with one partition and a replication factor
     * of {@code expectedBrokerCount}, retrying for as long as the failure is retryable.
     *
     * On failure the handler runs on a delayed executor (100 ms). If {@code isRetryable} accepts the
     * failure, creation is attempted again by calling this method recursively; otherwise the returned
     * stage fails with a {@link RuntimeException} wrapping the original failure. The number of retries
     * is not bounded by this method.
     *
     * @param expectedBrokerCount replication factor for the probe topic
     * @param admin               admin client used to create the topic
     * @return a stage that completes once the topic has been created, or fails on a non-retryable error
     */
    private static CompletionStage<Void> createTopic(Integer expectedBrokerCount, Admin admin) {
        NewTopic probeTopic = new NewTopic(CONSISTENCY_TEST, 1, expectedBrokerCount.shortValue());
        return admin.createTopics(Set.of(probeTopic)).all().toCompletionStage()
                .thenRun(() -> log.debug("Create future for topic {} completed.", CONSISTENCY_TEST))
                .exceptionallyComposeAsync(throwable -> {
                    log.warn("Failed to create topic: {} due to {}", CONSISTENCY_TEST, throwable.getMessage());
                    if (Utils.isRetryable(throwable)) {
                        return Utils.createTopic(expectedBrokerCount, admin);
                    }
                    // Build the message from the constant so it cannot drift from the real topic name.
                    return CompletableFuture.failedStage(new RuntimeException("Failed to create topic: " + CONSISTENCY_TEST, throwable));
                }, CompletableFuture.delayedExecutor(100L, TimeUnit.MILLISECONDS));
    }

    /**
     * Decides whether a topic-creation failure is worth retrying.
     *
     * A {@link CompletionException} with a non-null cause is unwrapped first. The failure is retryable
     * when it is a {@code RetriableException}, an {@code InvalidReplicationFactorException}, or a
     * {@code TopicExistsException} whose message says the topic "is marked for deletion".
     *
     * @param potentiallyWrapped the failure, possibly wrapped in a {@link CompletionException}
     * @return {@code true} if the operation should be retried
     */
    private static boolean isRetryable(Throwable potentiallyWrapped) {
        Throwable throwable = potentiallyWrapped;
        if (potentiallyWrapped instanceof CompletionException && potentiallyWrapped.getCause() != null) {
            throwable = potentiallyWrapped.getCause();
        }
        if (throwable instanceof RetriableException || throwable instanceof InvalidReplicationFactorException) {
            return true;
        }
        if (throwable instanceof TopicExistsException) {
            // The message may be null; treat that as "not pending deletion" instead of throwing a NullPointerException.
            String message = throwable.getMessage();
            return message != null && message.contains("is marked for deletion");
        }
        return false;
    }

    /**
     * Builds the Awaitility condition factory used by the helpers in this class.
     *
     * The factory is configured with no poll delay, a 500 ms poll interval, the given maximum wait,
     * and with exceptions ignored.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the configured condition factory
     */
    public static ConditionFactory awaitCondition(int timeout, TimeUnit timeUnit) {
        ConditionFactory polling = Awaitility.await()
                .pollDelay(Duration.ZERO)
                .pollInterval(500L, TimeUnit.MILLISECONDS);
        ConditionFactory bounded = polling.atMost((long) timeout, timeUnit);
        return bounded.ignoreExceptions();
    }

    /**
     * Describes each of the given topics individually and returns the descriptions of those that exist.
     *
     * A topic whose describe call fails with an {@code UnknownTopicOrPartitionException} cause is
     * skipped. Any other failure cause is rethrown as-is when it is a {@link RuntimeException}, or
     * wrapped in a new {@link RuntimeException} otherwise.
     *
     * @param topics names of the topics to describe
     * @param admin  admin client used for the describe calls
     * @return descriptions of the existing topics, keyed by topic name
     * @throws Exception if waiting for a describe result fails in a way not handled above
     */
    private static Map<String, TopicDescription> describeKnownTopics(List<String> topics, Admin admin) throws Exception {
        Map<String, TopicDescription> descriptions = new HashMap<>();
        for (String topic : topics) {
            try {
                Map<String, TopicDescription> described = admin.describeTopics(List.of(topic)).allTopicNames().get();
                descriptions.putAll(described);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    // The topic does not exist on this cluster; leave it out of the result.
                    continue;
                }
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new RuntimeException(cause);
            }
        }
        return descriptions;
    }
}

