/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.mutable.MutableJobState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.exception.RecoverableException;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mockito.verification.VerificationWithTimeout;

public final class StreamProcessorTest {
    private static final long TIMEOUT_MILLIS = 2000L;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout((long)2000L);
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1L);
    private static final JobRecord JOB_RECORD = Records.job(1L).setType("test");
    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    private ActorControl processingContextActor;

    @Test
    public void shouldCallStreamProcessorLifecycle() throws Exception {
        StreamProcessorLifecycleAware lifecycleAware = (StreamProcessorLifecycleAware)Mockito.mock(StreamProcessorLifecycleAware.class);
        final CountDownLatch recoveredLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.withListener(lifecycleAware).withListener(new StreamProcessorLifecycleAware(){

            public void onRecovered(ReadonlyProcessingContext context) {
                recoveredLatch.countDown();
            }
        }));
        recoveredLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{lifecycleAware});
        ((StreamProcessorLifecycleAware)inOrder.verify((Object)lifecycleAware, Mockito.times((int)1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware)inOrder.verify((Object)lifecycleAware, Mockito.times((int)1))).onClose();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldCallStreamProcessorLifecycleOnFail() throws InterruptedException {
        final CountDownLatch failedLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessorNotAwaitOpening((processors, state) -> processors.withListener(new StreamProcessorLifecycleAware(){

            public void onRecovered(ReadonlyProcessingContext context) {
                throw new RuntimeException("force fail");
            }

            public void onFailed() {
                failedLatch.countDown();
            }
        }));
        Assertions.assertThat((boolean)failedLatch.await(1000L, TimeUnit.MILLISECONDS)).isTrue();
    }

    @Test
    public void shouldCallRecordProcessorLifecycle() throws Exception {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        final CountDownLatch recoveredLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor).withListener(new StreamProcessorLifecycleAware(){

            public void onRecovered(ReadonlyProcessingContext context) {
                recoveredLatch.countDown();
            }
        }));
        recoveredLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, Mockito.times((int)1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, Mockito.times((int)1))).onClose();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldProcessRecord() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor));
        long position = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)Records.processInstance(1L));
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(1))).processRecord(ArgumentMatchers.eq((long)position), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((long)this.streamProcessorRule.getLastSuccessfulProcessedRecordPosition()).isEqualTo(position));
    }

    @Test
    public void shouldRetryProcessingRecordOnRecoverableException() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        AtomicInteger count = new AtomicInteger(0);
        ((TypedRecordProcessor)Mockito.doAnswer(invocationOnMock -> {
            if (count.getAndIncrement() == 0) {
                throw new RecoverableException("recoverable");
            }
            return null;
        }).when((Object)typedRecordProcessor)).processRecord(ArgumentMatchers.anyLong(), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor));
        long position = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(2))).processRecord(ArgumentMatchers.eq((long)position), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldIgnoreRecordWhenNoProcessorExistForThisType() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor));
        long firstPosition = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        long secondPosition = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.TERMINATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, TIMEOUT.times(1))).processRecord(ArgumentMatchers.eq((long)firstPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.eq((long)secondPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldProcessOnlyCommands() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor));
        long commandPosition = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        long eventPosition = this.streamProcessorRule.writeEvent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        long rejectionPosition = this.streamProcessorRule.writeCommandRejection((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        long nextCommandPosition = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, (VerificationMode)TIMEOUT)).onRecovered((ReadonlyProcessingContext)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, (VerificationMode)TIMEOUT)).processRecord(ArgumentMatchers.eq((long)commandPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.eq((long)eventPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.eq((long)rejectionPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        ((TypedRecordProcessor)inOrder.verify((Object)typedRecordProcessor, (VerificationMode)TIMEOUT)).processRecord(ArgumentMatchers.eq((long)nextCommandPosition), (TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldWriteFollowUpEvent() {
        StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                streamWriter.appendFollowUpEvent(record.getKey(), (Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING, (RecordValue)record.getValue());
            }
        }));
        long position = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        Record<ProcessInstanceRecord> activatedEvent = this.waitForActivatingEvent();
        Assertions.assertThat(activatedEvent).isNotNull();
        Assertions.assertThat((long)activatedEvent.getSourceRecordPosition()).isEqualTo(position);
        Assertions.assertThat((Long)((Long)streamProcessor.getLastWrittenPositionAsync().join())).isEqualTo(activatedEvent.getPosition());
        Assertions.assertThat((Long)((Long)streamProcessor.getLastProcessedPositionAsync().join())).isEqualTo(position);
    }

    @Test
    public void shouldExecuteSideEffects() throws Exception {
        final CountDownLatch processLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                sideEffect.accept(() -> {
                    processLatch.countDown();
                    return true;
                });
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        Assertions.assertThat((boolean)processLatch.await(5L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldRepeatExecuteSideEffects() throws Exception {
        final CountDownLatch processLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                sideEffect.accept(() -> {
                    processLatch.countDown();
                    return processLatch.getCount() < 1L;
                });
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        Assertions.assertThat((boolean)processLatch.await(5L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldSkipSideEffectsOnException() throws Exception {
        final CountDownLatch processLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                sideEffect.accept(() -> {
                    throw new RuntimeException("expected");
                });
                processLatch.countDown();
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        Assertions.assertThat((boolean)processLatch.await(5L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldNotUpdateStateOnExceptionInProcessing() {
        long jobKey = 1L;
        this.streamProcessorRule.startTypedStreamProcessor((builder, processingContext) -> {
            this.processingContextActor = processingContext.getActor();
            final MutableZeebeState state = processingContext.getZeebeState();
            return builder.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

                public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                    state.getJobState().create(1L, JOB_RECORD);
                    throw new RuntimeException("expected");
                }
            });
        });
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        ((StreamProcessorListener)Mockito.verify((Object)this.streamProcessorRule.getMockStreamProcessorListener(), (VerificationMode)TIMEOUT.times(2))).onProcessed((TypedRecord)ArgumentMatchers.any());
        this.processingContextActor.call(() -> {
            MutableJobState jobState = this.streamProcessorRule.getZeebeState().getJobState();
            JobRecord job = jobState.getJob(1L);
            Assertions.assertThat((Object)job).isNull();
        }).join();
    }

    @Test
    public void shouldUpdateStateAfterProcessing() {
        long jobKey = 1L;
        this.streamProcessorRule.startTypedStreamProcessor((builder, processingContext) -> {
            this.processingContextActor = processingContext.getActor();
            final MutableZeebeState state = processingContext.getZeebeState();
            return builder.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

                public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                    state.getJobState().create(1L, JOB_RECORD);
                }
            });
        });
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        ((StreamProcessorListener)Mockito.verify((Object)this.streamProcessorRule.getMockStreamProcessorListener(), (VerificationMode)TIMEOUT)).onProcessed((TypedRecord)ArgumentMatchers.any());
        this.processingContextActor.call(() -> {
            MutableJobState jobState = this.streamProcessorRule.getZeebeState().getJobState();
            JobRecord job = jobState.getJob(1L);
            Assertions.assertThat((Object)job).isNotNull();
        }).join();
    }

    @Test
    public void shouldWriteResponse() {
        this.streamProcessorRule.startTypedStreamProcessor((processors, context) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                responseWriter.writeEventOnCommand(3L, (Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject)record.getValue(), record);
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        CommandResponseWriter commandResponseWriter = this.streamProcessorRule.getCommandResponseWriter();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{commandResponseWriter});
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).intent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.PROCESS_INSTANCE);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldWriteResponseOnFailedEventProcessing() {
        this.streamProcessorRule.startTypedStreamProcessor((processors, context) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                responseWriter.writeEventOnCommand(3L, (Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject)record.getValue(), record);
                throw new RuntimeException("expected");
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        CommandResponseWriter commandResponseWriter = this.streamProcessorRule.getCommandResponseWriter();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{commandResponseWriter});
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).intent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.PROCESS_INSTANCE);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.COMMAND_REJECTION);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).rejectionType(RejectionType.PROCESSING_ERROR);
        ((CommandResponseWriter)inOrder.verify((Object)commandResponseWriter, TIMEOUT.times(1))).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldInvokeOnProcessedListener() {
        this.streamProcessorRule.startTypedStreamProcessor((processors, context) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class)));
        long position = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        ArgumentCaptor processedCommandCaptor = ArgumentCaptor.forClass(TypedRecord.class);
        ((StreamProcessorListener)Mockito.verify((Object)this.streamProcessorRule.getMockStreamProcessorListener(), (VerificationMode)TIMEOUT)).onProcessed((TypedRecord)processedCommandCaptor.capture());
        Assertions.assertThat((long)((TypedRecord)processedCommandCaptor.getValue()).getPosition()).isEqualTo(position);
    }

    @Test
    public void shouldNotifyLifecycleListenersOnPauseAndResume() throws InterruptedException {
        final CountDownLatch pauseLatch = new CountDownLatch(1);
        final CountDownLatch resumeLatch = new CountDownLatch(1);
        StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.withListener(new StreamProcessorLifecycleAware(){

            public void onPaused() {
                pauseLatch.countDown();
            }

            public void onResumed() {
                resumeLatch.countDown();
            }
        }));
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();
        pauseLatch.await();
        resumeLatch.await();
        Assertions.assertThat((long)pauseLatch.getCount()).isEqualTo(0L);
        Assertions.assertThat((long)resumeLatch.getCount()).isEqualTo(0L);
    }

    @Test
    public void shouldResumeProcessMoreRecordsAfterPause() throws InterruptedException {
        final CountDownLatch pauseLatch = new CountDownLatch(1);
        final CountDownLatch resumeLatch = new CountDownLatch(1);
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor).withListener(new StreamProcessorLifecycleAware(){

            public void onPaused() {
                pauseLatch.countDown();
            }

            public void onResumed() {
                resumeLatch.countDown();
            }
        }));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        streamProcessor.pauseProcessing();
        pauseLatch.await();
        long positionProcessedAfterResume = this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        streamProcessor.resumeProcessing();
        resumeLatch.await();
        ((StreamProcessorListener)Mockito.verify((Object)this.streamProcessorRule.getMockStreamProcessorListener(), (VerificationMode)TIMEOUT.times(2))).onProcessed((TypedRecord)ArgumentMatchers.any());
        Assertions.assertThat((long)this.streamProcessorRule.getLastSuccessfulProcessedRecordPosition()).isEqualTo(positionProcessedAfterResume);
    }

    @Test
    public void shouldNotOverwriteLastWrittenPositionIfNoFollowUpEvent() throws ExecutionException, InterruptedException {
        this.streamProcessorRule.startTypedStreamProcessor((processors, state) -> processors.onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

            public void processRecord(long position, TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                streamWriter.appendFollowUpEvent(record.getKey(), (Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING, (RecordValue)record.getValue());
            }
        }).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.CANCEL, (TypedRecordProcessor)Mockito.mock(TypedRecordProcessor.class)));
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        long position = this.waitForActivatingEvent().getPosition();
        this.streamProcessorRule.writeCommand((Intent)ProcessInstanceIntent.CANCEL, (UnpackedObject)PROCESS_INSTANCE_RECORD);
        ((StreamProcessorListener)Mockito.verify((Object)this.streamProcessorRule.getMockStreamProcessorListener(), (VerificationMode)TIMEOUT.times(2))).onProcessed((TypedRecord)ArgumentMatchers.any());
        long lastWrittenPos = (Long)this.streamProcessorRule.getStreamProcessor(0).getLastWrittenPositionAsync().get();
        Assertions.assertThat((long)lastWrittenPos).isEqualTo(position);
    }

    private Record<ProcessInstanceRecord> waitForActivatingEvent() {
        return (Record)((Optional)TestUtil.doRepeatedly(() -> this.streamProcessorRule.events().onlyProcessInstanceRecords().withIntent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING).findAny()).until(Optional::isPresent)).get();
    }
}

