package io.camunda.zeebe.broker.test;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.TestLoggers;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.test.util.TestConfigurationFactory;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.Environment;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.allocation.DirectBufferAllocator;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.util.Files;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/test/EmbeddedBrokerRule.class */
public final class EmbeddedBrokerRule extends ExternalResource {
    /** Classpath resource name of the configuration loaded when only configurators are passed to the constructor. */
    public static final String DEFAULT_CONFIG_FILE = "zeebe.test.cfg.yaml";
    /**
     * Install timeout amount, expressed in {@link #INSTALL_TIMEOUT_UNIT}.
     * NOTE(review): startBroker waits with a literal 5 rather than this constant - presumably the
     * compiler-inlined value; confirm.
     */
    public static final int INSTALL_TIMEOUT = 5;
    /** Time unit startBroker uses while waiting for the leader latch. */
    public static final TimeUnit INSTALL_TIMEOUT_UNIT = TimeUnit.MINUTES;
    /** Shared test logger used for every log line written by this rule. */
    protected static final Logger LOG = TestLoggers.TEST_LOGGER;
    // The two toggles below are not referenced by any code visible in this class.
    // NOTE(review): likely leftovers - confirm before removing.
    private static final boolean ENABLE_DEBUG_EXPORTER = false;
    private static final boolean ENABLE_HTTP_EXPORTER = false;
    /** Name of the directory that deleteSnapshots removes from a state directory. */
    private static final String SNAPSHOTS_DIRECTORY = "snapshots";
    /** Name of the directory purgeSnapshots looks for inside each sub-directory of the data directory. */
    private static final String STATE_DIRECTORY = "state";
    /** Test watcher chained around the test statement in apply. */
    protected final RecordingExporterTestWatcher recordingExporterTestWatcher;
    /** Supplies the configuration stream read by the first startBroker call; a null stream selects a default BrokerCfg. */
    protected final Supplier<InputStream> configSupplier;
    /** Callbacks applied to the configuration, in array order, by configureBroker. */
    protected final Consumer<BrokerCfg>[] configurators;
    /** Configuration created by the first startBroker call and reused by later starts; null before that. */
    protected BrokerCfg brokerCfg;
    /** The broker created by startBroker; set back to null by stopBroker. */
    protected Broker broker;
    /** Clock handed to every SystemContext created by this rule; reset in after. */
    protected final ControlledActorClock controlledActorClock;
    /** Bridge passed to every Broker created by this rule. */
    protected final SpringBrokerBridge springBrokerBridge;
    /** Wall-clock timestamp in milliseconds used to compute the startup, test and shutdown durations that are logged. */
    protected long startTime;
    /** Temporary folder created in before, passed to the SystemContext and deleted in after. */
    private File newTemporaryFolder;
    /** Data directory path read from the broker configuration at the end of startBroker; scanned by purgeSnapshots. */
    private String dataDirectory;
    /** Context whose scheduler is started in startBroker and stopped in stopBroker. */
    private SystemContext systemContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/broker/test/EmbeddedBrokerRule$LeaderPartitionListener.class */
    /**
     * Partition listener that counts a latch down every time {@code onBecomingLeader} is invoked.
     * The follower and inactive transitions are acknowledged without doing anything.
     *
     * <p>Every callback returns an already completed future carrying {@code null}.
     */
    public static class LeaderPartitionListener implements PartitionListener {
        private final CountDownLatch latch;

        /**
         * @param countDownLatch latch decremented once per leader transition
         */
        LeaderPartitionListener(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        /** No-op; returns a completed future. */
        public ActorFuture<Void> onBecomingFollower(int i, long j) {
            // No cast on null: "(Object) null" would infer Object and clash with ActorFuture<Void>.
            return CompletableActorFuture.completed(null);
        }

        /** Counts the latch down once and returns a completed future. */
        public ActorFuture<Void> onBecomingLeader(int i, long j, LogStream logStream, QueryService queryService) {
            this.latch.countDown();
            return CompletableActorFuture.completed(null);
        }

        /** No-op; returns a completed future. */
        public ActorFuture<Void> onBecomingInactive(int i, long j) {
            return CompletableActorFuture.completed(null);
        }
    }

