package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessingComposite;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.test.util.TestUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.class */
public final class StreamProcessorReprocessingTest {
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1);

    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    private StreamProcessingComposite.StreamProcessorTestFactory processorTestFactory;

    @Before
    public void setup() {
        EventApplier eventApplier = (EventApplier) Mockito.mock(EventApplier.class);
        this.streamProcessorRule.withEventApplierFactory(mutableZeebeState -> {
            return eventApplier;
        });
        this.processorTestFactory = (typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.1
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord) {
                    typedRecordProcessorContext.getWriters().sideEffect().appendSideEffect(() -> {
                        return true;
                    });
                }
            });
        };
    }

    @Test
    public void shouldStopProcessingWhenPaused() throws Exception {
        IntStream.range(0, 5000).forEach(i -> {
            this.streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, i);
        });
        this.streamProcessorRule.writeBatch(RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, PROCESS_INSTANCE_RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATED, PROCESS_INSTANCE_RECORD).causedBy(0));
        Awaitility.await().until(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED);
        }, (v0) -> {
            return v0.exists();
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor).withListener(new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.2
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            });
        }).pauseProcessing().join();
        Assertions.assertThat(countDownLatch.await(15L, TimeUnit.SECONDS)).isTrue();
        Mockito.clearInvocations(new TypedRecordProcessor[]{typedRecordProcessor});
        this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(51966L));
        ((TypedRecordProcessor) Mockito.verify(typedRecordProcessor, TIMEOUT.times(0))).processRecord((TypedRecord) ArgumentMatchers.any());
        ((TypedRecordProcessor) Mockito.verify(typedRecordProcessor, TIMEOUT.times(0))).processRecord((TypedRecord) ArgumentMatchers.any());
        ((TypedRecordProcessor) Mockito.verify(typedRecordProcessor, TIMEOUT.times(0))).processRecord((TypedRecord) ArgumentMatchers.any());
    }

    @Test
    public void shouldContinueProcessingWhenResumed() throws Exception {
        IntStream.range(0, 5000).forEach(i -> {
            this.streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, i);
        });
        this.streamProcessorRule.writeBatch(RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, PROCESS_INSTANCE_RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATED, PROCESS_INSTANCE_RECORD).causedBy(0));
        Awaitility.await().until(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED);
        }, (v0) -> {
            return v0.exists();
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor).withListener(new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.3
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            });
        });
        startTypedStreamProcessor.pauseProcessing();
        startTypedStreamProcessor.resumeProcessing();
        Assertions.assertThat(countDownLatch.await(15L, TimeUnit.SECONDS)).isTrue();
        Mockito.clearInvocations(new TypedRecordProcessor[]{typedRecordProcessor});
        this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(51966L));
        ((TypedRecordProcessor) Mockito.verify(typedRecordProcessor, TIMEOUT.times(1))).processRecord((TypedRecord) ArgumentMatchers.any());
    }

    @Test
    public void shouldCallOnPausedAfterOnRecovered() {
        IntStream.range(0, 5000).forEach(i -> {
            this.streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, i);
        });
        this.streamProcessorRule.writeBatch(RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, PROCESS_INSTANCE_RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATED, PROCESS_INSTANCE_RECORD).causedBy(0));
        Awaitility.await().until(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED);
        }, (v0) -> {
            return v0.exists();
        });
        StreamProcessorLifecycleAware streamProcessorLifecycleAware = (StreamProcessorLifecycleAware) Mockito.mock(StreamProcessorLifecycleAware.class);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.withListener(streamProcessorLifecycleAware);
        });
        startTypedStreamProcessor.pauseProcessing();
        startTypedStreamProcessor.resumeProcessing();
        InOrder inOrder = Mockito.inOrder(new Object[]{streamProcessorLifecycleAware});
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onPaused();
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onResumed();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldCallOnPausedBeforeOnResumedNoMatterWhenResumedWasCalled() {
        IntStream.range(0, 5000).forEach(i -> {
            this.streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, i);
        });
        this.streamProcessorRule.writeBatch(RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, PROCESS_INSTANCE_RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATED, PROCESS_INSTANCE_RECORD).causedBy(0));
        Awaitility.await().until(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED);
        }, (v0) -> {
            return v0.exists();
        });
        StreamProcessorLifecycleAware streamProcessorLifecycleAware = (StreamProcessorLifecycleAware) Mockito.mock(StreamProcessorLifecycleAware.class);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.withListener(streamProcessorLifecycleAware);
        });
        startTypedStreamProcessor.resumeProcessing();
        startTypedStreamProcessor.pauseProcessing();
        startTypedStreamProcessor.resumeProcessing();
        InOrder inOrder = Mockito.inOrder(new Object[]{streamProcessorLifecycleAware});
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onPaused();
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, TIMEOUT.times(1))).onResumed();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldStartAfterLastProcessedEventInSnapshot() {
        this.streamProcessorRule.startTypedStreamProcessor(this.processorTestFactory);
        this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(2L));
        ((StreamProcessorListener) Mockito.verify(this.streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT.times(2))).onProcessed((TypedRecord) ArgumentMatchers.any());
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();
        Mockito.clearInvocations(new StreamProcessorListener[]{this.streamProcessorRule.getMockStreamProcessorListener()});
        this.streamProcessorRule.startTypedStreamProcessor(this.processorTestFactory);
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(3L));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TypedRecord.class);
        ((StreamProcessorListener) Mockito.verify(this.streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT)).onProcessed((TypedRecord) forClass.capture());
        Assertions.assertThat(forClass.getAllValues()).extracting((v0) -> {
            return v0.getPosition();
        }).containsExactly(new Long[]{Long.valueOf(writeCommand)});
    }

    @Test
    public void shouldUpdateLastProcessedPositionAfterReplay() throws Exception {
        UnpackedObject unpackedObject = PROCESS_INSTANCE_RECORD;
        long writeCommand = this.streamProcessorRule.writeCommand(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, unpackedObject);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, unpackedObject, fluentLogWriter -> {
            return fluentLogWriter.key(1L).sourceRecordPosition(writeCommand);
        });
        TestUtil.waitUntil(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATING).exists();
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.withListener(new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.4
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        Assertions.assertThat((Long) startTypedStreamProcessor.getLastProcessedPositionAsync().get()).isEqualTo(writeCommand);
    }

    @Test
    public void shouldUpdateLastWrittenPositionAfterReplay() throws Exception {
        UnpackedObject unpackedObject = PROCESS_INSTANCE_RECORD;
        long writeCommand = this.streamProcessorRule.writeCommand(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, unpackedObject);
        long writeEvent = this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, unpackedObject, fluentLogWriter -> {
            return fluentLogWriter.key(1L).sourceRecordPosition(writeCommand);
        });
        TestUtil.waitUntil(() -> {
            return this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATING).exists();
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.withListener(new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.5
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        Assertions.assertThat((Long) startTypedStreamProcessor.getLastWrittenPositionAsync().get()).isEqualTo(writeEvent);
    }

    @Test
    public void shouldUpdateLastProcessedEventWhenSnapshot() throws Exception {
        this.streamProcessorRule.startTypedStreamProcessor(this.processorTestFactory);
        this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(2L));
        ((StreamProcessorListener) Mockito.verify(this.streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT.times(2))).onProcessed((TypedRecord) ArgumentMatchers.any());
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.withListener(new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReprocessingTest.6
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        Assertions.assertThat((Long) startTypedStreamProcessor.getLastProcessedPositionAsync().get()).isEqualTo(writeCommand);
    }
}
