package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.broker.exporter.util.ControlledTestExporter;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterDirectorDistributionTest.class */
public final class ExporterDirectorDistributionTest {
    private static final String EXPORTER_ID_1 = "exporter-1";
    private static final String EXPORTER_ID_2 = "exporter-2";
    private static final Duration DISTRIBUTION_INTERVAL = Duration.ofSeconds(15);
    private final SimplePartitionMessageService simplePartitionMessageService = new SimplePartitionMessageService();

    @Rule
    public final ExporterRule activeExporters = ExporterRule.activeExporter().withPartitionMessageService(this.simplePartitionMessageService).withDistributionInterval(DISTRIBUTION_INTERVAL);

    @Rule
    public final ExporterRule passiveExporters = ExporterRule.passiveExporter().withPartitionMessageService(this.simplePartitionMessageService);
    private final List<ControlledTestExporter> exporters = new ArrayList();
    private final List<ExporterDescriptor> exporterDescriptors = new ArrayList();

    @Before
    public void init() {
        this.exporters.clear();
        this.exporterDescriptors.clear();
        createExporter(EXPORTER_ID_1, Collections.singletonMap("x", 1));
        createExporter(EXPORTER_ID_2, Collections.singletonMap("y", 2));
    }

    private void createExporter(String str, Map<String, Object> map) {
        ControlledTestExporter controlledTestExporter = (ControlledTestExporter) Mockito.spy(new ControlledTestExporter());
        ExporterDescriptor exporterDescriptor = (ExporterDescriptor) Mockito.spy(new ExporterDescriptor(str, controlledTestExporter.getClass(), map));
        ((ExporterDescriptor) Mockito.doAnswer(invocationOnMock -> {
            return controlledTestExporter;
        }).when(exporterDescriptor)).newInstance();
        this.exporters.add(controlledTestExporter);
        this.exporterDescriptors.add(exporterDescriptor);
    }

    private void startExporters(List<ExporterDescriptor> list) {
        this.activeExporters.startExporterDirector(list);
        this.passiveExporters.startExporterDirector(list);
    }

    @Test
    public void shouldDistributeExporterPositions() {
        ControlledTestExporter controlledTestExporter = this.exporters.get(1);
        this.exporters.forEach(controlledTestExporter2 -> {
            controlledTestExporter2.shouldAutoUpdatePosition(true);
        });
        startExporters(this.exporterDescriptors);
        long writeEvent = this.activeExporters.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
        Awaitility.await("director has read all records until now").untilAsserted(() -> {
            Assertions.assertThat(controlledTestExporter.getExportedRecords()).hasSize(1);
        });
        ExportersState exportersState = this.activeExporters.getExportersState();
        Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_1)).isEqualTo(writeEvent);
        Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_2)).isEqualTo(writeEvent);
        ExportersState exportersState2 = this.passiveExporters.getExportersState();
        Assertions.assertThat(exportersState2.getPosition(EXPORTER_ID_1)).isEqualTo(-1L);
        Assertions.assertThat(exportersState2.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        Awaitility.await("Active Director has distributed positions and passive has received it").untilAsserted(() -> {
            Assertions.assertThat(exportersState2.getPosition(EXPORTER_ID_1)).isEqualTo(writeEvent);
            Assertions.assertThat(exportersState2.getPosition(EXPORTER_ID_2)).isEqualTo(writeEvent);
        });
    }

    @Test
    public void shouldNotResetExporterPositionWhenOldPositionReceived() throws Exception {
        this.exporters.forEach(controlledTestExporter -> {
            controlledTestExporter.shouldAutoUpdatePosition(true);
        });
        startExporters(this.exporterDescriptors);
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 10L);
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 10L);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        ExportersState exportersState = this.passiveExporters.getExportersState();
        Awaitility.await("Active Director has distributed positions and passive has received it").untilAsserted(() -> {
            Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
            Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_2)).isEqualTo(10L);
        });
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 9L);
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 11L);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        Awaitility.await("Active Director has distributed positions and passive has received it").untilAsserted(() -> {
            Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_2)).isEqualTo(11L);
        });
        Assertions.assertThat(exportersState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
    }
}
