/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.util.ListLogStorage;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessingComposite;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import org.agrona.CloseHelper;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

/**
 * Checks that a stream processor reports itself as failed when it is started on a log whose
 * records do not have unique positions.
 *
 * <p>Two log streams (partition ids 1 and 2) are created on top of one shared
 * {@link ListLogStorage} instance. The same commands are written through both composites, and the
 * test asserts that both writers handed out the same positions before starting the processor of
 * the first partition.
 */
public final class StreamProcessorInconsistentPositionTest {
    private static final int FIRST_PARTITION_ID = 1;
    private static final int SECOND_PARTITION_ID = 2;
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1L);

    // Field order matters: the scheduler rule needs the clock, and the chain needs all three rules.
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule((ActorClock) clock);

    @Rule
    public RuleChain ruleChain =
        RuleChain.outerRule((TestRule) tempFolder)
            .around((TestRule) actorSchedulerRule)
            .around((TestRule) closeables);

    private StreamProcessingComposite firstStreamProcessorComposite;
    private StreamProcessingComposite secondStreamProcessorComposite;
    private TestStreams testStreams;

    /** Creates both log streams on the same storage instance and one composite per partition. */
    @Before
    public void setup() {
        testStreams = new TestStreams(tempFolder, closeables, actorSchedulerRule.get());

        // A single storage instance is handed to both log streams on purpose.
        final ListLogStorage sharedStorage = new ListLogStorage();
        testStreams.createLogStream(
            StreamProcessingComposite.getLogName(FIRST_PARTITION_ID), FIRST_PARTITION_ID, sharedStorage);
        testStreams.createLogStream(
            StreamProcessingComposite.getLogName(SECOND_PARTITION_ID), SECOND_PARTITION_ID, sharedStorage);

        firstStreamProcessorComposite =
            new StreamProcessingComposite(
                testStreams, FIRST_PARTITION_ID, DefaultZeebeDbFactory.defaultFactory());
        secondStreamProcessorComposite =
            new StreamProcessingComposite(
                testStreams, SECOND_PARTITION_ID, DefaultZeebeDbFactory.defaultFactory());
    }

    /** Closes the processor of the first partition, ignoring any failure while doing so. */
    @After
    public void tearDown() {
        CloseHelper.quietClose(
            () -> testStreams.closeProcessor(StreamProcessingComposite.getLogName(FIRST_PARTITION_ID)));
    }

    @Test
    public void shouldNotStartOnInconsistentLog() {
        // given: two commands written through the first composite ...
        final long firstWriterPosition = writeActivateElementCommand(firstStreamProcessorComposite);
        final long firstWriterNextPosition = writeActivateElementCommand(firstStreamProcessorComposite);
        TestUtil.waitUntil(() -> hasActivateElementRecords(FIRST_PARTITION_ID, 2L));

        // ... and two more through the second one; four records are then expected on the second log
        final long secondWriterPosition = writeActivateElementCommand(secondStreamProcessorComposite);
        final long secondWriterNextPosition = writeActivateElementCommand(secondStreamProcessorComposite);
        TestUtil.waitUntil(() -> hasActivateElementRecords(SECOND_PARTITION_ID, 4L));

        // both writers must have produced the same positions
        Assertions.assertThat(firstWriterPosition).isEqualTo(secondWriterPosition);
        Assertions.assertThat(firstWriterNextPosition).isEqualTo(secondWriterNextPosition);

        // when: the processor of the first partition is started without awaiting its opening
        final TypedRecordProcessor typedRecordProcessor = Mockito.mock(TypedRecordProcessor.class);
        final Intent activateElement = ProcessInstanceIntent.ACTIVATE_ELEMENT;
        final StreamProcessor streamProcessor =
            firstStreamProcessorComposite.startTypedStreamProcessorNotAwaitOpening(
                (processors, context) ->
                    processors.onCommand(
                        ValueType.PROCESS_INSTANCE, activateElement, typedRecordProcessor));

        // then: it ends up failed
        TestUtil.waitUntil(() -> streamProcessor.isFailed());
        final boolean failed = streamProcessor.isFailed();
        Assertions.assertThat(failed).isTrue();
    }

    /** Writes one ACTIVATE_ELEMENT command via the given composite and returns its position. */
    private static long writeActivateElementCommand(final StreamProcessingComposite composite) {
        final Intent intent = ProcessInstanceIntent.ACTIVATE_ELEMENT;
        final UnpackedObject value = PROCESS_INSTANCE_RECORD;
        return composite.writeCommand(intent, value);
    }

    /**
     * Reports whether the log of the given partition currently holds exactly {@code expectedCount}
     * process instance records with the ACTIVATE_ELEMENT intent.
     */
    private boolean hasActivateElementRecords(final int partitionId, final long expectedCount) {
        final Intent intent = ProcessInstanceIntent.ACTIVATE_ELEMENT;
        final RecordStream records =
            new RecordStream(testStreams.events(StreamProcessingComposite.getLogName(partitionId)));
        return records.onlyProcessInstanceRecords().withIntent(intent).count() == expectedCount;
    }
}

