package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.MockTypedRecord;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.InstantSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/EngineErrorHandlingTest.class */
public final class EngineErrorHandlingTest {
    private static final String STREAM_NAME = "foo";
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.closeables);
    private TestStreams streams;
    private KeyGenerator keyGenerator;
    private CommandResponseWriter mockCommandResponseWriter;
    private MutableProcessingState processingState;

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/EngineErrorHandlingTest$DumpProcessor.class */
    protected static class DumpProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
        final List<Long> processedInstances = new ArrayList();
        private final StateWriter stateWriter;

        public DumpProcessor(Writers writers) {
            this.stateWriter = writers.state();
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> typedRecord) {
            this.processedInstances.add(Long.valueOf(typedRecord.getValue().getProcessInstanceKey()));
            this.stateWriter.appendFollowUpEvent(typedRecord.getKey(), ProcessInstanceIntent.ELEMENT_COMPLETED, typedRecord.getValue());
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/EngineErrorHandlingTest$ErrorProneProcessor.class */
    protected static class ErrorProneProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
        public final AtomicLong processCount = new AtomicLong(0);

        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> typedRecord) {
            this.processCount.incrementAndGet();
            throw new RuntimeException("expected");
        }

        public long getProcessCount() {
            return this.processCount.get();
        }
    }

    @Before
    public void setUp() {
        this.streams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get(), InstantSource.system());
        this.mockCommandResponseWriter = this.streams.getMockedResponseWriter();
        this.streams.createLogStream(STREAM_NAME);
        AtomicLong atomicLong = new AtomicLong();
        this.keyGenerator = () -> {
            return atomicLong.getAndIncrement();
        };
    }

    @Test
    public void shouldAutoRejectCommandOnProcessingFailure() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            return TypedRecordProcessors.processors(this.keyGenerator, typedRecordProcessorContext.getWriters()).onCommand(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, new TypedRecordProcessor<DeploymentRecord>(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.1
                public void processRecord(TypedRecord<DeploymentRecord> typedRecord) {
                    if (typedRecord.getKey() == 0) {
                        throw new RuntimeException("expected");
                    }
                    typedRecordProcessorContext.getWriters().state().appendFollowUpEvent(typedRecord.getKey(), DeploymentIntent.CREATED, typedRecord.getValue());
                }
            });
        });
        long nextKey = this.keyGenerator.nextKey();
        this.streams.newRecord(STREAM_NAME).event(deployment(STREAM_NAME)).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).requestId(255L).requestStreamId(99).key(nextKey).write();
        long write = this.streams.newRecord(STREAM_NAME).event(deployment("foo2")).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).key(this.keyGenerator.nextKey()).write();
        LoggedEvent loggedEvent = (LoggedEvent) ((Optional) TestUtil.doRepeatedly(() -> {
            return this.streams.events(STREAM_NAME).filter(loggedEvent2 -> {
                return Records.isEvent(loggedEvent2, ValueType.DEPLOYMENT, DeploymentIntent.CREATED);
            }).findFirst();
        }).until(optional -> {
            return Boolean.valueOf(optional.isPresent());
        })).get();
        Assertions.assertThat(loggedEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat(loggedEvent.getSourceEventPosition()).isEqualTo(write);
        ((CommandResponseWriter) Mockito.verify(this.mockCommandResponseWriter)).tryWriteResponse(ArgumentMatchers.eq(99), ArgumentMatchers.eq(255L));
        Record record = (Record) new RecordStream(this.streams.events(STREAM_NAME)).onlyDeploymentRecords().onlyRejections().withIntent(DeploymentIntent.CREATE).getFirst();
        Assertions.assertThat(record.getKey()).isEqualTo(nextKey);
        Assertions.assertThat(record.getRejectionType()).isEqualTo(RejectionType.PROCESSING_ERROR);
    }

    DeploymentRecord deployment(String str) {
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource) deploymentRecord.resources().add()).setResource(BufferUtil.wrapString(STREAM_NAME)).setResourceName(BufferUtil.wrapString(str));
        return deploymentRecord;
    }

    @Test
    public void shouldWriteErrorEvent() {
        ErrorProneProcessor errorProneProcessor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, errorProneProcessor);
        });
        long write = this.streams.newRecord(STREAM_NAME).event(Records.processInstance(1L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        waitForRecordWhichSatisfies(loggedEvent -> {
            return Records.isEvent(loggedEvent, ValueType.ERROR, ErrorIntent.CREATED);
        });
        Assertions.assertThat(errorProneProcessor.getProcessCount()).isEqualTo(1L);
        ErrorRecord value = ((Record) new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat(value.getErrorEventPosition()).isEqualTo(write);
        Assertions.assertThat(BufferUtil.bufferAsString(value.getExceptionMessageBuffer())).isEqualTo("expected");
        Assertions.assertThat(value.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldWriteErrorEventWithNoMessage() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new TypedRecordProcessor<UnifiedRecordValue>(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.2
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord) {
                    throw new NullPointerException();
                }
            });
        });
        long write = this.streams.newRecord(STREAM_NAME).event(Records.processInstance(1L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        waitForRecordWhichSatisfies(loggedEvent -> {
            return Records.isEvent(loggedEvent, ValueType.ERROR, ErrorIntent.CREATED);
        });
        ErrorRecord value = ((Record) new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat(value.getErrorEventPosition()).isEqualTo(write);
        Assertions.assertThat(BufferUtil.bufferAsString(value.getExceptionMessageBuffer())).isEqualTo("Without exception message.");
        Assertions.assertThat(value.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldBanInstance() {
        AtomicReference atomicReference = new AtomicReference();
        ErrorProneProcessor errorProneProcessor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            atomicReference.set((DumpProcessor) Mockito.spy(new DumpProcessor(typedRecordProcessorContext.getWriters())));
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, errorProneProcessor).onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.COMPLETE_ELEMENT, (TypedRecordProcessor) atomicReference.get());
        });
        this.streams.newRecord(STREAM_NAME).event(Records.processInstance(1L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event(Records.processInstance(1L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event(Records.processInstance(2L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        waitForRecordWhichSatisfies(loggedEvent -> {
            return Records.isEvent(loggedEvent, ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED);
        });
        Assertions.assertThat(errorProneProcessor.getProcessCount()).isEqualTo(1L);
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.valueType(ValueType.PROCESS_INSTANCE);
        Assertions.assertThat(this.processingState.getBannedInstanceState().isBanned(new MockTypedRecord(0L, recordMetadata, Records.processInstance(1L)))).isTrue();
        ((DumpProcessor) Mockito.verify((DumpProcessor) atomicReference.get(), Mockito.times(1))).processRecord((TypedRecord) ArgumentMatchers.any());
        Assertions.assertThat(((DumpProcessor) atomicReference.get()).processedInstances).containsExactly(new Long[]{2L});
    }

    @Test
    public void shouldBanInstanceOnReplay() throws Exception {
        long write = this.streams.newRecord(STREAM_NAME).event(Records.processInstance(1L)).recordType(RecordType.COMMAND).intent(ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event(Records.error((int) Records.processInstance(1L).getProcessInstanceKey(), write)).recordType(RecordType.EVENT).sourceRecordPosition(write).intent(ErrorIntent.CREATED).key(this.keyGenerator.nextKey()).write();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).withListener(new StreamProcessorLifecycleAware(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.3
                public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                    countDownLatch.countDown();
                }
            }).onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new DumpProcessor(typedRecordProcessorContext.getWriters()));
        });
        countDownLatch.await(2000L, TimeUnit.MILLISECONDS);
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord mockTypedRecord = new MockTypedRecord(0L, recordMetadata, Records.processInstance(1L));
        TestUtil.waitUntil(() -> {
            return this.processingState.getBannedInstanceState().isBanned(mockTypedRecord);
        });
    }

    @Test
    public void shouldNotBanInstanceOnJobCommand() {
        ArrayList arrayList = new ArrayList();
        TypedRecordProcessor<JobRecord> typedRecordProcessor = new TypedRecordProcessor<JobRecord>(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.4
            public void processRecord(TypedRecord<JobRecord> typedRecord) {
                throw new RuntimeException("expected");
            }
        };
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).onCommand(ValueType.JOB, JobIntent.COMPLETE, typedRecordProcessor).onCommand(ValueType.JOB, JobIntent.THROW_ERROR, new TypedRecordProcessor<JobRecord>(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.5
                public void processRecord(TypedRecord<JobRecord> typedRecord) {
                    arrayList.add(Long.valueOf(typedRecord.getValue().getProcessInstanceKey()));
                    typedRecordProcessorContext.getWriters().command().appendFollowUpCommand(typedRecord.getKey(), ProcessInstanceIntent.COMPLETE_ELEMENT, Records.processInstance((int) typedRecord.getValue().getProcessInstanceKey()));
                }
            });
        });
        this.streams.newRecord(STREAM_NAME).event(Records.job(1L)).recordType(RecordType.COMMAND).intent(JobIntent.COMPLETE).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event(Records.job(1L)).recordType(RecordType.COMMAND).intent(JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event(Records.job(2L)).recordType(RecordType.COMMAND).intent(JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        TestUtil.doRepeatedly(() -> {
            return this.streams.events(STREAM_NAME).filter(loggedEvent -> {
                return Records.isCommand(loggedEvent, ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.COMPLETE_ELEMENT);
            }).toList();
        }).until(list -> {
            return Boolean.valueOf(list.size() == 2);
        });
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.valueType(ValueType.PROCESS_INSTANCE);
        Assertions.assertThat(this.processingState.getBannedInstanceState().isBanned(new MockTypedRecord(0L, recordMetadata, Records.processInstance(1L)))).isFalse();
        Assertions.assertThat(arrayList).containsExactly(new Long[]{1L, 2L});
    }

    @Test
    public void shouldNotBanInstanceAndIgnoreTimerStartEvents() {
        ArrayList arrayList = new ArrayList();
        BpmnModelInstance done = Bpmn.createExecutableProcess("process").startEvent().timerWithDuration("PT1S").endEvent().done();
        UnifiedRecordValue deploymentRecord = new DeploymentRecord();
        ((DeploymentResource) deploymentRecord.resources().add()).setResourceName("process.bpmn").setResource(Bpmn.convertToString(done).getBytes());
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), typedRecordProcessorContext -> {
            this.processingState = typedRecordProcessorContext.getProcessingState();
            return TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters()).onCommand(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, new TypedRecordProcessor<DeploymentRecord>(this) { // from class: io.camunda.zeebe.engine.processing.streamprocessor.EngineErrorHandlingTest.6
                public void processRecord(TypedRecord<DeploymentRecord> typedRecord) {
                    if (typedRecord.getKey() == 0) {
                        throw new RuntimeException("expected");
                    }
                    arrayList.add(-1L);
                    typedRecordProcessorContext.getWriters().state().appendFollowUpEvent(typedRecord.getKey(), TimerIntent.CREATED, Records.timer(-1L));
                }
            });
        });
        this.streams.newRecord(STREAM_NAME).event(deploymentRecord).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).key(0L).write();
        this.streams.newRecord(STREAM_NAME).event(deploymentRecord).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).key(1L).write();
        waitForRecordWhichSatisfies(loggedEvent -> {
            return Records.isEvent(loggedEvent, ValueType.TIMER, TimerIntent.CREATED);
        });
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.valueType(ValueType.TIMER);
        Assertions.assertThat(this.processingState.getBannedInstanceState().isBanned(new MockTypedRecord(0L, recordMetadata, Records.timer(-1L)))).isFalse();
        Assertions.assertThat(arrayList).containsExactly(new Long[]{-1L});
    }

    private void waitForRecordWhichSatisfies(Predicate<LoggedEvent> predicate) {
        ((Optional) TestUtil.doRepeatedly(() -> {
            return this.streams.events(STREAM_NAME).filter(predicate).findFirst();
        }).until(optional -> {
            return Boolean.valueOf(optional.isPresent());
        })).get();
    }
}
