package io.zeebe.broker.exporter;

import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.configuration.ExporterCfg;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.exporter.context.Context;
import io.zeebe.exporter.context.Controller;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/**
 * Integration test that starts an embedded broker with {@link #PARTITIONS} partitions and a single
 * registered {@link TestExporter}, then checks that the exporter's lifecycle callbacks
 * (configure, open, export, close) are each invoked the expected number of times.
 *
 * <p>The call counts are tracked through static latches on {@link TestExporter}; they are created
 * once per class load and are never reset, so this class can only host a single test that relies
 * on them.
 */
public class ExporterManagerTest {
    /** Partition count of the embedded broker; every expected call count derives from it. */
    private static final int PARTITIONS = 3;

    /** Upper bound, in seconds, for waiting on any single exporter latch. */
    private static final long LATCH_TIMEOUT_SECONDS = 5L;

    /** Embedded broker configured with {@link #PARTITIONS} partitions and the test exporter. */
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(brokerCfg -> {
        brokerCfg.getCluster().setPartitionsCount(PARTITIONS);

        ExporterCfg exporterCfg = new ExporterCfg();
        exporterCfg.setId("test-exporter");
        exporterCfg.setClassName(TestExporter.class.getName());
        brokerCfg.getExporters().add(exporterCfg);
    });

    /** Client used to send commands; resolves the broker address from {@link #brokerRule}. */
    public ClientApiRule clientRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Applies the broker rule as the outer rule and the client rule around each test. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(clientRule);

    /**
     * Exporter that counts down one latch per lifecycle callback before delegating to
     * {@link DebugExporter}.
     */
    public static class TestExporter extends DebugExporter {
        // NOTE(review): one more configure call than partitions is expected; presumably the broker
        // configures the exporter once in addition to the per-partition instances — confirm
        // against the exporter loading code.
        static CountDownLatch configureLatch = new CountDownLatch(PARTITIONS + 1);
        static CountDownLatch openLatch = new CountDownLatch(PARTITIONS);
        static CountDownLatch closeLatch = new CountDownLatch(PARTITIONS);
        static CountDownLatch exportLatch = new CountDownLatch(PARTITIONS);

        /** Records the configure call, then delegates to the parent implementation. */
        public void configure(Context context) {
            configureLatch.countDown();
            super.configure(context);
        }

        /** Records the open call, then delegates to the parent implementation. */
        public void open(Controller controller) {
            openLatch.countDown();
            super.open(controller);
        }

        /** Records the close call, then delegates to the parent implementation. */
        public void close() {
            closeLatch.countDown();
            super.close();
        }

        /**
         * Counts only records whose value type is JOB and whose intent is CREATED, then delegates
         * every record to the parent implementation.
         */
        public void export(Record record) {
            RecordMetadata metadata = record.getMetadata();
            boolean isJobCreated =
                metadata.getValueType() == ValueType.JOB && metadata.getIntent() == JobIntent.CREATED;
            if (isJobCreated) {
                exportLatch.countDown();
            }
            super.export(record);
        }
    }

    /**
     * Creates one job per partition and verifies that configure, open and export were called the
     * expected number of times, and that close is called once the broker is stopped.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on a latch
     */
    @Test
    public void shouldRunExporterForEveryPartition() throws InterruptedException {
        // given: one job created on each partition id in [0, PARTITIONS)
        IntStream.range(0, PARTITIONS).forEach(this::createJob);

        // then: the exporter was configured, opened and saw a JOB CREATED record per latch count
        assertCountedDown("configure", TestExporter.configureLatch);
        assertCountedDown("open", TestExporter.openLatch);
        assertCountedDown("export", TestExporter.exportLatch);

        // when the broker stops, the close callback is expected to fire as well
        this.brokerRule.stopBroker();
        assertCountedDown("close", TestExporter.closeLatch);
    }

    /**
     * Sends a JOB CREATE command to the given partition and waits for the response.
     *
     * @param i id of the partition the command is sent to
     */
    void createJob(int i) {
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.clientRule
            .createCmdRequest()
            .partitionId(i)
            .type(ValueType.JOB, JobIntent.CREATE)
            .command()
            .put("type", TypedStreamProcessorTest.STREAM_NAME)
            .put("retries", 3)
            .done();
        request.sendAndAwait();
    }

    /**
     * Asserts that the latch reaches zero within {@link #LATCH_TIMEOUT_SECONDS}, naming the
     * callback in the failure message so a timeout can be attributed to a specific latch.
     *
     * @param callback name of the exporter callback the latch tracks, used in the description
     * @param latch latch expected to be fully counted down
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void assertCountedDown(String callback, CountDownLatch latch)
            throws InterruptedException {
        boolean reachedZero = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertThat(reachedZero)
            .as("exporter %s latch counted down within %d seconds", callback, LATCH_TIMEOUT_SECONDS)
            .isTrue();
    }
}
