package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.MockTypedRecord;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest.class */
public final class SkipFailingEventsTest {
    private static final String STREAM_NAME = "foo";
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1);
    private static final JobRecord JOB_RECORD = Records.job(1);
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    public final AutoCloseableRule closeables = new AutoCloseableRule();
    public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.closeables);
    protected TestStreams streams;
    protected SynchronousLogStream stream;

    @Mock
    protected CommandResponseWriter commandResponseWriter;
    private KeyGenerator keyGenerator;
    private MutableZeebeState zeebeState;

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest$DumpProcessor.class */
    protected static class DumpProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
        final List<Long> processedInstances = new ArrayList();

        protected DumpProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
            this.processedInstances.add(Long.valueOf(typedRecord.getValue().getProcessInstanceKey()));
            typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), ProcessInstanceIntent.ELEMENT_COMPLETED, typedRecord.getValue());
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest$ErrorProneProcessor.class */
    protected static class ErrorProneProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
        public final AtomicLong processCount = new AtomicLong(0);

        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
            this.processCount.incrementAndGet();
            throw new RuntimeException("expected");
        }

        public long getProcessCount() {
            return this.processCount.get();
        }
    }

    /**
     * Initialises the Mockito mocks, builds the {@link TestStreams} harness with its log stream
     * named {@code STREAM_NAME}, and installs a key generator that hands out 0, 1, 2, ...
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);

        final TestStreams testStreams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        this.streams = testStreams;
        // Overwrites the @Mock-initialised field with the response writer obtained from the harness.
        this.commandResponseWriter = testStreams.getMockedResponseWriter();
        this.stream = testStreams.createLogStream(STREAM_NAME);

        final AtomicLong nextKey = new AtomicLong();
        this.keyGenerator = nextKey::getAndIncrement;
    }

    /**
     * A command whose processor throws must produce an {@code ERROR CREATED} event that references
     * the position of the failed command and carries the exception message and the process
     * instance key.
     */
    @Test
    public void shouldWriteErrorEvent() {
        final ErrorProneProcessor failingProcessor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, failingProcessor);
        });

        final long failedPosition = this.streams.newRecord(STREAM_NAME)
                .event(PROCESS_INSTANCE_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.ACTIVATE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();

        waitForRecordWhichSatisfies(loggedEvent -> Records.isEvent(loggedEvent, ValueType.ERROR, ErrorIntent.CREATED));

        Assertions.assertThat(failingProcessor.getProcessCount()).isEqualTo(1L);

        // No raw Record cast here: keeping the stream's element type lets getValue() return ErrorRecord.
        final ErrorRecord errorRecord = new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst().getValue();
        Assertions.assertThat(errorRecord.getErrorEventPosition()).isEqualTo(failedPosition);
        Assertions.assertThat(BufferUtil.bufferAsString(errorRecord.getExceptionMessageBuffer())).isEqualTo("expected");
        Assertions.assertThat(errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    /**
     * When the processor throws an exception that has no message, the written {@code ERROR CREATED}
     * event must carry the placeholder text {@code "Without exception message."}.
     */
    @Test
    public void shouldWriteErrorEventWithNoMessage() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new TypedRecordProcessor<UnifiedRecordValue>() {
                        @Override
                        public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
                            // Deliberately an exception without a message.
                            throw new NullPointerException();
                        }
                    });
        });

        final long failedPosition = this.streams.newRecord(STREAM_NAME)
                .event(PROCESS_INSTANCE_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.ACTIVATE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();

        waitForRecordWhichSatisfies(loggedEvent -> Records.isEvent(loggedEvent, ValueType.ERROR, ErrorIntent.CREATED));

        // No raw Record cast here: keeping the stream's element type lets getValue() return ErrorRecord.
        final ErrorRecord errorRecord = new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst().getValue();
        Assertions.assertThat(errorRecord.getErrorEventPosition()).isEqualTo(failedPosition);
        Assertions.assertThat(BufferUtil.bufferAsString(errorRecord.getExceptionMessageBuffer())).isEqualTo("Without exception message.");
        Assertions.assertThat(errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    /**
     * After {@code ACTIVATE_ELEMENT} fails for process instance 1, the blacklist state must report
     * that instance, and of the two {@code COMPLETE_ELEMENT} commands written afterwards only the
     * one for process instance 2 may reach the recording processor.
     */
    @Test
    public void shouldBlacklistInstance() {
        final ErrorProneProcessor failingProcessor = new ErrorProneProcessor();
        final DumpProcessor recordingProcessor = Mockito.spy(new DumpProcessor());
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, failingProcessor)
                    .onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.COMPLETE_ELEMENT, recordingProcessor);
        });

        // Failing command for instance 1, then one completion command each for instance 1 and 2.
        this.streams.newRecord(STREAM_NAME)
                .event(PROCESS_INSTANCE_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.ACTIVATE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(PROCESS_INSTANCE_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.COMPLETE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(Records.processInstance(2L))
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.COMPLETE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();

        waitForRecordWhichSatisfies(loggedEvent -> Records.isEvent(loggedEvent, ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED));

        Assertions.assertThat(failingProcessor.getProcessCount()).isEqualTo(1L);

        final RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        final boolean blacklisted = this.zeebeState.getBlackListState().isOnBlacklist(new MockTypedRecord(0L, metadata, PROCESS_INSTANCE_RECORD));
        Assertions.assertThat(blacklisted).isTrue();

        Mockito.verify(recordingProcessor, Mockito.times(1)).processRecord((TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        Assertions.assertThat(recordingProcessor.processedInstances).containsExactly(2L);
    }

    /**
     * Writes a failed command together with its {@code ERROR CREATED} event before the stream
     * processor is started, then expects the process instance to show up in the blacklist state
     * once the processor has recovered.
     *
     * @throws Exception if waiting for the recovery callback is interrupted
     */
    @Test
    @Ignore("will be fixed by #7429")
    public void shouldBacklistInstanceOnReplay() throws Exception {
        Mockito.when(this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(true);

        final long failedPosition = this.streams.newRecord(STREAM_NAME)
                .event(PROCESS_INSTANCE_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(ProcessInstanceIntent.ACTIVATE_ELEMENT)
                .key(this.keyGenerator.nextKey())
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(Records.error((int) PROCESS_INSTANCE_RECORD.getProcessInstanceKey(), failedPosition))
                .recordType(RecordType.EVENT)
                .sourceRecordPosition(failedPosition)
                .intent(ErrorIntent.CREATED)
                .key(this.keyGenerator.nextKey())
                .write();

        final CountDownLatch recovered = new CountDownLatch(1);
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .withListener(new StreamProcessorLifecycleAware() {
                        @Override
                        public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
                            recovered.countDown();
                        }
                    })
                    .onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new DumpProcessor());
        });

        // await() returns false on timeout; fail here instead of continuing silently.
        Assertions.assertThat(recovered.await(2000L, TimeUnit.MILLISECONDS))
                .as("stream processor should report recovery within 2000 ms")
                .isTrue();

        final RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        final MockTypedRecord mockTypedRecord = new MockTypedRecord(0L, metadata, PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord));
    }

    /**
     * A failing {@code JOB COMPLETE} command must not put the related process instance on the
     * blacklist: both subsequent {@code JOB THROW_ERROR} commands (for process instance 1 and 2)
     * are still expected to reach their processor.
     */
    @Test
    public void shouldNotBlacklistInstanceOnJobCommand() {
        Mockito.when(this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(true);

        final List<Long> processedInstances = new ArrayList<>();
        final TypedRecordProcessor<JobRecord> recordingProcessor = Mockito.spy(new TypedRecordProcessor<JobRecord>() {
            @Override
            public void processRecord(TypedRecord<JobRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
                final long processInstanceKey = typedRecord.getValue().getProcessInstanceKey();
                processedInstances.add(processInstanceKey);
                typedStreamWriter.appendFollowUpCommand(typedRecord.getKey(), ProcessInstanceIntent.COMPLETE_ELEMENT, Records.processInstance((int) processInstanceKey));
            }
        });
        final TypedRecordProcessor<JobRecord> failingProcessor = new TypedRecordProcessor<JobRecord>() {
            @Override
            public void processRecord(TypedRecord<JobRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
                throw new RuntimeException("expected");
            }
        };

        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .onCommand(ValueType.JOB, JobIntent.COMPLETE, failingProcessor)
                    .onCommand(ValueType.JOB, JobIntent.THROW_ERROR, recordingProcessor);
        });

        this.streams.newRecord(STREAM_NAME)
                .event(JOB_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(JobIntent.COMPLETE)
                .key(this.keyGenerator.nextKey())
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(JOB_RECORD)
                .recordType(RecordType.COMMAND)
                .intent(JobIntent.THROW_ERROR)
                .key(this.keyGenerator.nextKey())
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(Records.job(2L))
                .recordType(RecordType.COMMAND)
                .intent(JobIntent.THROW_ERROR)
                .key(this.keyGenerator.nextKey())
                .write();

        waitForRecordWhichSatisfies(loggedEvent -> Records.isCommand(loggedEvent, ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.COMPLETE_ELEMENT));

        final RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        Assertions.assertThat(this.zeebeState.getBlackListState().isOnBlacklist(new MockTypedRecord(0L, metadata, PROCESS_INSTANCE_RECORD))).isFalse();

        // The timeout gives the second THROW_ERROR command time to be processed after the wait above.
        Mockito.verify(recordingProcessor, Mockito.timeout(1000L).times(2)).processRecord((TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        Assertions.assertThat(processedInstances).containsExactly(1L, 2L);
    }

    /**
     * A failing {@code DEPLOYMENT CREATE} command (key 0) must not blacklist anything: the second
     * deployment command (key 1) is still processed and its timer event with element instance key
     * {@code -1} is not reported as blacklisted.
     */
    @Test
    public void shouldNotBlacklistInstanceAndIgnoreTimerStartEvents() {
        Mockito.when(this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(true);

        final List<Long> processedInstances = new ArrayList<>();
        final TypedRecordProcessor<DeploymentRecord> deploymentProcessor = new TypedRecordProcessor<DeploymentRecord>() {
            @Override
            public void processRecord(TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
                // Only the first deployment command (key 0) fails; later ones are processed normally.
                if (typedRecord.getKey() == 0) {
                    throw new RuntimeException("expected");
                }
                processedInstances.add(-1L);
                typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), TimerIntent.CREATED, Records.timer(-1L));
            }
        };

        final BpmnModelInstance process = Bpmn.createExecutableProcess("process").startEvent().timerWithDuration("PT1S").endEvent().done();
        // Declared as DeploymentRecord (not UnpackedObject) so that resources() is accessible.
        final DeploymentRecord deploymentRecord = new DeploymentRecord();
        deploymentRecord.resources().add()
                .setResourceName("process.bpmn")
                // Explicit charset: do not depend on the platform default when encoding the XML.
                .setResource(Bpmn.convertToString(process).getBytes(StandardCharsets.UTF_8));

        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(this.zeebeState.getKeyGenerator(), processingContext.getWriters())
                    .onCommand(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, deploymentProcessor);
        });

        this.streams.newRecord(STREAM_NAME)
                .event(deploymentRecord)
                .recordType(RecordType.COMMAND)
                .intent(DeploymentIntent.CREATE)
                .key(0L)
                .write();
        this.streams.newRecord(STREAM_NAME)
                .event(deploymentRecord)
                .recordType(RecordType.COMMAND)
                .intent(DeploymentIntent.CREATE)
                .key(1L)
                .write();

        waitForRecordWhichSatisfies(loggedEvent -> Records.isEvent(loggedEvent, ValueType.TIMER, TimerIntent.CREATED));

        final RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.TIMER);
        Assertions.assertThat(this.zeebeState.getBlackListState().isOnBlacklist(new MockTypedRecord(0L, metadata, Records.timer(-1L)))).isFalse();
        Assertions.assertThat(processedInstances).containsExactly(-1L);
    }

    /**
     * Repeatedly scans the events of {@code STREAM_NAME} until one matches the given predicate.
     * The trailing {@code get()} throws if the repetition ended without a match.
     *
     * @param predicate condition the awaited logged event has to fulfil
     */
    private void waitForRecordWhichSatisfies(Predicate<LoggedEvent> predicate) {
        TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(predicate).findFirst())
                .until(match -> match.isPresent())
                .get();
    }
}