    /**
     * Creates a rule that reads its configuration from the classpath resource named by
     * {@link #DEFAULT_CONFIG_FILE}.
     *
     * @param consumerArr callbacks applied to the loaded configuration before the broker starts
     */
    @SafeVarargs
    public EmbeddedBrokerRule(Consumer<BrokerCfg>... consumerArr) {
        this(EmbeddedBrokerRule.DEFAULT_CONFIG_FILE, consumerArr);
    }

    /**
     * Creates a rule that reads its configuration from a classpath resource.
     *
     * <p>The resource is looked up through this class' class loader each time the supplier is
     * asked for a stream, not when the rule is constructed.
     *
     * @param str name of the classpath resource holding the configuration
     * @param consumerArr callbacks applied to the loaded configuration before the broker starts
     */
    @SafeVarargs
    public EmbeddedBrokerRule(String str, Consumer<BrokerCfg>... consumerArr) {
        this(
            (Supplier<InputStream>) () -> EmbeddedBrokerRule.class.getClassLoader().getResourceAsStream(str),
            consumerArr);
    }

    /**
     * Creates a rule that reads its configuration from the supplied stream.
     *
     * <p>Also creates the recording test watcher, the controlled clock and the Spring bridge that
     * the rule uses for its whole lifetime.
     *
     * @param supplier provides the configuration stream; consulted by the first startBroker call
     * @param consumerArr callbacks applied to the loaded configuration before the broker starts
     */
    @SafeVarargs
    public EmbeddedBrokerRule(Supplier<InputStream> supplier, Consumer<BrokerCfg>... consumerArr) {
        configSupplier = supplier;
        configurators = consumerArr;
        recordingExporterTestWatcher = new RecordingExporterTestWatcher();
        controlledActorClock = new ControlledActorClock();
        springBrokerBridge = new SpringBrokerBridge();
    }

    /**
     * Deletes the {@code snapshots} directory located directly inside the given directory, if it
     * exists.
     *
     * @param file directory that may contain a snapshots directory
     * @throws RuntimeException wrapping the {@link IOException} if the directory cannot be deleted
     */
    private static void deleteSnapshots(File file) {
        final File snapshotsDir = new File(file, SNAPSHOTS_DIRECTORY);
        if (!snapshotsDir.exists()) {
            return;
        }

        final String snapshotsPath = snapshotsDir.getAbsolutePath();
        try {
            FileUtil.deleteFolder(snapshotsPath);
        } catch (IOException e) {
            throw new RuntimeException("Could not delete snapshot directory " + snapshotsPath, e);
        }
    }

    /**
     * Assigns a port obtained from {@code SocketUtil.getNextAddress()} to each of the gateway API,
     * gateway cluster, command API, internal API and monitoring endpoints of the configuration.
     *
     * <p>Each port is requested and applied before the next one is requested, in the order listed.
     *
     * @param brokerCfg configuration to update in place
     */
    public static void assignSocketAddresses(BrokerCfg brokerCfg) {
        final int gatewayApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setGatewayApiPort(gatewayApiPort).accept(brokerCfg);

        final int gatewayClusterPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setGatewayClusterPort(gatewayClusterPort).accept(brokerCfg);

        final int commandApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setCommandApiPort(commandApiPort).accept(brokerCfg);

        final int internalApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setInternalApiPort(internalApiPort).accept(brokerCfg);

        final int monitoringPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setMonitoringPort(monitoringPort).accept(brokerCfg);
    }

    /**
     * Wraps the test statement first with the recording exporter test watcher and then with this
     * rule's own before/after handling.
     *
     * @param statement the statement to wrap
     * @param description description of the test being run
     * @return the wrapped statement
     */
    public Statement apply(Statement statement, Description description) {
        final Statement watched = recordingExporterTestWatcher.apply(statement, description);
        return super.apply(watched, description);
    }

    /**
     * Creates the temporary folder, starts the broker without extra partition listeners and logs
     * how long the startup took. Afterwards the timer is restarted so that after() can report the
     * test duration.
     */
    public void before() {
        newTemporaryFolder = Files.newTemporaryFolder();

        startTime = System.currentTimeMillis();
        startBroker();
        final long startupMillis = System.currentTimeMillis() - startTime;
        LOG.info("\n====\nBroker startup time: {}\n====\n", Long.valueOf(startupMillis));

        startTime = System.currentTimeMillis();
    }

