/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.qa.util.cluster;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.qa.util.cluster.TestApplication;
import io.camunda.zeebe.qa.util.cluster.TestClusterBuilder;
import io.camunda.zeebe.qa.util.cluster.TestGateway;
import io.camunda.zeebe.qa.util.cluster.TestHealthProbe;
import io.camunda.zeebe.qa.util.cluster.TestStandaloneBroker;
import io.camunda.zeebe.qa.util.cluster.TestStandaloneGateway;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.util.CloseableSilently;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A group of brokers and gateways, keyed by member ID, which are started, stopped, and probed
 * together.
 *
 * <p>The broker and gateway maps handed to the constructor are wrapped in unmodifiable views; they
 * are not copied, so later changes to the maps passed in remain visible through this cluster.
 */
public final class TestCluster implements CloseableSilently {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestCluster.class);
    private final String name;
    private final Map<MemberId, TestStandaloneGateway> gateways;
    private final Map<MemberId, TestStandaloneBroker> brokers;
    private final int replicationFactor;
    private final int partitionsCount;

    /**
     * Creates a new cluster from already configured nodes.
     *
     * @param name the cluster name, used when logging the start of the cluster
     * @param replicationFactor the replication factor expected by the topology checks
     * @param partitionsCount the partition count expected by the topology checks
     * @param brokers the brokers of this cluster, keyed by member ID; must not be null
     * @param gateways the standalone gateways of this cluster, keyed by member ID; must not be null
     */
    public TestCluster(String name, int replicationFactor, int partitionsCount, Map<MemberId, TestStandaloneBroker> brokers, Map<MemberId, TestStandaloneGateway> gateways) {
        this.name = name;
        this.replicationFactor = replicationFactor;
        this.partitionsCount = partitionsCount;
        this.brokers = Collections.unmodifiableMap(brokers);
        this.gateways = Collections.unmodifiableMap(gateways);
    }

    /** Returns a new builder for assembling a cluster. */
    public static TestClusterBuilder builder() {
        return new TestClusterBuilder();
    }

    /**
     * Starts every node (brokers and gateways) asynchronously and blocks until all start calls
     * have returned.
     *
     * @return itself for chaining
     * @throws java.util.concurrent.CompletionException if starting any node failed
     */
    public TestCluster start() {
        LOGGER.info("Starting cluster {} with {} brokers, {} gateways, {} partitions, and a replication factor of {}", this.name, this.brokers.size(), this.gateways.size(), this.partitionsCount, this.replicationFactor);
        final CompletableFuture<?>[] started = this.nodes().values().stream()
            .map(node -> CompletableFuture.runAsync(node::start))
            .toArray(CompletableFuture<?>[]::new);
        CompletableFuture.allOf(started).join();
        return this;
    }

    /**
     * Stops every node (brokers and gateways) asynchronously and blocks until all stop calls have
     * returned.
     *
     * @return itself for chaining
     * @throws java.util.concurrent.CompletionException if stopping any node failed
     */
    public TestCluster shutdown() {
        final CompletableFuture<?>[] stopped = this.nodes().values().stream()
            .map(node -> CompletableFuture.runAsync(node::stop))
            .toArray(CompletableFuture<?>[]::new);
        CompletableFuture.allOf(stopped).join();
        return this;
    }

    /** Returns the replication factor this cluster was created with. */
    public int replicationFactor() {
        return this.replicationFactor;
    }

    /** Returns the partition count this cluster was created with. */
    public int partitionsCount() {
        return this.partitionsCount;
    }

    /** Returns the name this cluster was created with. */
    public String name() {
        return this.name;
    }

    /**
     * Returns the first gateway which is started and whose {@link TestHealthProbe#READY} probe
     * succeeds. Candidates come from {@link #nodes()}, which is hash based, so which gateway is
     * "first" is unspecified.
     *
     * @throws NoSuchElementException if no gateway is started and ready
     */
    public TestGateway<?> availableGateway() {
        return this.allGateways()
            .filter(this::isReady)
            .findFirst()
            .orElseThrow(() -> new NoSuchElementException("No available gateway for cluster"));
    }

    /**
     * Returns the first gateway which is started, without checking whether it is ready. As with
     * {@link #availableGateway()}, which gateway is "first" is unspecified.
     *
     * @throws NoSuchElementException if no gateway is started
     */
    public TestGateway<?> anyGateway() {
        return this.allGateways()
            .filter(TestApplication::isStarted)
            .findFirst()
            .orElseThrow(() -> new NoSuchElementException("No available gateway for cluster"));
    }

    /** Returns an unmodifiable view of the standalone gateways, keyed by member ID. */
    public Map<MemberId, TestStandaloneGateway> gateways() {
        return this.gateways;
    }

    /** Returns an unmodifiable view of the brokers, keyed by member ID. */
    public Map<MemberId, TestStandaloneBroker> brokers() {
        return this.brokers;
    }

    /**
     * Returns a freshly created, mutable map of all brokers and gateways keyed by member ID.
     * Modifying the returned map does not affect the cluster. If a broker and a gateway share a
     * member ID, the gateway entry replaces the broker entry.
     */
    public Map<MemberId, TestApplication<?>> nodes() {
        final Map<MemberId, TestApplication<?>> nodes = new HashMap<>(this.brokers);
        nodes.putAll(this.gateways);
        return nodes;
    }

    /**
     * Returns a plaintext client builder targeting the address of {@link #availableGateway()}.
     *
     * @throws NoSuchElementException if no gateway is started and ready
     */
    public ZeebeClientBuilder newClientBuilder() {
        return ZeebeClient.newClientBuilder()
            .usePlaintext()
            .gatewayAddress(this.availableGateway().gatewayAddress());
    }

    /**
     * Waits for a complete topology using a timeout of one minute per broker.
     *
     * @return itself for chaining
     * @see #awaitCompleteTopology(Duration)
     */
    public TestCluster awaitCompleteTopology() {
        this.awaitCompleteTopology(Duration.ofMinutes(this.brokers.size()));
        return this;
    }

    /**
     * Waits for a complete topology, expecting as many brokers as this cluster holds along with
     * its configured partition count and replication factor.
     *
     * @param timeout the maximum time to wait
     * @return itself for chaining
     */
    public TestCluster awaitCompleteTopology(Duration timeout) {
        return this.awaitCompleteTopology(this.brokers.size(), this.partitionsCount, this.replicationFactor, timeout);
    }

    /**
     * Waits until every gateway of the cluster (see the gateways considered by
     * {@link #availableGateway()}) passes its {@link TestHealthProbe#READY} probe and reports a
     * topology which is complete for the given expectations.
     *
     * @param clusterSize the expected number of brokers
     * @param partitionsCount the expected number of partitions
     * @param replicationFactor the expected replication factor
     * @param timeout the maximum time to wait
     * @return itself for chaining
     */
    public TestCluster awaitCompleteTopology(int clusterSize, int partitionsCount, int replicationFactor, Duration timeout) {
        Awaitility.await("until cluster topology is complete")
            .atMost(timeout)
            .untilAsserted(() -> Assertions.assertThat(this.allGateways())
                .allSatisfy(gateway -> this.assertCompleteTopology(gateway, clusterSize, partitionsCount, replicationFactor)));
        return this;
    }

    /**
     * Waits for a healthy topology using a timeout of one minute per broker.
     *
     * @return itself for chaining
     * @see #awaitHealthyTopology(Duration)
     */
    public TestCluster awaitHealthyTopology() {
        this.awaitHealthyTopology(Duration.ofMinutes(this.brokers.size()));
        return this;
    }

    /**
     * Waits until every gateway of the cluster passes its {@link TestHealthProbe#READY} probe and
     * reports a topology which is healthy.
     *
     * @param timeout the maximum time to wait
     * @return itself for chaining
     */
    public TestCluster awaitHealthyTopology(Duration timeout) {
        // the alias is part of the timeout error; it must name the health check, not completeness
        Awaitility.await("until cluster topology is healthy")
            .atMost(timeout)
            .untilAsserted(() -> Assertions.assertThat(this.allGateways())
                .allSatisfy(this::assertHealthyTopology));
        return this;
    }

    /**
     * Waits until the given probe succeeds on all nodes, using a timeout of one minute per node
     * (brokers plus gateways).
     *
     * @param probe the probe to check on every node
     * @return itself for chaining
     */
    public TestCluster await(TestHealthProbe probe) {
        this.await(probe, Duration.ofMinutes(this.brokers.size() + this.gateways.size()));
        return this;
    }

    /**
     * Waits until the given probe succeeds on all nodes (brokers and gateways).
     *
     * @param probe the probe to check on every node
     * @param timeout the maximum time to wait
     * @return itself for chaining
     */
    public TestCluster await(TestHealthProbe probe, Duration timeout) {
        // snapshot the nodes once so every poll checks the same collection
        final Collection<TestApplication<?>> nodes = this.nodes().values();
        Awaitility.await("until '%s' probe succeeds on all nodes".formatted(probe))
            .atMost(timeout)
            .untilAsserted(() -> Assertions.assertThat(nodes)
                .allSatisfy(node -> this.assertProbe(node, probe)));
        return this;
    }

    /** Shuts the cluster down; equivalent to calling {@link #shutdown()}. */
    @Override
    public void close() {
        this.shutdown();
    }

    /**
     * Returns whether the node is started and its {@link TestHealthProbe#READY} probe completes
     * without throwing. A failing probe is logged at trace level and reported as not ready.
     */
    private boolean isReady(TestApplication<?> node) {
        if (!node.isStarted()) {
            return false;
        }
        try {
            node.probe(TestHealthProbe.READY);
        } catch (Exception e) {
            // best effort: a failing probe only means this node is not a candidate right now
            LOGGER.trace("Node {} is not ready", node.nodeId(), e);
            return false;
        }
        return true;
    }

    /**
     * Asserts that the gateway is ready, then queries its topology through a short-lived client and
     * asserts it is complete for the given expectations.
     */
    private void assertCompleteTopology(TestGateway<?> node, int clusterSize, int partitionsCount, int replicationFactor) {
        Assertions.assertThatCode(() -> node.probe(TestHealthProbe.READY))
            .as("gateway '%s' is ready", node.nodeId())
            .doesNotThrowAnyException();
        try (ZeebeClient client = node.newClientBuilder().build()) {
            TopologyAssert.assertThat(client.newTopologyRequest().send().join())
                .isComplete(clusterSize, partitionsCount, replicationFactor);
        }
    }

    /**
     * Asserts that the gateway is ready, then queries its topology through a short-lived client and
     * asserts it is healthy.
     */
    private void assertHealthyTopology(TestGateway<?> node) {
        Assertions.assertThatCode(() -> node.probe(TestHealthProbe.READY))
            .as("gateway '%s' is ready", node.nodeId())
            .doesNotThrowAnyException();
        try (ZeebeClient client = node.newClientBuilder().build()) {
            TopologyAssert.assertThat(client.newTopologyRequest().send().join()).isHealthy();
        }
    }

    /** Asserts that running the given probe against the node does not throw. */
    private void assertProbe(TestApplication<?> node, TestHealthProbe probe) {
        Assertions.assertThatCode(() -> node.probe(probe)).doesNotThrowAnyException();
    }

    /**
     * Streams every node which reports itself as a gateway and implements {@link TestGateway}. The
     * candidates are taken from all nodes, not only from the standalone gateway map.
     */
    private Stream<TestGateway<?>> allGateways() {
        return this.nodes().values().stream()
            .filter(TestApplication::isGateway)
            .filter(TestGateway.class::isInstance)
            .map(node -> (TestGateway<?>) node);
    }
}

