/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.MockTypedRecord;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.impl.record.value.timer.TimerRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public final class EngineErrorHandlingTest {
    private static final String STREAM_NAME = "foo";
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.tempFolder).around((TestRule)this.actorSchedulerRule).around((TestRule)this.closeables);
    private TestStreams streams;
    private KeyGenerator keyGenerator;
    private CommandResponseWriter mockCommandResponseWriter;
    private MutableZeebeState zeebeState;

    @Before
    public void setUp() {
        this.streams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        this.mockCommandResponseWriter = this.streams.getMockedResponseWriter();
        this.streams.createLogStream(STREAM_NAME);
        AtomicLong key = new AtomicLong();
        this.keyGenerator = () -> key.getAndIncrement();
    }

    @Test
    public void shouldAutoRejectCommandOnProcessingFailure() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> TypedRecordProcessors.processors((KeyGenerator)this.keyGenerator, (Writers)processingContext.getWriters()).onCommand(ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATE, (TypedRecordProcessor)new TypedRecordProcessor<DeploymentRecord>(){

            public void processRecord(TypedRecord<DeploymentRecord> record) {
                if (record.getKey() == 0L) {
                    throw new RuntimeException("expected");
                }
                processingContext.getWriters().state().appendFollowUpEvent(record.getKey(), (Intent)DeploymentIntent.CREATED, (RecordValue)record.getValue());
            }
        }));
        long failingKey = this.keyGenerator.nextKey();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)this.deployment(STREAM_NAME)).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).requestId(255L).requestStreamId(99).key(failingKey).write();
        long secondEventPosition = this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)this.deployment("foo2")).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(this.keyGenerator.nextKey()).write();
        LoggedEvent writtenEvent = (LoggedEvent)((Optional)TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(e -> Records.isEvent(e, ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATED)).findFirst()).until(o -> o.isPresent())).get();
        Assertions.assertThat((long)writtenEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat((long)writtenEvent.getSourceEventPosition()).isEqualTo(secondEventPosition);
        ((CommandResponseWriter)Mockito.verify((Object)this.mockCommandResponseWriter)).tryWriteResponse(ArgumentMatchers.eq((int)99), ArgumentMatchers.eq((long)255L));
        Record deploymentRejection = (Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyDeploymentRecords().onlyRejections().withIntent((Intent)DeploymentIntent.CREATE).getFirst();
        Assertions.assertThat((long)deploymentRejection.getKey()).isEqualTo(failingKey);
        Assertions.assertThat((Comparable)deploymentRejection.getRejectionType()).isEqualTo((Object)RejectionType.PROCESSING_ERROR);
    }

    DeploymentRecord deployment(String name) {
        DeploymentRecord event = new DeploymentRecord();
        ((DeploymentResource)event.resources().add()).setResource(BufferUtil.wrapString((String)STREAM_NAME)).setResourceName(BufferUtil.wrapString((String)name));
        return event;
    }

    @Test
    public void shouldWriteErrorEvent() {
        ErrorProneProcessor errorProneProcessor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)errorProneProcessor);
        });
        long failingEventPosition = this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(1L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.ERROR, (Intent)ErrorIntent.CREATED));
        Assertions.assertThat((long)errorProneProcessor.getProcessCount()).isEqualTo(1L);
        ErrorRecord errorRecord = (ErrorRecord)((Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat((long)errorRecord.getErrorEventPosition()).isEqualTo(failingEventPosition);
        Assertions.assertThat((String)BufferUtil.bufferAsString((DirectBuffer)errorRecord.getExceptionMessageBuffer())).isEqualTo("expected");
        Assertions.assertThat((long)errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldWriteErrorEventWithNoMessage() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

                public void processRecord(TypedRecord<UnifiedRecordValue> record) {
                    throw new NullPointerException();
                }
            });
        });
        long failingEventPosition = this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(1L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.ERROR, (Intent)ErrorIntent.CREATED));
        ErrorRecord errorRecord = (ErrorRecord)((Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat((long)errorRecord.getErrorEventPosition()).isEqualTo(failingEventPosition);
        Assertions.assertThat((String)BufferUtil.bufferAsString((DirectBuffer)errorRecord.getExceptionMessageBuffer())).isEqualTo("Without exception message.");
        Assertions.assertThat((long)errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldBlacklistInstance() {
        AtomicReference dumpProcessorRef = new AtomicReference();
        ErrorProneProcessor processor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            dumpProcessorRef.set((DumpProcessor)Mockito.spy((Object)new DumpProcessor(processingContext.getWriters())));
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)processor).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT, (TypedRecordProcessor)dumpProcessorRef.get());
        });
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(1L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(1L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(2L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED));
        Assertions.assertThat((long)processor.getProcessCount()).isEqualTo(1L);
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, Records.processInstance(1L));
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isTrue();
        ((DumpProcessor)Mockito.verify((Object)((DumpProcessor)dumpProcessorRef.get()), (VerificationMode)Mockito.times((int)1))).processRecord((TypedRecord)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        Assertions.assertThat(((DumpProcessor)dumpProcessorRef.get()).processedInstances).containsExactly((Object[])new Long[]{2L});
    }

    @Test
    public void shouldBlacklistInstanceOnReplay() throws Exception {
        Mockito.when((Object)this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        long failedPos = this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.processInstance(1L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.error((int)Records.processInstance(1L).getProcessInstanceKey(), failedPos)).recordType(RecordType.EVENT).sourceRecordPosition(failedPos).intent((Intent)ErrorIntent.CREATED).key(this.keyGenerator.nextKey()).write();
        final CountDownLatch latch = new CountDownLatch(1);
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).withListener(new StreamProcessorLifecycleAware(){

                public void onRecovered(ReadonlyStreamProcessorContext ctx) {
                    latch.countDown();
                }
            }).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new DumpProcessor(processingContext.getWriters()));
        });
        latch.await(2000L, TimeUnit.MILLISECONDS);
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, Records.processInstance(1L));
        TestUtil.waitUntil(() -> this.zeebeState.getBlackListState().isOnBlacklist((TypedRecord)mockTypedRecord));
    }

    @Test
    public void shouldNotBlacklistInstanceOnJobCommand() {
        Mockito.when((Object)this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        ArrayList processedInstances = new ArrayList();
        AtomicReference dumpProcessorRef = new AtomicReference();
        TypedRecordProcessor<JobRecord> errorProneProcessor = new TypedRecordProcessor<JobRecord>(){

            public void processRecord(TypedRecord<JobRecord> record) {
                throw new RuntimeException("expected");
            }
        };
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), arg_0 -> this.lambda$shouldNotBlacklistInstanceOnJobCommand$13(dumpProcessorRef, processedInstances, (TypedRecordProcessor)errorProneProcessor, arg_0));
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.job(1L)).recordType(RecordType.COMMAND).intent((Intent)JobIntent.COMPLETE).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.job(1L)).recordType(RecordType.COMMAND).intent((Intent)JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)Records.job(2L)).recordType(RecordType.COMMAND).intent((Intent)JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isCommand(e, ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT));
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, Records.processInstance(1L));
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isFalse();
        ((TypedRecordProcessor)Mockito.verify((Object)((TypedRecordProcessor)dumpProcessorRef.get()), (VerificationMode)Mockito.timeout((long)1000L).times(2))).processRecord((TypedRecord)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        Assertions.assertThat(processedInstances).containsExactly((Object[])new Long[]{1L, 2L});
    }

    @Test
    public void shouldNotBlacklistInstanceAndIgnoreTimerStartEvents() {
        Mockito.when((Object)this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        final ArrayList processedInstances = new ArrayList();
        BpmnModelInstance process = ((StartEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().timerWithDuration("PT1S")).endEvent().done();
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource)deploymentRecord.resources().add()).setResourceName("process.bpmn").setResource(Bpmn.convertToString((BpmnModelInstance)process).getBytes());
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATE, (TypedRecordProcessor)new TypedRecordProcessor<DeploymentRecord>(){

                public void processRecord(TypedRecord<DeploymentRecord> record) {
                    if (record.getKey() == 0L) {
                        throw new RuntimeException("expected");
                    }
                    processedInstances.add(-1L);
                    processingContext.getWriters().state().appendFollowUpEvent(record.getKey(), (Intent)TimerIntent.CREATED, (RecordValue)Records.timer(-1L));
                }
            });
        });
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)deploymentRecord).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(0L).write();
        this.streams.newRecord(STREAM_NAME).event((UnifiedRecordValue)deploymentRecord).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(1L).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.TIMER, (Intent)TimerIntent.CREATED));
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.TIMER);
        MockTypedRecord<TimerRecord> mockTypedRecord = new MockTypedRecord<TimerRecord>(0L, metadata, Records.timer(-1L));
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isFalse();
        Assertions.assertThat(processedInstances).containsExactly((Object[])new Long[]{-1L});
    }

    private void waitForRecordWhichSatisfies(Predicate<LoggedEvent> filter) {
        ((Optional)TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(filter).findFirst()).until(o -> o.isPresent())).get();
    }

    private /* synthetic */ TypedRecordProcessors lambda$shouldNotBlacklistInstanceOnJobCommand$13(AtomicReference dumpProcessorRef, final List processedInstances, TypedRecordProcessor errorProneProcessor, final TypedRecordProcessorContext processingContext) {
        this.zeebeState = processingContext.getZeebeState();
        dumpProcessorRef.set((TypedRecordProcessor)Mockito.spy((Object)new TypedRecordProcessor<JobRecord>(){

            public void processRecord(TypedRecord<JobRecord> record) {
                processedInstances.add(((JobRecord)record.getValue()).getProcessInstanceKey());
                int processInstanceKey = (int)((JobRecord)record.getValue()).getProcessInstanceKey();
                processingContext.getWriters().command().appendFollowUpCommand(record.getKey(), (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT, (RecordValue)Records.processInstance(processInstanceKey));
            }
        }));
        return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.JOB, (Intent)JobIntent.COMPLETE, errorProneProcessor).onCommand(ValueType.JOB, (Intent)JobIntent.THROW_ERROR, (TypedRecordProcessor)dumpProcessorRef.get());
    }

    protected static class ErrorProneProcessor
    implements TypedRecordProcessor<ProcessInstanceRecord> {
        public final AtomicLong processCount = new AtomicLong(0L);

        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> record) {
            this.processCount.incrementAndGet();
            throw new RuntimeException("expected");
        }

        public long getProcessCount() {
            return this.processCount.get();
        }
    }

    protected static class DumpProcessor
    implements TypedRecordProcessor<ProcessInstanceRecord> {
        final List<Long> processedInstances = new ArrayList<Long>();
        private final StateWriter stateWriter;

        public DumpProcessor(Writers writers) {
            this.stateWriter = writers.state();
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> record) {
            this.processedInstances.add(((ProcessInstanceRecord)record.getValue()).getProcessInstanceKey());
            this.stateWriter.appendFollowUpEvent(record.getKey(), (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED, (RecordValue)record.getValue());
        }
    }
}