    /**
     * Tears the rule down: logs the test duration, stops the broker, logs the shutdown duration
     * and warns when direct buffers are still allocated.
     *
     * <p>The temporary folder is deleted and the controlled clock is reset even if stopping the
     * broker throws. A failure to delete the folder is logged, not rethrown.
     */
    public void after() {
        try {
            // Parameterized logging, consistent with the other log statements of this class.
            LOG.info("Test execution time: {}", System.currentTimeMillis() - startTime);
            startTime = System.currentTimeMillis();
            stopBroker();
            LOG.info("Broker closing time: {}", System.currentTimeMillis() - startTime);

            final long allocatedMemoryInKb = DirectBufferAllocator.getAllocatedMemoryInKb();
            if (allocatedMemoryInKb > 0) {
                LOG.warn("There are still allocated direct buffers of a total size of {}kB.", allocatedMemoryInKb);
            }
        } finally {
            // The folder only exists once before() has run; guarding against null keeps an NPE
            // from masking the real failure and from skipping the clock reset below.
            if (newTemporaryFolder != null) {
                try {
                    FileUtil.deleteFolder(newTemporaryFolder.getAbsolutePath());
                } catch (IOException e) {
                    LOG.error("Unexpected error on deleting data.", e);
                }
            }
            controlledActorClock.reset();
        }
    }

    /**
     * @return the broker configuration, or {@code null} if startBroker has not created it yet
     */
    public BrokerCfg getBrokerCfg() {
        return brokerCfg;
    }

    /**
     * @return the bridge created with this rule and passed to every broker it starts
     */
    public SpringBrokerBridge getSpringBrokerBridge() {
        return springBrokerBridge;
    }

    /**
     * @return the cluster services of the current broker
     * @throws NullPointerException if no broker is currently held by this rule
     */
    public ClusterServices getClusterServices() {
        return broker.getClusterServices();
    }

    /**
     * @return the Atomix cluster of the current broker
     * @throws NullPointerException if no broker is currently held by this rule
     */
    public AtomixCluster getAtomixCluster() {
        return broker.getAtomixCluster();
    }

    /**
     * @return the socket address taken from the gateway network section of the broker configuration
     * @throws NullPointerException if the configuration has not been created yet
     */
    public InetSocketAddress getGatewayAddress() {
        return brokerCfg.getGateway().getNetwork().toSocketAddress();
    }

    /**
     * @return the current broker, or {@code null} before the first start and after stopBroker
     */
    public Broker getBroker() {
        return broker;
    }

    /**
     * @return the controlled clock passed to every system context created by this rule
     */
    public ControlledActorClock getClock() {
        return controlledActorClock;
    }

    /**
     * Stops the current broker, if any, and starts a new one with the same configuration.
     *
     * @param partitionListenerArr additional listeners registered on the new broker
     */
    public void restartBroker(PartitionListener... partitionListenerArr) {
        stopBroker();
        startBroker(partitionListenerArr);
    }

    /**
     * Closes the current broker and stops the scheduler of its system context, waiting until the
     * scheduler has stopped. Does nothing when no broker is held.
     *
     * <p>Failures while waiting for the scheduler are rethrown unchecked. If the wait is
     * interrupted, the thread's interrupt status is restored before rethrowing.
     */
    public void stopBroker() {
        if (broker == null) {
            return;
        }

        broker.close();
        broker = null;

        try {
            systemContext.getScheduler().stop().get();
        } catch (InterruptedException e) {
            // Callers cannot catch the sneakily rethrown checked exception, so keep the
            // interruption observable through the thread's interrupt flag.
            Thread.currentThread().interrupt();
            LangUtil.rethrowUnchecked(e);
        } catch (ExecutionException e) {
            LangUtil.rethrowUnchecked(e);
        }

        systemContext = null;
        // NOTE(review): explicit GC request kept from the original; presumably meant to release
        // broker resources between tests - confirm it is still needed.
        System.gc();
    }

