package io.camunda.zeebe.engine.processing.clock;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.ClockClient;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ClockIntent;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.value.ClockRecordValue;
import io.camunda.zeebe.stream.api.StreamClock;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/clock/ClockMultiplePartitionsTest.class */
public class ClockMultiplePartitionsTest {
    private static final int PARTITION_COUNT = 3;

    @ClassRule
    public static final EngineRule ENGINE = EngineRule.multiplePartition(3);

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private final ClockClient clockClient = ENGINE.clock();

    @Test
    public void shouldWriteDistributingRecordsForOtherPartitionsOnPin() {
        long key = this.clockClient.pinAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)).getKey();
        List asList = RecordingExporter.commandDistributionRecords().withIntent(CommandDistributionIntent.DISTRIBUTING).valueFilter(commandDistributionRecordValue -> {
            return commandDistributionRecordValue.getValueType().equals(ValueType.CLOCK);
        }).limit(2L).asList();
        Assertions.assertThat(asList).extracting((v0) -> {
            return v0.getKey();
        }).containsOnly(new Long[]{Long.valueOf(key)});
        Assertions.assertThat(asList).extracting((v0) -> {
            return v0.getValue();
        }).extracting((v0) -> {
            return v0.getPartitionId();
        }).containsExactly(new Integer[]{2, 3});
    }

    @Test
    public void shouldPinClockOnAllPartitions() {
        Instant truncatedTo = Instant.now().truncatedTo(ChronoUnit.MILLIS);
        Record<ClockRecordValue> pinAt = this.clockClient.pinAt(truncatedTo);
        for (int i = 1; i <= 3; i++) {
            int i2 = i;
            Awaitility.await("until side effect has been applied").until(() -> {
                return Boolean.valueOf(RecordingExporter.clockRecords(ClockIntent.PINNED).withPartitionId(i2).withRecordKey(pinAt.getKey()).exists());
            });
            Assertions.assertThat(ENGINE.getStreamClock(i2).instant()).isEqualTo(truncatedTo);
            Assertions.assertThat(ENGINE.getProcessingState(i2).getClockState().getModification()).isEqualTo(StreamClock.ControllableStreamClock.Modification.pinAt(truncatedTo));
        }
    }

    @Test
    public void shouldWriteDistributingRecordsForOtherPartitionsOnReset() {
        long key = this.clockClient.reset().getKey();
        List asList = RecordingExporter.commandDistributionRecords().withIntent(CommandDistributionIntent.DISTRIBUTING).valueFilter(commandDistributionRecordValue -> {
            return commandDistributionRecordValue.getValueType().equals(ValueType.CLOCK);
        }).limit(2L).asList();
        Assertions.assertThat(asList).extracting((v0) -> {
            return v0.getKey();
        }).containsOnly(new Long[]{Long.valueOf(key)});
        Assertions.assertThat(asList).extracting((v0) -> {
            return v0.getValue();
        }).extracting((v0) -> {
            return v0.getPartitionId();
        }).containsExactly(new Integer[]{2, 3});
    }

    @Test
    public void shouldResetClockOnAllPartitions() {
        Instant truncatedTo = Instant.now().truncatedTo(ChronoUnit.MILLIS);
        this.clockClient.pinAt(truncatedTo);
        Record<ClockRecordValue> reset = this.clockClient.reset();
        for (int i = 1; i <= 3; i++) {
            int i2 = i;
            Awaitility.await("until side effect has been applied").until(() -> {
                return Boolean.valueOf(RecordingExporter.clockRecords(ClockIntent.RESETTED).withPartitionId(i2).withRecordKey(reset.getKey()).exists());
            });
            Assertions.assertThat(ENGINE.getStreamClock(i2).instant()).isAfter(truncatedTo);
            Assertions.assertThat(ENGINE.getProcessingState(i2).getClockState().getModification()).isEqualTo(StreamClock.ControllableStreamClock.Modification.none());
        }
    }
}
