/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.state;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.IntermediateCatchEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.MultiInstanceLoopCharacteristicsBuilder;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.UserTaskBuilder;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.UserTaskIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.UserTaskRecordValue;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public final class ReplayStateTest {
    private static final String PROCESS_ID = "process";
    @Parameterized.Parameter
    public TestCase testCase;
    private long lastProcessedPosition = -1L;
    @Rule
    public final EngineRule engine = EngineRule.singlePartition().withOnProcessedCallback(record -> {
        this.lastProcessedPosition = record.getPosition();
    }).withOnSkippedCallback(record -> {
        this.lastProcessedPosition = record.getPosition();
    });

    @Parameterized.Parameters(name="{0}")
    public static Collection<TestCase> testRecords() {
        return List.of(ReplayStateTest.testCase("activated service task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> t.zeebeJobType("test")).done()).withExecution(engine -> {
            engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withElementType(BpmnElementType.SERVICE_TASK).await();
            return (Record)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).getFirst();
        }), ReplayStateTest.testCase("expired buffered message").withExecution(engine -> {
            Duration timeToLive = Duration.ofMinutes(1L);
            engine.message().withName("test").withCorrelationKey("1").withTimeToLive(timeToLive).publish();
            engine.getClock().addTime(timeToLive.plus(EngineConfiguration.DEFAULT_MESSAGES_TTL_CHECKER_INTERVAL));
            return (Record)RecordingExporter.messageRecords((MessageIntent)MessageIntent.EXPIRED).getFirst();
        }), ReplayStateTest.testCase("throw error end event").withProcess(((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().subProcess("subProcess").embeddedSubProcess().startEvent().endEvent("errorEndEvent", b -> b.error("error")).subProcessDone().boundaryEvent("errorCatchEvent", b -> ((BoundaryEventBuilder)b.error("error")).cancelActivity(Boolean.valueOf(true))).endEvent().moveToActivity("subProcess").intermediateCatchEvent("neverProcessed").message(m -> m.name("message").zeebeCorrelationKey("=\"key\""))).done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            return (Record)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withElementType(BpmnElementType.PROCESS).withRecordKey(piKey)).getFirst();
        }), ReplayStateTest.testCase("interrupting message boundary event on receive task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().receiveTask("task", t -> t.message(m -> m.name("task").zeebeCorrelationKeyExpression("1"))).boundaryEvent("event", b -> ((BoundaryEventBuilder)b.cancelActivity(Boolean.valueOf(true))).message(m -> m.name("event").zeebeCorrelationKeyExpression("1"))).endEvent("end").done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            engine.message().withName("event").withCorrelationKey("1").publish();
            return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(piKey).withElementType(BpmnElementType.PROCESS).getFirst();
        }), ReplayStateTest.testCase("non-interrupting timer boundary event on receive task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().receiveTask("task", t -> t.message(m -> m.name("task").zeebeCorrelationKeyExpression("1"))).boundaryEvent("event", b -> ((BoundaryEventBuilder)b.cancelActivity(Boolean.valueOf(false))).timerWithDuration("PT0S")).endEvent("end").done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(piKey).withElementType(BpmnElementType.END_EVENT).withElementId("end").getFirst();
        }), ReplayStateTest.testCase("parallel multi-instance service task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> ((ServiceTaskBuilder)t.zeebeJobType("type")).multiInstance(m -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)m.parallel()).zeebeInputElement("item")).zeebeInputCollectionExpression("items")).zeebeOutputElementExpression("result")).zeebeOutputCollection("results"))).endEvent().done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("items", List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).create();
            Awaitility.await((String)"until there are 3 jobs ready to be activated").pollInSameThread().until(() -> ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).limit(3L)).count() >= 3L);
            JobBatchRecordValue jobs = (JobBatchRecordValue)engine.jobs().withMaxJobsToActivate(3).withType("type").activate().getValue();
            jobs.getJobKeys().forEach(key -> engine.job().withKey((long)key).withVariable("result", 0).complete());
            return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(piKey).withElementType(BpmnElementType.PROCESS).getFirst();
        }), ReplayStateTest.testCase("sequential multi-instance service task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> ((ServiceTaskBuilder)t.zeebeJobType("type")).multiInstance(m -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)m.sequential()).zeebeInputElement("item")).zeebeInputCollectionExpression("items")).zeebeOutputElementExpression("result")).zeebeOutputCollection("results"))).endEvent().done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("items", List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).create();
            for (int i = 0; i < 3; ++i) {
                int expectedJobCount = i + 1;
                Awaitility.await((String)("until there are " + expectedJobCount + " jobs ready to be activated")).pollInSameThread().until(() -> ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).limit((long)expectedJobCount)).count() >= (long)expectedJobCount);
                JobBatchRecordValue jobs = (JobBatchRecordValue)engine.jobs().withMaxJobsToActivate(1).withType("type").activate().getValue();
                jobs.getJobKeys().forEach(key -> engine.job().withKey((long)key).withVariable("result", 0).complete());
            }
            return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(piKey).withElementType(BpmnElementType.PROCESS).getFirst();
        }), ReplayStateTest.testCase("interrupting parallel multi-instance service task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType("type")).multiInstance(m -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)m.parallel()).zeebeInputElement("item")).zeebeInputCollectionExpression("items")).zeebeOutputElementExpression("result")).zeebeOutputCollection("results"))).boundaryEvent("event", b -> ((BoundaryEventBuilder)b.cancelActivity(Boolean.valueOf(true))).message(m -> m.name("message").zeebeCorrelationKey("=\"key\""))).endEvent()).endEvent().done()).withExecution(engine -> {
            long piKey = engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("items", List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).create();
            Awaitility.await((String)"until there are 3 jobs ready to be activated").pollInSameThread().until(() -> ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).limit(3L)).count() >= 2L);
            JobBatchRecordValue jobs = (JobBatchRecordValue)engine.jobs().withMaxJobsToActivate(2).withType("type").activate().getValue();
            jobs.getJobKeys().forEach(key -> engine.job().withKey((long)key).withVariable("result", 0).complete());
            engine.message().withName("message").withCorrelationKey("key").publish();
            return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(piKey).withElementType(BpmnElementType.PROCESS).getFirst();
        }), ReplayStateTest.testCase("link deployed form to job-based user task").withProcess(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().userTask("task", t -> t.zeebeFormId("Form_0w7r08e")).endEvent().done()).withForm("/form/test-form-1.form").withExecution(engine -> {
            engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            Record job = (Record)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).getFirst();
            Assertions.assertThat((Map)((JobRecordValue)job.getValue()).getCustomHeaders()).containsKey((Object)"io.camunda.zeebe:formKey");
            return job;
        }), ReplayStateTest.testCase("link deployed form to native user task").withProcess(((UserTaskBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().userTask("task", t -> t.zeebeFormId("Form_0w7r08e")).zeebeUserTask()).endEvent().done()).withForm("/form/test-form-1.form").withExecution(engine -> {
            engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();
            Record userTask = (Record)RecordingExporter.userTaskRecords((UserTaskIntent)UserTaskIntent.CREATED).getFirst();
            Assertions.assertThat((long)((UserTaskRecordValue)userTask.getValue()).getFormKey()).isGreaterThan(-1L);
            return userTask;
        }), ReplayStateTest.testCase("correlate buffered message to start event").withProcess(((StartEventBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().message("start")).serviceTask("task", t -> t.zeebeJobType("task")).endEvent().done()).withExecution(engine -> {
            PublishMessageClient messageCommand = engine.message().withName("start").withCorrelationKey("go").withTimeToLive(Duration.ofMinutes(5L));
            messageCommand.withVariables(Map.of("x", 1)).publish();
            messageCommand.withVariables(Map.of("x", 2)).publish();
            long firstJobKey = ((Record)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).getFirst()).getKey();
            engine.job().withKey(firstJobKey).complete();
            long secondJobKey = ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).skip(1L)).getFirst()).getKey();
            engine.job().withKey(secondJobKey).complete();
            return (Record)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withElementType(BpmnElementType.PROCESS).skip(1L)).getFirst();
        }));
    }

    @Before
    public void init() {
        this.lastProcessedPosition = -1L;
    }

    @Test
    public void shouldRestoreState() {
        this.testCase.processes.forEach(process -> this.engine.deployment().withXmlResource((BpmnModelInstance)process).deploy());
        this.testCase.forms.forEach(form -> this.engine.deployment().withJsonClasspathResource((String)form).deploy());
        Record<?> finalRecord = this.testCase.execution.apply(this.engine);
        Awaitility.await((String)"await until the last record is processed").untilAsserted(() -> Assertions.assertThat((long)this.lastProcessedPosition).isGreaterThanOrEqualTo(finalRecord.getPosition()));
        Map<ZbColumnFamilies, Map<Object, Object>> processingState = this.engine.collectState();
        this.engine.stop();
        this.engine.start();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Comparable)((StreamProcessor.Phase)this.engine.getStreamProcessor(1).getCurrentPhase().join())).isEqualTo((Object)StreamProcessor.Phase.PROCESSING));
        Awaitility.await((String)"await that the replay state is equal to the processing state").untilAsserted(() -> {
            Map<ZbColumnFamilies, Map<Object, Object>> replayState = this.engine.collectState();
            SoftAssertions softly = new SoftAssertions();
            processingState.entrySet().stream().filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT).forEach(entry -> {
                ZbColumnFamilies column = (ZbColumnFamilies)entry.getKey();
                Map processingEntries = (Map)entry.getValue();
                Map replayEntries = (Map)replayState.get(column);
                if (processingEntries.isEmpty()) {
                    ((MapAssert)softly.assertThat(replayEntries).describedAs("The state column '%s' should be empty after replay", new Object[]{column})).isEmpty();
                } else {
                    ((MapAssert)softly.assertThat(replayEntries).describedAs("The state column '%s' has different entries after replay", new Object[]{column})).containsExactlyInAnyOrderEntriesOf(processingEntries);
                }
            });
            softly.assertAll();
        });
    }

    private static TestCase testCase(String description) {
        return new TestCase(description);
    }

    private static final class TestCase {
        private final String description;
        private final List<BpmnModelInstance> processes = new ArrayList<BpmnModelInstance>();
        private final List<String> forms = new ArrayList<String>();
        private Function<EngineRule, Record<?>> execution = engine -> (Record)RecordingExporter.records().getFirst();

        private TestCase(String description) {
            this.description = description;
        }

        private TestCase withProcess(BpmnModelInstance process) {
            this.processes.add(process);
            return this;
        }

        public TestCase withForm(String pathToForm) {
            this.forms.add(pathToForm);
            return this;
        }

        private TestCase withExecution(Function<EngineRule, Record<?>> execution) {
            this.execution = execution;
            return this;
        }

        public String toString() {
            return "TestCase{" + this.description + "}";
        }
    }
}