    /**
     * Starts a broker inside the temporary folder of this rule.
     *
     * <p>On the first call the configuration is created: it is read from the configured stream, or
     * a default {@code BrokerCfg} is used when the supplier yields {@code null}, and then passed
     * through configureBroker. Later calls reuse that configuration.
     *
     * <p>After the broker has started, the method waits up to {@link #INSTALL_TIMEOUT}
     * {@link #INSTALL_TIMEOUT_UNIT} for one leader notification per configured partition. A timeout
     * is logged as a warning and does not abort the start. If an embedded gateway is present, the
     * method additionally waits until its topology reports a leader for partition 1.
     *
     * @param partitionListenerArr additional listeners registered on the broker before it starts
     * @throws RuntimeException if reading or closing the configuration stream fails
     */
    public void startBroker(PartitionListener... partitionListenerArr) {
        if (brokerCfg == null) {
            // try-with-resources accepts a null resource and closes a non-null stream on every
            // path, including when loading or configuring throws.
            try (InputStream inputStream = configSupplier.get()) {
                if (inputStream == null) {
                    brokerCfg = new BrokerCfg();
                } else {
                    brokerCfg = (BrokerCfg) new TestConfigurationFactory().create((Environment) null, "zeebe.broker", inputStream, BrokerCfg.class);
                }
                configureBroker(brokerCfg);
            } catch (IOException e) {
                throw new RuntimeException("Unable to open configuration", e);
            }
        }

        systemContext = new SystemContext(brokerCfg, newTemporaryFolder.getAbsolutePath(), controlledActorClock);
        systemContext.getScheduler().start();
        broker = new Broker(systemContext, springBrokerBridge);

        // One count per partition; the listener counts down on every leader transition.
        final CountDownLatch countDownLatch = new CountDownLatch(brokerCfg.getCluster().getPartitionsCount());
        broker.addPartitionListener(new LeaderPartitionListener(countDownLatch));
        for (PartitionListener partitionListener : partitionListenerArr) {
            broker.addPartitionListener(partitionListener);
        }
        broker.start().join();

        try {
            // await returns false on timeout; report it instead of continuing silently.
            if (!countDownLatch.await(INSTALL_TIMEOUT, INSTALL_TIMEOUT_UNIT)) {
                LOG.warn("Not all partitions became leader within {} {}", INSTALL_TIMEOUT, INSTALL_TIMEOUT_UNIT);
            }
        } catch (InterruptedException e2) {
            LOG.info("Interrupted while waiting for the broker partitions to become leader", e2);
            Thread.currentThread().interrupt();
        }

        final EmbeddedGatewayService embeddedGatewayService = broker.getEmbeddedGatewayService();
        if (embeddedGatewayService != null) {
            final BrokerClient brokerClient = embeddedGatewayService.get().getBrokerClient();
            TestUtil.waitUntil(() -> {
                BrokerClusterState topology = brokerClient.getTopologyManager().getTopology();
                return topology != null && topology.getLeaderForPartition(1) >= 0;
            });
        }

        dataDirectory = broker.getSystemContext().getBrokerConfiguration().getData().getDirectory();
    }

    /**
     * Prepares a configuration for use by this rule: applies the test recorder configurator, then
     * every configurator given to the constructor in order, and finally assigns the socket
     * addresses.
     *
     * @param brokerCfg configuration to modify in place
     */
    public void configureBroker(BrokerCfg brokerCfg) {
        EmbeddedBrokerConfigurator.TEST_RECORDER.accept(brokerCfg);

        for (int index = 0; index < configurators.length; index++) {
            configurators[index].accept(brokerCfg);
        }

        assignSocketAddresses(brokerCfg);
    }

    /**
     * Deletes the snapshots of every sub-directory of the data directory that contains a
     * {@code state} directory.
     *
     * <p>Does nothing when the data directory cannot be listed.
     */
    public void purgeSnapshots() {
        final File dataDir = new File(dataDirectory);
        final File[] subDirectories =
            dataDir.listFiles((parent, name) -> new File(parent, name).isDirectory());

        if (subDirectories != null) {
            for (final File subDirectory : subDirectories) {
                final File stateDir = new File(subDirectory, STATE_DIRECTORY);
                if (stateDir.exists()) {
                    deleteSnapshots(stateDir);
                }
            }
        }
    }
}
