/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.MockTypedRecord;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.impl.record.value.timer.TimerRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

public final class SkipFailingEventsTest {
    private static final String STREAM_NAME = "foo";
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1L);
    private static final JobRecord JOB_RECORD = Records.job(1L);
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    public final AutoCloseableRule closeables = new AutoCloseableRule();
    public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.tempFolder).around((TestRule)this.actorSchedulerRule).around((TestRule)this.closeables);
    protected TestStreams streams;
    protected SynchronousLogStream stream;
    @Mock
    protected CommandResponseWriter commandResponseWriter;
    private KeyGenerator keyGenerator;
    private MutableZeebeState zeebeState;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks((Object)this);
        this.streams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        this.commandResponseWriter = this.streams.getMockedResponseWriter();
        this.stream = this.streams.createLogStream(STREAM_NAME);
        AtomicLong key = new AtomicLong();
        this.keyGenerator = () -> key.getAndIncrement();
    }

    @Test
    public void shouldWriteErrorEvent() {
        ErrorProneProcessor errorProneProcessor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)errorProneProcessor);
        });
        long failingEventPosition = this.streams.newRecord(STREAM_NAME).event((UnpackedObject)PROCESS_INSTANCE_RECORD).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.ERROR, (Intent)ErrorIntent.CREATED));
        Assertions.assertThat((long)errorProneProcessor.getProcessCount()).isEqualTo(1L);
        ErrorRecord errorRecord = (ErrorRecord)((Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat((long)errorRecord.getErrorEventPosition()).isEqualTo(failingEventPosition);
        Assertions.assertThat((String)BufferUtil.bufferAsString((DirectBuffer)errorRecord.getExceptionMessageBuffer())).isEqualTo("expected");
        Assertions.assertThat((long)errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldWriteErrorEventWithNoMessage() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new TypedRecordProcessor<UnifiedRecordValue>(){

                public void processRecord(TypedRecord<UnifiedRecordValue> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
                    throw new NullPointerException();
                }
            });
        });
        long failingEventPosition = this.streams.newRecord(STREAM_NAME).event((UnpackedObject)PROCESS_INSTANCE_RECORD).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.ERROR, (Intent)ErrorIntent.CREATED));
        ErrorRecord errorRecord = (ErrorRecord)((Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyErrorRecords().getFirst()).getValue();
        Assertions.assertThat((long)errorRecord.getErrorEventPosition()).isEqualTo(failingEventPosition);
        Assertions.assertThat((String)BufferUtil.bufferAsString((DirectBuffer)errorRecord.getExceptionMessageBuffer())).isEqualTo("Without exception message.");
        Assertions.assertThat((long)errorRecord.getProcessInstanceKey()).isEqualTo(1L);
    }

    @Test
    public void shouldBlacklistInstance() {
        DumpProcessor dumpProcessor = (DumpProcessor)Mockito.spy((Object)new DumpProcessor());
        ErrorProneProcessor processor = new ErrorProneProcessor();
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)processor).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT, (TypedRecordProcessor)dumpProcessor);
        });
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)PROCESS_INSTANCE_RECORD).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)PROCESS_INSTANCE_RECORD).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)Records.processInstance(2L)).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.COMPLETE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED));
        Assertions.assertThat((long)processor.getProcessCount()).isEqualTo(1L);
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, PROCESS_INSTANCE_RECORD);
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isTrue();
        ((DumpProcessor)Mockito.verify((Object)dumpProcessor, (VerificationMode)Mockito.times((int)1))).processRecord((TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        Assertions.assertThat(dumpProcessor.processedInstances).containsExactly((Object[])new Long[]{2L});
    }

    @Test
    @Ignore(value="will be fixed by #7429")
    public void shouldBacklistInstanceOnReplay() throws Exception {
        Mockito.when((Object)this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        long failedPos = this.streams.newRecord(STREAM_NAME).event((UnpackedObject)PROCESS_INSTANCE_RECORD).recordType(RecordType.COMMAND).intent((Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)Records.error((int)PROCESS_INSTANCE_RECORD.getProcessInstanceKey(), failedPos)).recordType(RecordType.EVENT).sourceRecordPosition(failedPos).intent((Intent)ErrorIntent.CREATED).key(this.keyGenerator.nextKey()).write();
        final CountDownLatch latch = new CountDownLatch(1);
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> {
            this.zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).withListener(new StreamProcessorLifecycleAware(){

                public void onRecovered(ReadonlyProcessingContext ctx) {
                    latch.countDown();
                }
            }).onCommand(ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.ACTIVATE_ELEMENT, (TypedRecordProcessor)new DumpProcessor());
        });
        latch.await(2000L, TimeUnit.MILLISECONDS);
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> this.zeebeState.getBlackListState().isOnBlacklist((TypedRecord)mockTypedRecord));
    }

    @Test
    public void shouldNotBlacklistInstanceOnJobCommand() {
        Mockito.when((Object)this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        final ArrayList processedInstances = new ArrayList();
        TypedRecordProcessor dumpProcessor = (TypedRecordProcessor)Mockito.spy((Object)new TypedRecordProcessor<JobRecord>(){

            public void processRecord(TypedRecord<JobRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
                processedInstances.add(((JobRecord)record.getValue()).getProcessInstanceKey());
                int processInstanceKey = (int)((JobRecord)record.getValue()).getProcessInstanceKey();
                streamWriter.appendFollowUpCommand(record.getKey(), (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT, (RecordValue)Records.processInstance(processInstanceKey));
            }
        });
        TypedRecordProcessor<JobRecord> errorProneProcessor = new TypedRecordProcessor<JobRecord>(){

            public void processRecord(TypedRecord<JobRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
                throw new RuntimeException("expected");
            }
        };
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), arg_0 -> this.lambda$shouldNotBlacklistInstanceOnJobCommand$9((TypedRecordProcessor)errorProneProcessor, dumpProcessor, arg_0));
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)JOB_RECORD).recordType(RecordType.COMMAND).intent((Intent)JobIntent.COMPLETE).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)JOB_RECORD).recordType(RecordType.COMMAND).intent((Intent)JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)Records.job(2L)).recordType(RecordType.COMMAND).intent((Intent)JobIntent.THROW_ERROR).key(this.keyGenerator.nextKey()).write();
        this.waitForRecordWhichSatisfies(e -> Records.isCommand(e, ValueType.PROCESS_INSTANCE, (Intent)ProcessInstanceIntent.COMPLETE_ELEMENT));
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.PROCESS_INSTANCE);
        MockTypedRecord<ProcessInstanceRecord> mockTypedRecord = new MockTypedRecord<ProcessInstanceRecord>(0L, metadata, PROCESS_INSTANCE_RECORD);
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isFalse();
        ((TypedRecordProcessor)Mockito.verify((Object)dumpProcessor, (VerificationMode)Mockito.timeout((long)1000L).times(2))).processRecord((TypedRecord)ArgumentMatchers.any(), (TypedResponseWriter)ArgumentMatchers.any(), (TypedStreamWriter)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any());
        Assertions.assertThat(processedInstances).containsExactly((Object[])new Long[]{1L, 2L});
    }

    @Test
    public void shouldNotBlacklistInstanceAndIgnoreTimerStartEvents() {
        Mockito.when((Object)this.commandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        final ArrayList processedInstances = new ArrayList();
        TypedRecordProcessor<DeploymentRecord> errorProneProcessor = new TypedRecordProcessor<DeploymentRecord>(){

            public void processRecord(TypedRecord<DeploymentRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
                if (record.getKey() == 0L) {
                    throw new RuntimeException("expected");
                }
                processedInstances.add(-1L);
                streamWriter.appendFollowUpEvent(record.getKey(), (Intent)TimerIntent.CREATED, (RecordValue)Records.timer(-1L));
            }
        };
        BpmnModelInstance process = ((StartEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().timerWithDuration("PT1S")).endEvent().done();
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource)deploymentRecord.resources().add()).setResourceName("process.bpmn").setResource(Bpmn.convertToString((BpmnModelInstance)process).getBytes());
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), arg_0 -> this.lambda$shouldNotBlacklistInstanceAndIgnoreTimerStartEvents$11((TypedRecordProcessor)errorProneProcessor, arg_0));
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)deploymentRecord).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(0L).write();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)deploymentRecord).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(1L).write();
        this.waitForRecordWhichSatisfies(e -> Records.isEvent(e, ValueType.TIMER, (Intent)TimerIntent.CREATED));
        RecordMetadata metadata = new RecordMetadata();
        metadata.valueType(ValueType.TIMER);
        MockTypedRecord<TimerRecord> mockTypedRecord = new MockTypedRecord<TimerRecord>(0L, metadata, Records.timer(-1L));
        Assertions.assertThat((boolean)this.zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isFalse();
        Assertions.assertThat(processedInstances).containsExactly((Object[])new Long[]{-1L});
    }

    private void waitForRecordWhichSatisfies(Predicate<LoggedEvent> filter) {
        ((Optional)TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(filter).findFirst()).until(o -> o.isPresent())).get();
    }

    private /* synthetic */ TypedRecordProcessors lambda$shouldNotBlacklistInstanceAndIgnoreTimerStartEvents$11(TypedRecordProcessor errorProneProcessor, ProcessingContext processingContext) {
        this.zeebeState = processingContext.getZeebeState();
        return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATE, errorProneProcessor);
    }

    private /* synthetic */ TypedRecordProcessors lambda$shouldNotBlacklistInstanceOnJobCommand$9(TypedRecordProcessor errorProneProcessor, TypedRecordProcessor dumpProcessor, ProcessingContext processingContext) {
        this.zeebeState = processingContext.getZeebeState();
        return TypedRecordProcessors.processors((KeyGenerator)this.zeebeState.getKeyGenerator(), (Writers)processingContext.getWriters()).onCommand(ValueType.JOB, (Intent)JobIntent.COMPLETE, errorProneProcessor).onCommand(ValueType.JOB, (Intent)JobIntent.THROW_ERROR, dumpProcessor);
    }

    protected static class DumpProcessor
    implements TypedRecordProcessor<ProcessInstanceRecord> {
        final List<Long> processedInstances = new ArrayList<Long>();

        protected DumpProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
            this.processedInstances.add(((ProcessInstanceRecord)record.getValue()).getProcessInstanceKey());
            streamWriter.appendFollowUpEvent(record.getKey(), (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED, (RecordValue)record.getValue());
        }
    }

    protected static class ErrorProneProcessor
    implements TypedRecordProcessor<ProcessInstanceRecord> {
        public final AtomicLong processCount = new AtomicLong(0L);

        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<ProcessInstanceRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
            this.processCount.incrementAndGet();
            throw new RuntimeException("expected");
        }

        public long getProcessCount() {
            return this.processCount.get();
        }
    }
}

