/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.stream.StreamWrapper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mockito.verification.VerificationWithTimeout;

/**
 * Tests for the {@link StreamProcessor} around start-up with a pre-populated log: pausing and
 * resuming while the processor recovers, the order of lifecycle callbacks, and the last processed
 * position reported after recovery, with and without a snapshot.
 */
public final class StreamProcessorReprocessingTest {
    /** Time budget, in milliseconds, for asynchronous Mockito verifications. */
    private static final long TIMEOUT_MILLIS = 2000L;
    /** Upper bound, in seconds, for waiting on the {@code onRecovered} callback. */
    private static final long RECOVERY_TIMEOUT_SECONDS = 15L;
    /** Number of events written to the log before the processor under test is started. */
    private static final int PREWRITTEN_EVENT_COUNT = 5000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1L);

    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();

    /** Installs a mocked {@link EventApplier} so that events on the log need no real state. */
    @Before
    public void setup() {
        EventApplier mockEventApplier = Mockito.mock(EventApplier.class);
        streamProcessorRule.withEventApplierFactory(state -> mockEventApplier);
    }

    /** A command written while the processor is paused must not reach the record processor. */
    @Test
    public void shouldStopProcessingWhenPaused() throws Exception {
        // given
        writeEventsBeforeStart();
        CountDownLatch onRecoveredLatch = new CountDownLatch(1);
        TypedRecordProcessor typedRecordProcessor = Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) ->
                    processors
                        .onCommand(
                            ValueType.PROCESS_INSTANCE,
                            ProcessInstanceIntent.ACTIVATE_ELEMENT,
                            typedRecordProcessor)
                        .withListener(countDownOnRecovered(onRecoveredLatch)));

        // when
        streamProcessor.pauseProcessing().join();
        awaitRecovered(onRecoveredLatch);
        Mockito.clearInvocations(typedRecordProcessor);
        streamProcessorRule.writeCommand(
            ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(51966L));

        // then
        // timeout(..).times(0) would pass on the very first poll, because "zero invocations" is
        // already true; after(..) waits for the whole window before checking, so a late
        // invocation is actually detected.
        Mockito.verify(typedRecordProcessor, Mockito.after(TIMEOUT_MILLIS).never())
            .processRecord(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        // The window has elapsed above; the remaining overloads can be checked without waiting.
        Mockito.verify(typedRecordProcessor, Mockito.never())
            .processRecord(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        Mockito.verify(typedRecordProcessor, Mockito.never())
            .processRecord(
                ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /** After pause followed by resume, a newly written command is processed exactly once. */
    @Test
    public void shouldContinueProcessingWhenResumed() throws Exception {
        // given
        writeEventsBeforeStart();
        CountDownLatch onRecoveredLatch = new CountDownLatch(1);
        TypedRecordProcessor typedRecordProcessor = Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) ->
                    processors
                        .onCommand(
                            ValueType.PROCESS_INSTANCE,
                            ProcessInstanceIntent.ACTIVATE_ELEMENT,
                            typedRecordProcessor)
                        .withListener(countDownOnRecovered(onRecoveredLatch)));

        // when
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();
        awaitRecovered(onRecoveredLatch);
        Mockito.clearInvocations(typedRecordProcessor);
        streamProcessorRule.writeCommand(
            ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(51966L));

        // then
        Mockito.verify(typedRecordProcessor, TIMEOUT.times(1))
            .processRecord(
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
    }

    /** Pausing right after start must still yield onRecovered, onPaused, onResumed in order. */
    @Test
    public void shouldCallOnPausedAfterOnRecovered() {
        // given
        writeEventsBeforeStart();
        StreamProcessorLifecycleAware lifecycleAware =
            Mockito.mock(StreamProcessorLifecycleAware.class);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) -> processors.withListener(lifecycleAware));

        // when
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();

        // then
        InOrder inOrder = Mockito.inOrder(lifecycleAware);
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onPaused();
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onResumed();
        inOrder.verifyNoMoreInteractions();
    }

    /** A resume issued before any pause must not produce an additional lifecycle callback. */
    @Test
    public void shouldCallOnPausedBeforeOnResumedNoMatterWhenResumedWasCalled() {
        // given
        writeEventsBeforeStart();
        StreamProcessorLifecycleAware lifecycleAware =
            Mockito.mock(StreamProcessorLifecycleAware.class);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) -> processors.withListener(lifecycleAware));

        // when
        streamProcessor.resumeProcessing();
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();

        // then
        InOrder inOrder = Mockito.inOrder(lifecycleAware);
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onPaused();
        inOrder.verify(lifecycleAware, TIMEOUT.times(1)).onResumed();
        inOrder.verifyNoMoreInteractions();
    }

    /**
     * After a snapshot and a restart, only the command written after the restart is reported as
     * processed; the two commands handled before the snapshot are not reported again.
     */
    @Test
    public void shouldStartAfterLastProcessedEventInSnapshot() {
        // given
        startNoopActivateElementProcessor();
        streamProcessorRule.writeCommand(
            ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        streamProcessorRule.writeCommand(
            ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(2L));
        Mockito.verify(streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT.times(2))
            .onProcessed(ArgumentMatchers.any());
        streamProcessorRule.snapshot();
        streamProcessorRule.closeStreamProcessor();
        Mockito.clearInvocations(streamProcessorRule.getMockStreamProcessorListener());

        // when
        startNoopActivateElementProcessor();
        long position =
            streamProcessorRule.writeCommand(
                ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(3L));

        // then
        ArgumentCaptor<TypedRecord> processedCommandCaptor =
            ArgumentCaptor.forClass(TypedRecord.class);
        Mockito.verify(streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT)
            .onProcessed(processedCommandCaptor.capture());
        Assertions.assertThat(processedCommandCaptor.getAllValues())
            .extracting(Record::getPosition)
            .containsExactly(position);
    }

    /**
     * With a command and its follow-up event already on the log, the processor reports the
     * command's position as last processed once it has recovered.
     */
    @Test
    public void shouldUpdateLastProcessedPositionAfterReplay() throws Exception {
        // given
        long recordKey = 1L;
        ProcessInstanceRecord record = PROCESS_INSTANCE_RECORD;
        long firstPosition =
            streamProcessorRule.writeCommand(
                recordKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, record);
        streamProcessorRule.writeEvent(
            ProcessInstanceIntent.ELEMENT_ACTIVATING,
            record,
            event -> event.key(recordKey).sourceRecordPosition(firstPosition));
        TestUtil.waitUntil(
            () ->
                streamProcessorRule
                    .events()
                    .onlyProcessInstanceRecords()
                    .withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATING)
                    .exists());

        // when
        CountDownLatch recoveredLatch = new CountDownLatch(1);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) ->
                    processors.withListener(countDownOnRecovered(recoveredLatch)));
        // Bounded wait: a processor that never recovers fails the test instead of hanging it.
        awaitRecovered(recoveredLatch);

        // then
        Assertions.assertThat(streamProcessor.getLastProcessedPositionAsync().get())
            .isEqualTo(firstPosition);
    }

    /**
     * After a snapshot and a restart, the processor reports the position of the last command that
     * was processed before the snapshot was taken.
     */
    @Test
    public void shouldUpdateLastProcessedEventWhenSnapshot() throws Exception {
        // given
        startNoopActivateElementProcessor();
        streamProcessorRule.writeCommand(
            ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        long snapshotPosition =
            streamProcessorRule.writeCommand(
                ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(2L));
        Mockito.verify(streamProcessorRule.getMockStreamProcessorListener(), TIMEOUT.times(2))
            .onProcessed(ArgumentMatchers.any());
        streamProcessorRule.snapshot();
        streamProcessorRule.closeStreamProcessor();

        // when
        CountDownLatch recoveredLatch = new CountDownLatch(1);
        StreamProcessor streamProcessor =
            streamProcessorRule.startTypedStreamProcessor(
                (processors, context) ->
                    processors.withListener(countDownOnRecovered(recoveredLatch)));
        // Bounded wait: a processor that never recovers fails the test instead of hanging it.
        awaitRecovered(recoveredLatch);

        // then
        Assertions.assertThat(streamProcessor.getLastProcessedPositionAsync().get())
            .isEqualTo(snapshotPosition);
    }

    /**
     * Writes {@link #PREWRITTEN_EVENT_COUNT} ELEMENT_ACTIVATING events followed by an
     * ELEMENT_ACTIVATING / ELEMENT_ACTIVATED batch, then blocks until the ELEMENT_ACTIVATED event
     * is visible on the log. Called before the processor under test is started, so the processor
     * finds these records already on the log.
     */
    private void writeEventsBeforeStart() {
        IntStream.range(0, PREWRITTEN_EVENT_COUNT)
            .forEach(
                i ->
                    streamProcessorRule.writeProcessInstanceEvent(
                        ProcessInstanceIntent.ELEMENT_ACTIVATING, i));
        streamProcessorRule.writeBatch(
            RecordToWrite.event()
                .processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, PROCESS_INSTANCE_RECORD),
            RecordToWrite.event()
                .processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATED, PROCESS_INSTANCE_RECORD)
                .causedBy(0));
        Awaitility.await()
            .until(
                () ->
                    streamProcessorRule
                        .events()
                        .onlyProcessInstanceRecords()
                        .withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED),
                StreamWrapper::exists);
    }

    /**
     * Starts a stream processor whose ACTIVATE_ELEMENT handler for process instances does nothing.
     *
     * @return the started stream processor
     */
    private StreamProcessor startNoopActivateElementProcessor() {
        return streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onCommand(
                    ValueType.PROCESS_INSTANCE,
                    ProcessInstanceIntent.ACTIVATE_ELEMENT,
                    new TypedRecordProcessor<UnifiedRecordValue>() {
                        @Override
                        public void processRecord(
                                long position,
                                TypedRecord<UnifiedRecordValue> record,
                                TypedResponseWriter responseWriter,
                                TypedStreamWriter streamWriter,
                                Consumer<SideEffectProducer> sideEffect) {
                            // intentionally empty: only the processed position matters here
                        }
                    }));
    }

    /**
     * Creates a lifecycle listener that counts the given latch down when {@code onRecovered} is
     * invoked.
     *
     * @param latch the latch to release on recovery
     * @return a listener reacting only to {@code onRecovered}
     */
    private static StreamProcessorLifecycleAware countDownOnRecovered(CountDownLatch latch) {
        return new StreamProcessorLifecycleAware() {
            @Override
            public void onRecovered(ReadonlyProcessingContext context) {
                latch.countDown();
            }
        };
    }

    /**
     * Waits up to {@link #RECOVERY_TIMEOUT_SECONDS} seconds for the latch and fails the test if it
     * was not released in time.
     *
     * @param latch the latch released by the {@code onRecovered} callback
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void awaitRecovered(CountDownLatch latch) throws InterruptedException {
        boolean success = latch.await(RECOVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertThat(success).isTrue();
    }
}

