/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public final class TypedStreamProcessorTest {
    private static final String STREAM_NAME = "foo";
    protected SynchronousLogStream stream;
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.tempFolder).around((TestRule)this.actorSchedulerRule).around((TestRule)this.closeables);
    private TestStreams streams;
    private KeyGenerator keyGenerator;
    private CommandResponseWriter mockCommandResponseWriter;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks((Object)this);
        this.streams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        this.mockCommandResponseWriter = this.streams.getMockedResponseWriter();
        this.stream = this.streams.createLogStream(STREAM_NAME);
        AtomicLong key = new AtomicLong();
        this.keyGenerator = () -> key.getAndIncrement();
    }

    @Test
    public void shouldWriteSourceEventAndProducerOnBatch() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> TypedRecordProcessors.processors((KeyGenerator)this.keyGenerator, (Writers)processingContext.getWriters()).onCommand(ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATE, (TypedRecordProcessor)new BatchProcessor()));
        long firstEventPosition = this.streams.newRecord(STREAM_NAME).event((UnpackedObject)this.deployment(STREAM_NAME)).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).write();
        LoggedEvent writtenEvent = (LoggedEvent)((Optional)TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(e -> Records.isEvent(e, ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATED)).findFirst()).until(o -> o.isPresent())).get();
        Assertions.assertThat((long)writtenEvent.getSourceEventPosition()).isEqualTo(firstEventPosition);
    }

    @Test
    public void shouldSkipFailingEvent() {
        this.streams.startStreamProcessor(STREAM_NAME, DefaultZeebeDbFactory.defaultFactory(), processingContext -> TypedRecordProcessors.processors((KeyGenerator)this.keyGenerator, (Writers)processingContext.getWriters()).onCommand(ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATE, (TypedRecordProcessor)new ErrorProneProcessor()));
        AtomicLong requestId = new AtomicLong(0L);
        AtomicInteger requestStreamId = new AtomicInteger(0);
        Mockito.when((Object)this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).then(invocationOnMock -> {
            int streamIdArg = (Integer)invocationOnMock.getArgument(0);
            long requestIdArg = (Long)invocationOnMock.getArgument(1);
            requestId.set(requestIdArg);
            requestStreamId.set(streamIdArg);
            return true;
        });
        long failingKey = this.keyGenerator.nextKey();
        this.streams.newRecord(STREAM_NAME).event((UnpackedObject)this.deployment(STREAM_NAME)).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).requestId(255L).requestStreamId(99).key(failingKey).write();
        long secondEventPosition = this.streams.newRecord(STREAM_NAME).event((UnpackedObject)this.deployment("foo2")).recordType(RecordType.COMMAND).intent((Intent)DeploymentIntent.CREATE).key(this.keyGenerator.nextKey()).write();
        LoggedEvent writtenEvent = (LoggedEvent)((Optional)TestUtil.doRepeatedly(() -> this.streams.events(STREAM_NAME).filter(e -> Records.isEvent(e, ValueType.DEPLOYMENT, (Intent)DeploymentIntent.CREATED)).findFirst()).until(o -> o.isPresent())).get();
        Assertions.assertThat((long)writtenEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat((long)writtenEvent.getSourceEventPosition()).isEqualTo(secondEventPosition);
        ((CommandResponseWriter)Mockito.verify((Object)this.mockCommandResponseWriter)).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Assertions.assertThat((long)requestId.get()).isEqualTo(255L);
        Assertions.assertThat((int)requestStreamId.get()).isEqualTo(99);
        Record deploymentRejection = (Record)new RecordStream(this.streams.events(STREAM_NAME)).onlyDeploymentRecords().onlyRejections().withIntent((Intent)DeploymentIntent.CREATE).getFirst();
        Assertions.assertThat((long)deploymentRejection.getKey()).isEqualTo(failingKey);
        Assertions.assertThat((Comparable)deploymentRejection.getRejectionType()).isEqualTo((Object)RejectionType.PROCESSING_ERROR);
    }

    protected DeploymentRecord deployment(String name) {
        DeploymentRecord event = new DeploymentRecord();
        ((DeploymentResource)event.resources().add()).setResource(BufferUtil.wrapString((String)STREAM_NAME)).setResourceName(BufferUtil.wrapString((String)name));
        return event;
    }

    protected class BatchProcessor
    implements TypedRecordProcessor<DeploymentRecord> {
        protected BatchProcessor() {
        }

        public void processRecord(TypedRecord<DeploymentRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
            streamWriter.appendFollowUpEvent(TypedStreamProcessorTest.this.keyGenerator.nextKey(), (Intent)DeploymentIntent.CREATED, (RecordValue)record.getValue());
            streamWriter.flush();
        }
    }

    protected static class ErrorProneProcessor
    implements TypedRecordProcessor<DeploymentRecord> {
        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<DeploymentRecord> record, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter) {
            if (record.getKey() == 0L) {
                throw new RuntimeException("expected");
            }
            streamWriter.appendFollowUpEvent(record.getKey(), (Intent)DeploymentIntent.CREATED, (RecordValue)record.getValue());
            streamWriter.flush();
        }
    }
}

