/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.verification.VerificationMode;
import org.mockito.verification.VerificationWithTimeout;

public final class StreamProcessorReplayTest {
    private static final long TIMEOUT_MILLIS = 2000L;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout((long)2000L);
    private static final int EXPECTED_ON_RECOVERED_INVOCATIONS = 1;
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1L);
    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();
    @Mock
    private TypedRecordProcessor<?> typedRecordProcessor;
    @Mock
    private EventApplier eventApplier;

    @Test
    public void shouldReplayEvents() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(0));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier)inOrder.verify((Object)this.eventApplier, (VerificationMode)TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent)ArgumentMatchers.eq((Object)ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldSkipCommands() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(0));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((EventApplier)inOrder.verify((Object)this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.anyLong(), (Intent)ArgumentMatchers.eq((Object)ProcessInstanceIntent.ACTIVATE_ELEMENT), (RecordValue)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldSkipRejections() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.rejection().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD).causedBy(0));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.anyLong(), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((EventApplier)inOrder.verify((Object)this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.anyLong(), (Intent)ArgumentMatchers.eq((Object)ProcessInstanceIntent.ACTIVATE_ELEMENT), (RecordValue)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify(this.typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldNotReplayEventIfAlreadyApplied() {
        long eventKeyBeforeSnapshot = 1L;
        long eventKeyAfterSnapshot = 2L;
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        long commandPositionBeforeSnapshot = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)RECORD);
        this.streamProcessorRule.writeEvent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject)RECORD, writer -> writer.key(1L).sourceRecordPosition(commandPositionBeforeSnapshot));
        this.awaitUntilProcessed(commandPositionBeforeSnapshot);
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().key(2L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(0));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.eventApplier});
        ((EventApplier)inOrder.verify((Object)this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.eq((long)1L), (Intent)ArgumentMatchers.any(), (RecordValue)ArgumentMatchers.any());
        ((EventApplier)inOrder.verify((Object)this.eventApplier, (VerificationMode)TIMEOUT)).applyState(ArgumentMatchers.eq((long)2L), (Intent)ArgumentMatchers.any(), (RecordValue)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldRestoreKeyGenerator() {
        long lastGeneratedKey = 2L;
        long previousGeneratedKey = 1L;
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().key(2L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(0), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().key(1L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(2));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((TypedRecordProcessor)Mockito.verify(this.typedRecordProcessor, (VerificationMode)TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        KeyGenerator keyGenerator = this.streamProcessorRule.getZeebeState().getKeyGenerator();
        Assertions.assertThat((long)keyGenerator.nextKey()).isEqualTo(3L);
    }

    @Test
    public void shouldIgnoreKeysFromDifferentPartition() {
        long keyOfThisPartition = Protocol.encodePartitionId((int)0, (long)1L);
        long keyOfOtherPartition = Protocol.encodePartitionId((int)1, (long)2L);
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().key(keyOfThisPartition).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(0), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, (ProcessInstanceRecordValue)RECORD), RecordToWrite.event().key(keyOfOtherPartition).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, (ProcessInstanceRecordValue)RECORD).causedBy(2));
        this.startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((TypedRecordProcessor)Mockito.verify(this.typedRecordProcessor, (VerificationMode)TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        KeyGenerator keyGenerator = this.streamProcessorRule.getZeebeState().getKeyGenerator();
        Assertions.assertThat((long)keyGenerator.nextKey()).isEqualTo(keyOfThisPartition + 1L);
    }

    private void startStreamProcessor(TypedRecordProcessor<?> typedRecordProcessor, EventApplier eventApplier) {
        this.streamProcessorRule.withEventApplierFactory(zeebeState -> eventApplier).startTypedStreamProcessor((processors, context) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor));
    }

    private void awaitUntilProcessed(long position) {
        Awaitility.await().untilAsserted(() -> {
            Long processedPosition = (Long)this.streamProcessorRule.getStreamProcessor(0).getLastProcessedPositionAsync().join();
            Assertions.assertThat((Long)processedPosition).isEqualTo(position);
        });
    }
}

