package io.camunda.zeebe.qa.util.cluster;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.util.CloseableSilently;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/qa/util/cluster/TestCluster.class */
public final class TestCluster implements CloseableSilently {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestCluster.class);
    private final String name;
    private final Map<MemberId, TestStandaloneGateway> gateways;
    private final Map<MemberId, TestStandaloneBroker> brokers;
    private final int replicationFactor;
    private final int partitionsCount;

    public TestCluster(String str, int i, int i2, Map<MemberId, TestStandaloneBroker> map, Map<MemberId, TestStandaloneGateway> map2) {
        this.name = str;
        this.replicationFactor = i;
        this.partitionsCount = i2;
        this.brokers = Collections.unmodifiableMap(map);
        this.gateways = Collections.unmodifiableMap(map2);
    }

    public static TestClusterBuilder builder() {
        return new TestClusterBuilder();
    }

    public TestCluster start() {
        LOGGER.info("Starting cluster {} with {} brokers, {} gateways, {} partitions, and a replication factor of {}", new Object[]{this.name, Integer.valueOf(this.brokers.size()), Integer.valueOf(this.gateways.size()), Integer.valueOf(this.partitionsCount), Integer.valueOf(this.replicationFactor)});
        CompletableFuture.allOf((CompletableFuture[]) nodes().values().stream().map(testApplication -> {
            Objects.requireNonNull(testApplication);
            return CompletableFuture.runAsync(testApplication::start);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
        return this;
    }

    public TestCluster shutdown() {
        CompletableFuture.allOf((CompletableFuture[]) nodes().values().stream().map(testApplication -> {
            Objects.requireNonNull(testApplication);
            return CompletableFuture.runAsync(testApplication::stop);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
        return this;
    }

    public int replicationFactor() {
        return this.replicationFactor;
    }

    public int partitionsCount() {
        return this.partitionsCount;
    }

    public String name() {
        return this.name;
    }

    public TestGateway<?> availableGateway() {
        return allGateways().filter((v1) -> {
            return isReady(v1);
        }).findFirst().orElseThrow(() -> {
            return new NoSuchElementException("No available gateway for cluster");
        });
    }

    public Map<MemberId, TestStandaloneGateway> gateways() {
        return this.gateways;
    }

    public Map<MemberId, TestStandaloneBroker> brokers() {
        return this.brokers;
    }

    public Map<MemberId, TestApplication<?>> nodes() {
        HashMap hashMap = new HashMap(this.brokers);
        hashMap.putAll(this.gateways);
        return hashMap;
    }

    public ZeebeClientBuilder newClientBuilder() {
        return ZeebeClient.newClientBuilder().usePlaintext().gatewayAddress(availableGateway().gatewayAddress());
    }

    public TestCluster awaitCompleteTopology() {
        awaitCompleteTopology(Duration.ofMinutes(this.brokers.size()));
        return this;
    }

    public TestCluster awaitCompleteTopology(Duration duration) {
        return awaitCompleteTopology(this.brokers.size(), this.partitionsCount, this.replicationFactor, duration);
    }

    public TestCluster awaitCompleteTopology(int i, int i2, int i3, Duration duration) {
        Awaitility.await("until cluster topology is complete").atMost(duration).untilAsserted(() -> {
            Assertions.assertThat(allGateways()).allSatisfy(testGateway -> {
                assertCompleteTopology(testGateway, i, i2, i3);
            });
        });
        return this;
    }

    public TestCluster awaitHealthyTopology() {
        awaitHealthyTopology(Duration.ofMinutes(this.brokers.size()));
        return this;
    }

    public TestCluster awaitHealthyTopology(Duration duration) {
        Awaitility.await("until cluster topology is complete").atMost(duration).untilAsserted(() -> {
            Assertions.assertThat(allGateways()).allSatisfy(this::assertHealthyTopology);
        });
        return this;
    }

    public TestCluster await(TestHealthProbe testHealthProbe) {
        await(testHealthProbe, Duration.ofMinutes(this.brokers.size() + this.gateways.size()));
        return this;
    }

    public TestCluster await(TestHealthProbe testHealthProbe, Duration duration) {
        Collection<TestApplication<?>> values = nodes().values();
        Awaitility.await("until '%s' probe succeeds on all nodes".formatted(testHealthProbe)).atMost(duration).untilAsserted(() -> {
            Assertions.assertThat(values).allSatisfy(testApplication -> {
                assertProbe(testApplication, testHealthProbe);
            });
        });
        return this;
    }

    public void close() {
        shutdown();
    }

    private boolean isReady(TestApplication<?> testApplication) {
        if (!testApplication.isStarted()) {
            return false;
        }
        try {
            testApplication.probe(TestHealthProbe.READY);
            return true;
        } catch (Exception e) {
            LOGGER.trace("Node {} is not ready", testApplication.nodeId(), e);
            return false;
        }
    }

    private void assertCompleteTopology(TestGateway<?> testGateway, int i, int i2, int i3) {
        Assertions.assertThatCode(() -> {
            testGateway.probe(TestHealthProbe.READY);
        }).as("gateway '%s' is ready", new Object[]{testGateway.nodeId()}).doesNotThrowAnyException();
        ZeebeClient build = testGateway.newClientBuilder().build();
        try {
            TopologyAssert.assertThat((Topology) build.newTopologyRequest().send().join()).isComplete(i, i2, i3);
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void assertHealthyTopology(TestGateway<?> testGateway) {
        Assertions.assertThatCode(() -> {
            testGateway.probe(TestHealthProbe.READY);
        }).as("gateway '%s' is ready", new Object[]{testGateway.nodeId()}).doesNotThrowAnyException();
        ZeebeClient build = testGateway.newClientBuilder().build();
        try {
            TopologyAssert.assertThat((Topology) build.newTopologyRequest().send().join()).isHealthy();
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void assertProbe(TestApplication<?> testApplication, TestHealthProbe testHealthProbe) {
        Assertions.assertThatCode(() -> {
            testApplication.probe(testHealthProbe);
        }).doesNotThrowAnyException();
    }

    private Stream<TestGateway<?>> allGateways() {
        Stream<TestApplication<?>> filter = nodes().values().stream().filter((v0) -> {
            return v0.isGateway();
        });
        Class<TestGateway> cls = TestGateway.class;
        Objects.requireNonNull(TestGateway.class);
        return filter.filter((v1) -> {
            return r1.isInstance(v1);
        }).map(testApplication -> {
            return (TestGateway) testApplication;
        });
    }
}
