package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayTest.class */
public final class StreamProcessorReplayTest {
    private static final int EXPECTED_ON_RECOVERED_INVOCATIONS = 2;

    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private TypedRecordProcessor<?> typedRecordProcessor;

    @Mock
    private EventApplier eventApplier;
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = new ProcessInstanceRecord().setBpmnElementType(BpmnElementType.TESTING_ONLY);

    @Test
    public void shouldReplayEvents() {
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.sourceRecordPosition(writeCommand);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldSkipCommands() {
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.sourceRecordPosition(writeCommand);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        ((EventApplier) inOrder.verify(this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ACTIVATE_ELEMENT), (RecordValue) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldSkipRejections() {
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeCommandRejection(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD, fluentLogWriter -> {
            return fluentLogWriter.sourceRecordPosition(writeCommand);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        ((EventApplier) inOrder.verify(this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ACTIVATE_ELEMENT), (RecordValue) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldNotReplayEventIfAlreadyApplied() {
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(1L).sourceRecordPosition(writeCommand);
        });
        awaitUntilProcessed(writeCommand);
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();
        long writeCommand2 = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter2 -> {
            return fluentLogWriter2.key(2L).sourceRecordPosition(writeCommand2);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.eq(1L), (Intent) ArgumentMatchers.any(), (RecordValue) ArgumentMatchers.any());
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.eq(2L), (Intent) ArgumentMatchers.any(), (RecordValue) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldRestoreKeyGenerator() {
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(2L).sourceRecordPosition(writeCommand);
        });
        long writeCommand2 = this.streamProcessorRule.writeCommand(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter2 -> {
            return fluentLogWriter2.key(1L).sourceRecordPosition(writeCommand2);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((TypedRecordProcessor) Mockito.verify(this.typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        Assertions.assertThat(this.streamProcessorRule.getZeebeState().getKeyGenerator().nextKey()).isEqualTo(3L);
    }

    @Test
    public void shouldRestoreKeyGeneratorAfterSkippingCommand() {
        ((AbstractBooleanAssert) Assumptions.assumeThat(MigratedStreamProcessors.isMigrated(ValueType.INCIDENT)).describedAs("Expected a not yet migrated value type", new Object[0])).isFalse();
        UnpackedObject incidentRecord = new IncidentRecord();
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(2L).sourceRecordPosition(writeCommand);
        });
        long writeCommand2 = this.streamProcessorRule.writeCommand(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter2 -> {
            return fluentLogWriter2.key(1L).sourceRecordPosition(writeCommand2);
        });
        long writeCommand3 = this.streamProcessorRule.writeCommand(IncidentIntent.RESOLVE, incidentRecord);
        this.streamProcessorRule.writeEvent((Intent) IncidentIntent.RESOLVED, incidentRecord, fluentLogWriter3 -> {
            return fluentLogWriter3.key(3L).sourceRecordPosition(writeCommand3);
        });
        AtomicLong atomicLong = new AtomicLong(-1L);
        this.streamProcessorRule.withEventApplierFactory(mutableZeebeState -> {
            return this.eventApplier;
        }).startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, this.typedRecordProcessor).onEvent(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING, this.typedRecordProcessor).onCommand(ValueType.INCIDENT, IncidentIntent.RESOLVE, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReplayTest.1
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
                    atomicLong.set(readonlyProcessingContext.getZeebeState().getKeyGenerator().nextKey());
                }
            });
        });
        awaitUntilProcessed(writeCommand3);
        ((AbstractLongAssert) Assertions.assertThat(atomicLong.get()).describedAs("Expected the generated key on reprocessing to be equal to the written key on the stream", new Object[0])).isEqualTo(3L);
    }

    @Test
    public void shouldIgnoreKeysFromDifferentPartition() {
        long encodePartitionId = Protocol.encodePartitionId(0, 1L);
        long encodePartitionId2 = Protocol.encodePartitionId(1, 2L);
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(encodePartitionId).sourceRecordPosition(writeCommand);
        });
        long writeCommand2 = this.streamProcessorRule.writeCommand(encodePartitionId2, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter2 -> {
            return fluentLogWriter2.key(encodePartitionId2).sourceRecordPosition(writeCommand2);
        });
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((TypedRecordProcessor) Mockito.verify(this.typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        Assertions.assertThat(this.streamProcessorRule.getZeebeState().getKeyGenerator().nextKey()).isEqualTo(encodePartitionId + 1);
    }

    private void startStreamProcessor(TypedRecordProcessor<?> typedRecordProcessor, EventApplier eventApplier) {
        this.streamProcessorRule.withEventApplierFactory(mutableZeebeState -> {
            return eventApplier;
        }).startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor).onEvent(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor);
        });
    }

    private void awaitUntilProcessed(long j) {
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat((Long) this.streamProcessorRule.getStreamProcessor(0).getLastProcessedPositionAsync().join()).isEqualTo(j);
        });
    }
}
