package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.processing.DbKeyGenerator;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.util.exception.RecoverableException;
import java.util.List;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

@ExtendWith({StreamPlatformExtension.class})
/* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessorTest.class */
public final class StreamProcessorTest {
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);
    private StreamPlatform streamPlatform;

    /* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessorTest$RecordIntentMatcher.class */
    private static final class RecordIntentMatcher implements ArgumentMatcher<TypedRecord> {
        private final Intent toMatchIntent;

        private RecordIntentMatcher(Intent intent) {
            this.toMatchIntent = intent;
        }

        public boolean matches(TypedRecord typedRecord) {
            return this.toMatchIntent.equals(typedRecord.getIntent());
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessorTest$TestProcessor.class */
    private static final class TestProcessor implements RecordProcessor {
        RecordProcessorContext recordProcessorContext;
        ProcessingResult processingResult = EmptyProcessingResult.INSTANCE;
        ProcessingResult processingResultOnError = EmptyProcessingResult.INSTANCE;
        private Consumer<RecordProcessorContext> processingAction = recordProcessorContext -> {
        };
        private Consumer<RecordProcessorContext> onProcessingErrorAction = recordProcessorContext -> {
        };

        private TestProcessor() {
        }

        public void init(RecordProcessorContext recordProcessorContext) {
            this.recordProcessorContext = recordProcessorContext;
        }

        public boolean accepts(ValueType valueType) {
            return true;
        }

        public void replay(TypedRecord typedRecord) {
        }

        public ProcessingResult process(TypedRecord typedRecord, ProcessingResultBuilder processingResultBuilder) {
            this.processingAction.accept(this.recordProcessorContext);
            return this.processingResult;
        }

        public ProcessingResult onProcessingError(Throwable th, TypedRecord typedRecord, ProcessingResultBuilder processingResultBuilder) {
            this.onProcessingErrorAction.accept(this.recordProcessorContext);
            return this.processingResultOnError;
        }
    }

    @Test
    public void shouldCallStreamProcessorLifecycle() throws Exception {
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.pauseProcessing();
        this.streamPlatform.resumeProcessing();
        this.streamPlatform.closeStreamProcessor();
        StreamProcessorLifecycleAware mockProcessorLifecycleAware = this.streamPlatform.getMockProcessorLifecycleAware();
        InOrder inOrder = Mockito.inOrder(new Object[]{mockProcessorLifecycleAware});
        ((StreamProcessorLifecycleAware) inOrder.verify(mockProcessorLifecycleAware, TIMEOUT)).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) inOrder.verify(mockProcessorLifecycleAware, TIMEOUT)).onPaused();
        ((StreamProcessorLifecycleAware) inOrder.verify(mockProcessorLifecycleAware, TIMEOUT)).onResumed();
        ((StreamProcessorLifecycleAware) inOrder.verify(mockProcessorLifecycleAware, TIMEOUT)).onClose();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldCallStreamProcessorLifecycleOnFail() {
        StreamProcessorLifecycleAware mockProcessorLifecycleAware = this.streamPlatform.getMockProcessorLifecycleAware();
        ((StreamProcessorLifecycleAware) Mockito.doThrow(new Throwable[]{new RuntimeException("force fail")}).when(mockProcessorLifecycleAware)).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        this.streamPlatform.startStreamProcessor();
        ((StreamProcessorLifecycleAware) Mockito.verify(mockProcessorLifecycleAware, TIMEOUT)).onFailed();
    }

    @Test
    public void shouldCallRecordProcessorLifecycle() {
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{defaultMockedRecordProcessor});
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).init((RecordProcessorContext) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).accepts(ValueType.PROCESS_INSTANCE);
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).replay((TypedRecord) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).accepts(ValueType.PROCESS_INSTANCE);
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldCallOnErrorWhenProcessingFails() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        RuntimeException runtimeException = new RuntimeException("processing error");
        ((RecordProcessor) Mockito.doThrow(new Throwable[]{runtimeException}).when(defaultMockedRecordProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{defaultMockedRecordProcessor});
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).init((RecordProcessorContext) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).accepts(ValueType.PROCESS_INSTANCE);
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).onProcessingError((Throwable) ArgumentMatchers.eq(runtimeException), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldLoopWhenOnErrorFails() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        RuntimeException runtimeException = new RuntimeException("processing error");
        ((RecordProcessor) Mockito.doThrow(new Throwable[]{runtimeException}).when(defaultMockedRecordProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((RecordProcessor) Mockito.doThrow(new Throwable[]{runtimeException}).when(defaultMockedRecordProcessor)).onProcessingError((Throwable) ArgumentMatchers.any(), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{defaultMockedRecordProcessor});
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).init((RecordProcessorContext) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).accepts(ValueType.PROCESS_INSTANCE);
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((RecordProcessor) inOrder.verify(defaultMockedRecordProcessor, TIMEOUT.atLeast(5))).onProcessingError((Throwable) ArgumentMatchers.eq(runtimeException), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldRetryProcessingRecordOnRecoverableException() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        ((RecordProcessor) Mockito.doThrow(new Throwable[]{new RecoverableException("processing error")}).when(defaultMockedRecordProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.atLeast(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotCallProcessWhenNotAcceptingRecord() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        Mockito.when(Boolean.valueOf(defaultMockedRecordProcessor.accepts((ValueType) ArgumentMatchers.any()))).thenReturn(false);
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT)).accepts((ValueType) ArgumentMatchers.any());
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, Mockito.never())).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldProcessOnlyCommands() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0), RecordToWrite.rejection().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(1))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldSetSourcePointerForFollowUpRecords() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendRecordReturnEither(1L, RecordType.EVENT, ProcessInstanceIntent.ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", Records.processInstance(1L));
        bufferedProcessingResultBuilder.appendRecordReturnEither(2L, RecordType.COMMAND, ProcessInstanceIntent.COMPLETE_ELEMENT, RejectionType.NULL_VAL, "", Records.processInstance(1L));
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.argThat(new RecordIntentMatcher(ProcessInstanceIntent.ACTIVATE_ELEMENT)), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        LogStreamReader newLogStreamReader = this.streamPlatform.getLogStream().newLogStreamReader();
        newLogStreamReader.seekToFirstEvent();
        LoggedEvent loggedEvent = (LoggedEvent) newLogStreamReader.next();
        AssertionsForClassTypes.assertThat(loggedEvent.getSourceEventPosition()).isEqualTo(-1L);
        long position = loggedEvent.getPosition();
        Awaitility.await("should write follow up events").untilAsserted(() -> {
            AssertionsForClassTypes.assertThat(newLogStreamReader.hasNext()).isTrue();
        });
        AssertionsForClassTypes.assertThat(newLogStreamReader.hasNext()).isTrue();
        AssertionsForClassTypes.assertThat(((LoggedEvent) newLogStreamReader.next()).getSourceEventPosition()).isEqualTo(position);
        AssertionsForClassTypes.assertThat(newLogStreamReader.hasNext()).isTrue();
        AssertionsForClassTypes.assertThat(((LoggedEvent) newLogStreamReader.next()).getSourceEventPosition()).isEqualTo(position);
    }

    @Test
    public void shouldExecutePostCommitTask() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        PostCommitTask postCommitTask = (PostCommitTask) Mockito.mock(PostCommitTask.class);
        Mockito.when(Boolean.valueOf(postCommitTask.flush())).thenReturn(true);
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendPostCommitTask(postCommitTask);
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((PostCommitTask) Mockito.verify(postCommitTask, TIMEOUT)).flush();
    }

    @Test
    public void shouldRepeatExecutePostCommitTask() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        PostCommitTask postCommitTask = (PostCommitTask) Mockito.mock(PostCommitTask.class);
        Mockito.when(Boolean.valueOf(postCommitTask.flush())).thenReturn(false);
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendPostCommitTask(postCommitTask);
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((PostCommitTask) Mockito.verify(postCommitTask, TIMEOUT.atLeast(5))).flush();
    }

    @Test
    public void shouldNotRepeatPostCommitOnException() throws Exception {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        PostCommitTask postCommitTask = (PostCommitTask) Mockito.mock(PostCommitTask.class);
        Mockito.when(Boolean.valueOf(postCommitTask.flush())).thenThrow(new Throwable[]{new RuntimeException("expected")});
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendPostCommitTask(postCommitTask);
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build()).thenReturn(EmptyProcessingResult.INSTANCE);
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((PostCommitTask) Mockito.verify(postCommitTask, TIMEOUT.times(1))).flush();
    }

    @Test
    public void shouldUpdateStateOnSuccessfulProcessing() {
        TestProcessor testProcessor = (TestProcessor) Mockito.spy(new TestProcessor());
        testProcessor.processingAction = recordProcessorContext -> {
            DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, recordProcessorContext.getZeebeDb(), recordProcessorContext.getTransactionContext());
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
        };
        testProcessor.processingResult = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        }).appendPostCommitTask(() -> {
            return true;
        }).build();
        ((TestProcessor) Mockito.doCallRealMethod().doReturn(EmptyProcessingResult.INSTANCE).when(testProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();
        ZeebeDb zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
        DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, zeebeDb, zeebeDb.createContext());
        long nextKey = dbKeyGenerator.nextKey();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((TestProcessor) Mockito.verify(testProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        AssertionsForClassTypes.assertThat(dbKeyGenerator.nextKey()).isEqualTo(nextKey + 4);
    }

    @Test
    public void shouldNotUpdateStateOnExceptionInProcessing() {
        TestProcessor testProcessor = (TestProcessor) Mockito.spy(new TestProcessor());
        testProcessor.processingAction = recordProcessorContext -> {
            DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, recordProcessorContext.getZeebeDb(), recordProcessorContext.getTransactionContext());
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            throw new RuntimeException("expected");
        };
        testProcessor.processingResult = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        }).build();
        ((TestProcessor) Mockito.doCallRealMethod().doReturn(EmptyProcessingResult.INSTANCE).when(testProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();
        ZeebeDb zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
        DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, zeebeDb, zeebeDb.createContext());
        long nextKey = dbKeyGenerator.nextKey();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((TestProcessor) Mockito.verify(testProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        AssertionsForClassTypes.assertThat(dbKeyGenerator.nextKey()).isEqualTo(nextKey + 1);
    }

    @Test
    public void shouldUpdateStateOnProcessingErrorCall() {
        TestProcessor testProcessor = (TestProcessor) Mockito.spy(new TestProcessor());
        testProcessor.processingAction = recordProcessorContext -> {
            throw new RuntimeException("expected");
        };
        testProcessor.onProcessingErrorAction = recordProcessorContext2 -> {
            DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, recordProcessorContext2.getZeebeDb(), recordProcessorContext2.getTransactionContext());
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
        };
        ((TestProcessor) Mockito.doCallRealMethod().doReturn(EmptyProcessingResult.INSTANCE).when(testProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();
        ZeebeDb zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
        DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, zeebeDb, zeebeDb.createContext());
        long nextKey = dbKeyGenerator.nextKey();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((TestProcessor) Mockito.verify(testProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        AssertionsForClassTypes.assertThat(dbKeyGenerator.nextKey()).isEqualTo(nextKey + 4);
    }

    @Test
    public void shouldNotUpdateStateOnExceptionOnProcessingErrorCall() {
        TestProcessor testProcessor = (TestProcessor) Mockito.spy(new TestProcessor());
        testProcessor.processingAction = recordProcessorContext -> {
            throw new RuntimeException("expected");
        };
        testProcessor.onProcessingErrorAction = recordProcessorContext2 -> {
            DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, recordProcessorContext2.getZeebeDb(), recordProcessorContext2.getTransactionContext());
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            dbKeyGenerator.nextKey();
            throw new RuntimeException("expected");
        };
        ((TestProcessor) Mockito.doCallRealMethod().doReturn(EmptyProcessingResult.INSTANCE).when(testProcessor)).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((TestProcessor) Mockito.doCallRealMethod().doReturn(EmptyProcessingResult.INSTANCE).when(testProcessor)).onProcessingError((Throwable) ArgumentMatchers.any(), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();
        ZeebeDb zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
        DbKeyGenerator dbKeyGenerator = new DbKeyGenerator(1, zeebeDb, zeebeDb.createContext());
        long nextKey = dbKeyGenerator.nextKey();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((TestProcessor) Mockito.verify(testProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        AssertionsForClassTypes.assertThat(dbKeyGenerator.nextKey()).isEqualTo(nextKey + 1);
    }

    @Test
    public void shouldWriteResponse() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.withResponse(RecordType.EVENT, 3L, ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD, ValueType.PROCESS_INSTANCE, RejectionType.NULL_VAL, "", 1L, 12);
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build()).thenReturn(EmptyProcessingResult.INSTANCE);
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        CommandResponseWriter mockCommandResponseWriter = this.streamPlatform.getMockCommandResponseWriter();
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).intent(ProcessInstanceIntent.ELEMENT_ACTIVATING);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.PROCESS_INSTANCE);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldWriteResponseOnFailedEventProcessing() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new RuntimeException()}).thenReturn(EmptyProcessingResult.INSTANCE);
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.withResponse(RecordType.EVENT, 3L, ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD, ValueType.PROCESS_INSTANCE, RejectionType.NULL_VAL, "", 1L, 12);
        Mockito.when(defaultMockedRecordProcessor.onProcessingError((Throwable) ArgumentMatchers.any(), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build()).thenReturn(EmptyProcessingResult.INSTANCE);
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(1))).onProcessingError((Throwable) ArgumentMatchers.any(), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        CommandResponseWriter mockCommandResponseWriter = this.streamPlatform.getMockCommandResponseWriter();
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).intent(ProcessInstanceIntent.ELEMENT_ACTIVATING);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.PROCESS_INSTANCE);
        ((CommandResponseWriter) Mockito.verify(mockCommandResponseWriter, TIMEOUT.times(1))).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldInvokeOnProcessedListenerWhenReturnResult() {
        StreamProcessorListener mockStreamProcessorListener = this.streamPlatform.getMockStreamProcessorListener();
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendPostCommitTask(() -> {
            return true;
        });
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((StreamProcessorListener) Mockito.verify(mockStreamProcessorListener, TIMEOUT.times(2))).onProcessed((TypedRecord) ArgumentMatchers.any());
    }

    @Test
    public void shouldInvokeSkippedOnProcessedListenerWhenReturnEmptyResult() {
        StreamProcessorListener mockStreamProcessorListener = this.streamPlatform.getMockStreamProcessorListener();
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        }).build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((StreamProcessorListener) Mockito.verify(mockStreamProcessorListener, TIMEOUT.times(2))).onSkipped((LoggedEvent) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotSkipWhenResultContainsTaskOnly() {
        StreamProcessorListener mockStreamProcessorListener = this.streamPlatform.getMockStreamProcessorListener();
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendPostCommitTask(() -> {
            return true;
        });
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((StreamProcessorListener) Mockito.verify(mockStreamProcessorListener, TIMEOUT.times(0))).onSkipped((LoggedEvent) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotSkipWhenResultContainsRecordOnly() {
        StreamProcessorListener mockStreamProcessorListener = this.streamPlatform.getMockStreamProcessorListener();
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.appendRecordReturnEither(1L, RecordType.EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING, RejectionType.NULL_VAL, "", Records.processInstance(1L));
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(LoggedEvent.class);
        ((StreamProcessorListener) Mockito.verify(mockStreamProcessorListener, TIMEOUT.times(2))).onSkipped((LoggedEvent) forClass.capture());
        Assertions.assertThat(forClass.getAllValues()).extracting(loggedEvent -> {
            RecordMetadata recordMetadata = new RecordMetadata();
            loggedEvent.readMetadata(recordMetadata);
            return recordMetadata.getRecordType();
        }).containsOnly(new RecordType[]{RecordType.EVENT});
    }

    @Test
    public void shouldNotSkipWhenResultContainsResponseOnly() {
        StreamProcessorListener mockStreamProcessorListener = this.streamPlatform.getMockStreamProcessorListener();
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((num, num2) -> {
            return true;
        });
        bufferedProcessingResultBuilder.withResponse(RecordType.EVENT, 1L, ProcessInstanceIntent.ELEMENT_ACTIVATING, Records.processInstance(1L), ValueType.PROCESS_INSTANCE, RejectionType.NULL_VAL, "", -1L, -1);
        Mockito.when(defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(bufferedProcessingResultBuilder.build());
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1L)));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        ((StreamProcessorListener) Mockito.verify(mockStreamProcessorListener, TIMEOUT.times(0))).onSkipped((LoggedEvent) ArgumentMatchers.any());
    }

    @Test
    public void shouldPauseProcessing() {
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.pauseProcessing();
        StreamProcessorLifecycleAware mockProcessorLifecycleAware = this.streamPlatform.getMockProcessorLifecycleAware();
        ((StreamProcessorLifecycleAware) Mockito.verify(mockProcessorLifecycleAware, TIMEOUT)).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) Mockito.verify(mockProcessorLifecycleAware, TIMEOUT)).onPaused();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(this.streamPlatform.getDefaultMockedRecordProcessor(), Mockito.never())).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldResumeProcessing() {
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.pauseProcessing();
        StreamProcessorLifecycleAware mockProcessorLifecycleAware = this.streamPlatform.getMockProcessorLifecycleAware();
        ((StreamProcessorLifecycleAware) Mockito.verify(mockProcessorLifecycleAware, TIMEOUT)).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) Mockito.verify(mockProcessorLifecycleAware, TIMEOUT)).onPaused();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, Mockito.never())).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        this.streamPlatform.resumeProcessing();
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotUpdateLastWrittenPositionWhenSkipped() {
        RecordProcessor defaultMockedRecordProcessor = this.streamPlatform.getDefaultMockedRecordProcessor();
        this.streamPlatform.startStreamProcessor();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD));
        ((RecordProcessor) Mockito.verify(defaultMockedRecordProcessor, TIMEOUT.times(2))).process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any());
        Awaitility.await("Last written position should be updated").untilAsserted(() -> {
            AssertionsForClassTypes.assertThat((Long) this.streamPlatform.getStreamProcessor().getLastWrittenPositionAsync().join()).isEqualTo(-1L);
        });
    }
}
