/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableLastProcessedPositionState;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.verification.VerificationMode;
import org.mockito.verification.VerificationWithTimeout;

/**
 * Tests for the replay behaviour of the {@link StreamProcessor}.
 *
 * <p>Two stream processor rules are used: {@link #replayUntilEnd} is configured with {@link
 * StreamProcessorMode#PROCESSING} and {@link #replayContinuously} with {@link
 * StreamProcessorMode#REPLAY}. The {@link TypedRecordProcessor} and the {@link EventApplier} are
 * mocks, so every test asserts on the interactions the stream processor has with them.
 */
public final class StreamProcessorReplayModeTest {
    /** Upper bound, in milliseconds, for every timed Mockito verification in this class. */
    private static final long TIMEOUT_MILLIS = 2000L;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final int PARTITION_ID = 1;
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1L);

    /** Stream processor started in {@link StreamProcessorMode#PROCESSING}. */
    @Rule
    public final StreamProcessorRule replayUntilEnd =
        new StreamProcessorRule(PARTITION_ID).withStreamProcessorMode(StreamProcessorMode.PROCESSING);

    /** Stream processor started in {@link StreamProcessorMode#REPLAY}. */
    @Rule
    public final StreamProcessorRule replayContinuously =
        new StreamProcessorRule(PARTITION_ID).withStreamProcessorMode(StreamProcessorMode.REPLAY);

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private TypedRecordProcessor<?> typedRecordProcessor;

    @Mock
    private EventApplier eventApplier;

    /**
     * With the PROCESSING-mode rule: the event written before start must be applied first, then
     * {@code onRecovered} is called exactly once, and the command written after the processor
     * reached the PROCESSING phase is handed to the record processor.
     */
    @Test
    public void shouldReplayUntilEnd() {
        // given
        writeCommandWithFollowUpEvent(replayUntilEnd);

        // when
        startStreamProcessor(replayUntilEnd);
        Awaitility.await()
            .untilAsserted(
                () ->
                    Assertions.assertThat(getCurrentPhase(replayUntilEnd))
                        .isEqualTo(StreamProcessor.Phase.PROCESSING));
        writeCommandWithFollowUpEvent(replayUntilEnd);

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT)
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verify(typedRecordProcessor, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        inOrder
            .verify(typedRecordProcessor, TIMEOUT)
            .processRecord(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    /**
     * With the REPLAY-mode rule: events written before and after start are both applied, {@code
     * onRecovered} is never called and the processor stays in the REPLAY phase.
     */
    @Test
    public void shouldReplayContinuously() {
        // given
        writeCommandWithFollowUpEvent(replayContinuously);

        // when
        startStreamProcessor(replayContinuously);
        writeCommandWithFollowUpEvent(replayContinuously);

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT.times(2))
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verify(typedRecordProcessor, Mockito.never()).onRecovered(ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();

        Assertions.assertThat(getCurrentPhase(replayContinuously))
            .isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    /**
     * Restarts the processor from a snapshot taken at {@code snapshotPosition}, then writes one
     * event whose source position equals the snapshot position followed by a command/event batch.
     * Exactly one event application is expected.
     */
    @Test
    public void shouldReplayIfNoEventsAfterSnapshot() {
        // given
        startStreamProcessor(replayContinuously);
        final long snapshotPosition = 1L;
        replayContinuously
            .getZeebeState()
            .getLastProcessedPositionState()
            .markAsProcessed(snapshotPosition);
        replayContinuously.snapshot();
        replayContinuously.closeStreamProcessor();

        // when
        startStreamProcessor(replayContinuously);
        // NOTE(review): presumably this event is skipped because its source position is already
        // covered by the snapshot, which is why only one application is expected - confirm.
        replayContinuously.writeEvent(
            ProcessInstanceIntent.ELEMENT_ACTIVATING,
            RECORD,
            writer -> writer.key(1L).sourceRecordPosition(snapshotPosition));
        writeCommandWithFollowUpEvent(replayContinuously);

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT.times(1))
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    /** A processor that was paused right after start must stay in the PAUSED phase. */
    @Test
    public void shouldNotReplayWhenPaused() {
        // given
        startWithPausedStreamProcessor(replayContinuously);

        // when
        writeCommandWithFollowUpEvent(replayContinuously);

        // then
        // NOTE(review): this check runs immediately after the write without any waiting, so it
        // only detects interactions that have already happened at this point.
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder.verifyNoMoreInteractions();

        Assertions.assertThat(getCurrentPhase(replayContinuously))
            .isEqualTo(StreamProcessor.Phase.PAUSED);
    }

    /**
     * Pauses the processor after the first batch was replayed; the batch written while paused must
     * not lead to a second event application.
     */
    @Test
    public void shouldPauseReplay() {
        // given
        final StreamProcessor streamProcessor = startStreamProcessor(replayContinuously);
        writeCommandWithFollowUpEvent(replayContinuously);
        Awaitility.await("should have replayed first events")
            .until(replayContinuously::getLastSuccessfulProcessedRecordPosition, pos -> pos > 0L);

        // when
        streamProcessor.pauseProcessing().join();
        writeCommandWithFollowUpEvent(replayContinuously);

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT.times(1))
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();

        Assertions.assertThat(getCurrentPhase(replayContinuously))
            .isEqualTo(StreamProcessor.Phase.PAUSED);
    }

    /**
     * Resuming a processor that was paused right after start must apply the event written while
     * paused, without calling {@code onRecovered}, and end in the REPLAY phase.
     */
    @Test
    public void shouldReplayAfterResumed() {
        // given
        startWithPausedStreamProcessor(replayContinuously);
        writeCommandWithFollowUpEvent(replayContinuously);

        // when
        replayContinuously.resumeProcessing(PARTITION_ID);

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT.times(1))
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verify(typedRecordProcessor, Mockito.never()).onRecovered(ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();

        Assertions.assertThat(getCurrentPhase(replayContinuously))
            .isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    /**
     * Pauses after the first batch was replayed, writes a second batch and resumes; both events
     * must have been applied and the processor must be back in the REPLAY phase.
     */
    @Test
    public void shouldReplayMoreAfterResumed() {
        // given
        final StreamProcessor streamProcessor = startStreamProcessor(replayContinuously);
        writeCommandWithFollowUpEvent(replayContinuously);
        Awaitility.await("should have replayed first events")
            .until(replayContinuously::getLastSuccessfulProcessedRecordPosition, pos -> pos > 0L);
        streamProcessor.pauseProcessing().join();
        writeCommandWithFollowUpEvent(replayContinuously);

        // when
        streamProcessor.resumeProcessing();

        // then
        final InOrder inOrder = Mockito.inOrder(typedRecordProcessor, eventApplier);
        inOrder
            .verify(eventApplier, TIMEOUT.times(2))
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();

        Assertions.assertThat(getCurrentPhase(replayContinuously))
            .isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    /**
     * After replaying an event, the processor's last processed position must equal the position of
     * the command that caused it, and its last written position must equal the event's position.
     */
    @Test
    public void shouldUpdateLastProcessedAndWrittenPositionOnReplay() {
        // given
        startStreamProcessor(replayContinuously);

        // when
        final long commandPosition =
            replayContinuously.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        final long eventPosition =
            replayContinuously.writeEvent(
                ProcessInstanceIntent.ELEMENT_ACTIVATING,
                RECORD,
                event -> event.sourceRecordPosition(commandPosition));

        // then
        Mockito.verify(eventApplier, TIMEOUT)
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());

        Awaitility.await()
            .untilAsserted(
                () -> {
                    final Long lastProcessedPosition = getLastProcessedPosition(replayContinuously);
                    final Long lastWrittenPosition = getLastWrittenPosition(replayContinuously);

                    Assertions.assertThat(lastProcessedPosition)
                        .describedAs(
                            "Expected the position of the command to be the last processed position")
                        .isEqualTo(commandPosition);
                    Assertions.assertThat(lastWrittenPosition)
                        .describedAs(
                            "Expected the position of the event to be the last written position")
                        .isEqualTo(eventPosition);
                });
    }

    /**
     * After replaying an event, the last processed position stored in the state must equal the
     * event's source record position (the command position).
     */
    @Test
    public void shouldSetLastProcessedPositionOnStateToSourcePosition() {
        // given
        startStreamProcessor(replayContinuously);

        // when
        final long commandPosition =
            replayContinuously.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        replayContinuously.writeEvent(
            ProcessInstanceIntent.ELEMENT_ACTIVATING,
            RECORD,
            event -> event.sourceRecordPosition(commandPosition));

        // then
        Mockito.verify(eventApplier, TIMEOUT)
            .applyState(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING),
                ArgumentMatchers.any());

        Awaitility.await()
            .until(
                () -> getLastProcessedPosition(replayContinuously),
                Predicate.isEqual(commandPosition));

        Assertions.assertThat(replayContinuously.getLastSuccessfulProcessedRecordPosition())
            .describedAs("Last processed position in the state must be the last source position")
            .isEqualTo(commandPosition);
    }

    /**
     * Replaying an event whose source position lies before the snapshot position must not move the
     * last processed position in the state below the snapshot position.
     */
    @Test
    public void shouldNotSetLastProcessedPositionIfLessThanSnapshotPosition() {
        // given
        final long commandPositionBeforeSnapshot = 1L;
        final long snapshotPosition = 2L;

        startStreamProcessor(replayContinuously);
        replayContinuously
            .getZeebeState()
            .getLastProcessedPositionState()
            .markAsProcessed(snapshotPosition);
        replayContinuously.snapshot();
        replayContinuously.closeStreamProcessor();

        // when
        startStreamProcessor(replayContinuously);
        final long eventPosition =
            replayContinuously.writeEvent(
                ProcessInstanceIntent.ELEMENT_ACTIVATING,
                RECORD,
                writer -> writer.sourceRecordPosition(commandPositionBeforeSnapshot));

        // then
        // NOTE(review): -1 is presumably the listener's "no processed position" marker - confirm.
        Mockito.verify(replayContinuously.getMockStreamProcessorListener(), TIMEOUT)
            .onReplayed(-1L, eventPosition);

        final MutableLastProcessedPositionState lastProcessedPositionState =
            replayContinuously.getZeebeState().getLastProcessedPositionState();
        Assertions.assertThat(lastProcessedPositionState.getLastSuccessfulProcessedRecordPosition())
            .describedAs(
                "Expected that the last processed position is not less than the snapshot position")
            .isEqualTo(snapshotPosition);
    }

    /**
     * Writes one batch consisting of an ACTIVATE_ELEMENT command and an ELEMENT_ACTIVATING event
     * that is marked as {@code causedBy(0)}.
     *
     * @param streamProcessorRule the rule whose log stream receives the batch
     */
    private void writeCommandWithFollowUpEvent(final StreamProcessorRule streamProcessorRule) {
        streamProcessorRule.writeBatch(
            RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD),
            RecordToWrite.event()
                .processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD)
                .causedBy(0));
    }

    /**
     * Starts the stream processor of the given rule without awaiting its opening. The mocked event
     * applier is installed and the mocked record processor is registered for ACTIVATE_ELEMENT
     * commands of process instances.
     *
     * @param streamProcessorRule the rule to start
     * @return the started stream processor
     */
    private StreamProcessor startStreamProcessor(final StreamProcessorRule streamProcessorRule) {
        return streamProcessorRule
            .withEventApplierFactory(zeebeState -> eventApplier)
            .startTypedStreamProcessorNotAwaitOpening(
                (processors, context) ->
                    processors.onCommand(
                        ValueType.PROCESS_INSTANCE,
                        ProcessInstanceIntent.ACTIVATE_ELEMENT,
                        typedRecordProcessor));
    }

    /** Starts the stream processor and blocks until its pause request has completed. */
    private void startWithPausedStreamProcessor(final StreamProcessorRule streamProcessorRule) {
        startStreamProcessor(streamProcessorRule).pauseProcessing().join();
    }

    /** Returns the current phase of the rule's stream processor, blocking for the result. */
    private StreamProcessor.Phase getCurrentPhase(final StreamProcessorRule streamProcessorRule) {
        return getStreamProcessor(streamProcessorRule).getCurrentPhase().join();
    }

    /** Returns the processor's last processed position, blocking for the result. */
    private Long getLastProcessedPosition(final StreamProcessorRule streamProcessorRule) {
        return getStreamProcessor(streamProcessorRule).getLastProcessedPositionAsync().join();
    }

    /** Returns the processor's last written position, blocking for the result. */
    private Long getLastWrittenPosition(final StreamProcessorRule streamProcessorRule) {
        return getStreamProcessor(streamProcessorRule).getLastWrittenPositionAsync().join();
    }

    /** Looks up the stream processor of the partition used throughout this test. */
    private StreamProcessor getStreamProcessor(final StreamProcessorRule streamProcessorRule) {
        return streamProcessorRule.getStreamProcessor(PARTITION_ID);
    }
}

