/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.subprocess;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.ProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValueAssert;
import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.ProcessMetadataValue;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class InterruptingEventSubprocessTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "proc";
    private static final String JOB_TYPE = "type";
    private static String messageName;
    @Rule
    public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();
    @Parameterized.Parameter
    public String testName;
    @Parameterized.Parameter(value=1)
    public Function<StartEventBuilder, StartEventBuilder> builder;
    @Parameterized.Parameter(value=2)
    public Consumer<Long> triggerEventSubprocess;
    private ProcessMetadataValue currentProcess;

    @Parameterized.Parameters(name="{0} event subprocess")
    public static Object[][] parameters() {
        return new Object[][]{{"timer", InterruptingEventSubprocessTest.eventSubprocess(s -> (StartEventBuilder)s.timerWithDuration("PT60S")), InterruptingEventSubprocessTest.eventTrigger(key -> {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)RecordingExporter.timerRecords((TimerIntent)TimerIntent.CREATED).withProcessInstanceKey(key.longValue()).exists()).describedAs("Expected timer to exist", new Object[0])).isTrue();
            ENGINE.increaseTime(Duration.ofSeconds(60L));
        })}, {"message", InterruptingEventSubprocessTest.eventSubprocess(s -> (StartEventBuilder)s.message(b -> b.name(messageName).zeebeCorrelationKeyExpression("key"))), InterruptingEventSubprocessTest.eventTrigger(key -> {
            RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(key.longValue()).withMessageName(messageName).await();
            ENGINE.message().withName(messageName).withCorrelationKey("123").publish();
        })}, {"error", InterruptingEventSubprocessTest.eventSubprocess(s -> (StartEventBuilder)s.error("ERROR")), InterruptingEventSubprocessTest.eventTrigger(key -> ENGINE.job().ofInstance((long)key).withType(JOB_TYPE).withErrorCode("ERROR").throwError())}};
    }

    private static Function<StartEventBuilder, StartEventBuilder> eventSubprocess(Function<StartEventBuilder, StartEventBuilder> consumer) {
        return consumer;
    }

    private static Consumer<Long> eventTrigger(Consumer<Long> eventTrigger) {
        return eventTrigger;
    }

    @Before
    public void init() {
        messageName = this.helper.getMessageName();
    }

    @Test
    public void shouldTriggerEventSubprocess() {
        BpmnModelInstance model = InterruptingEventSubprocessTest.process(InterruptingEventSubprocessTest.withEventSubprocess(this.builder));
        long processInstanceKey = this.createInstanceAndTriggerEvent(model);
        Record startEventActivate = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ACTIVATE_ELEMENT).withElementId("event_sub_start").withElementType(BpmnElementType.START_EVENT).withProcessInstanceKey(processInstanceKey).getFirst();
        Record subProcessActivated = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withElementId("event_sub_proc").withElementType(BpmnElementType.EVENT_SUB_PROCESS).withProcessInstanceKey(processInstanceKey).getFirst();
        ((ProcessInstanceRecordValueAssert)((ProcessInstanceRecordValueAssert)((ProcessInstanceRecordValueAssert)((ProcessInstanceRecordValueAssert)((ProcessInstanceRecordValueAssert)io.camunda.zeebe.protocol.record.Assertions.assertThat((ProcessInstanceRecordValue)((ProcessInstanceRecordValue)startEventActivate.getValue())).hasProcessDefinitionKey(this.currentProcess.getProcessDefinitionKey())).hasProcessInstanceKey(processInstanceKey)).hasBpmnElementType(BpmnElementType.START_EVENT)).hasElementId("event_sub_start")).hasVersion(this.currentProcess.getVersion())).hasFlowScopeKey(subProcessActivated.getKey());
        InterruptingEventSubprocessTest.assertEventSubprocessLifecycle(processInstanceKey);
    }

    @Test
    public void shouldTriggerEventSubprocessAndCreateLocalScopeVariable() {
        BpmnModelInstance model = InterruptingEventSubprocessTest.process(InterruptingEventSubprocessTest.withEventSubprocessAndLocalScopeVariable(this.builder));
        long processInstanceKey = this.createInstanceAndTriggerEvent(model);
        Record subProcessActivated = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withElementId("event_sub_proc").withElementType(BpmnElementType.EVENT_SUB_PROCESS).withProcessInstanceKey(processInstanceKey).getFirst();
        InterruptingEventSubprocessTest.assertEventSubprocessLifecycle(processInstanceKey);
        RecordingExporter.variableRecords().withProcessInstanceKey(processInstanceKey).withName("localScope").withScopeKey(subProcessActivated.getKey()).await();
    }

    @Test
    public void shouldInterruptAndCompleteParent() {
        BpmnModelInstance model = InterruptingEventSubprocessTest.process(InterruptingEventSubprocessTest.withEventSubprocess(this.builder));
        long processInstanceKey = this.createInstanceAndTriggerEvent(model);
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldInterruptExecutionWaitingOnParallelGateway() {
        BpmnModelInstance process = ((ServiceTaskBuilder)((ServiceTaskBuilder)InterruptingEventSubprocessTest.withEventSubprocess(this.builder).startEvent("start_proc").parallelGateway("fork").serviceTask("task-1", t -> t.zeebeJobType("task-1")).sequenceFlowId("task-1-to-join")).parallelGateway("join").moveToNode("fork").serviceTask("task-2", t -> t.zeebeJobType(JOB_TYPE)).sequenceFlowId("task-2-to-join")).connectTo("join").endEvent("end_proc").done();
        long processInstanceKey = this.createInstanceAndWaitForTask(process);
        ENGINE.job().ofInstance(processInstanceKey).withType("task-1").complete();
        RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN).withProcessInstanceKey(processInstanceKey).withElementId("task-1-to-join").await();
        this.triggerEventSubprocess.accept(processInstanceKey);
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldInterruptNestedSubprocess() {
        Consumer<EventSubProcessBuilder> eventSubprocess = eventSubProcess -> this.builder.apply((StartEventBuilder)eventSubProcess.startEvent("event_sub_start").interrupting(true)).endEvent("event_sub_end");
        Consumer<SubProcessBuilder> embeddedSubprocess = subProcess -> subProcess.embeddedSubProcess().eventSubProcess("event_sub_proc", eventSubprocess).startEvent("sub_start").serviceTask("task", t -> t.zeebeJobType(JOB_TYPE)).endEvent("sub_end");
        BpmnModelInstance process = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("proc_start").subProcess("sub_proc", embeddedSubprocess).endEvent("end_proc").done();
        long processInstanceKey = this.createInstanceAndTriggerEvent(process);
        Record subProcess2 = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.COMPLETE_ELEMENT).withProcessInstanceKey(processInstanceKey).withElementId("sub_proc").getFirst();
        Record eventSubproc = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.COMPLETE_ELEMENT).withProcessInstanceKey(processInstanceKey).withElementId("event_sub_proc").getFirst();
        Assertions.assertThat((long)((ProcessInstanceRecordValue)eventSubproc.getValue()).getFlowScopeKey()).isEqualTo(subProcess2.getKey());
        Assertions.assertThat((long)((ProcessInstanceRecordValue)subProcess2.getValue()).getFlowScopeKey()).isEqualTo(processInstanceKey);
        Assertions.assertThat((long)subProcess2.getSourceRecordPosition()).isEqualTo(eventSubproc.getPosition());
        Assertions.assertThat((Object)((Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_TERMINATED).withProcessInstanceKey(processInstanceKey).withElementId("task").getFirst())).isNotNull();
    }

    @Test
    public void shouldHaveScopeVariableIfInterrupting() {
        BpmnModelInstance model = InterruptingEventSubprocessTest.process(InterruptingEventSubprocessTest.withEventSubprocessTask(this.builder, this.helper.getJobType()));
        long processInstanceKey = this.createInstanceAndWaitForTask(model);
        long procTaskKey = ((Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId("task").getFirst()).getKey();
        ENGINE.variables().ofScope(procTaskKey).withDocument(Map.of("y", 2)).withUpdateSemantic(VariableDocumentUpdateSemantic.LOCAL).update();
        this.triggerEventSubprocess.accept(processInstanceKey);
        Assertions.assertThat((boolean)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).withType(this.helper.getJobType()).exists()).isTrue();
        Record<JobBatchRecordValue> job = ENGINE.jobs().withType(this.helper.getJobType()).activate();
        Map jobVariables = ((JobRecordValue)((JobBatchRecordValue)job.getValue()).getJobs().iterator().next()).getVariables();
        Assertions.assertThat((Map)jobVariables).containsOnly(new Map.Entry[]{Map.entry("key", 123)});
    }

    @Test
    public void shouldNotPropagateVariablesToScope() {
        BpmnModelInstance model = InterruptingEventSubprocessTest.process(InterruptingEventSubprocessTest.withEventSubprocessTask(this.builder, this.helper.getJobType()));
        long processInstanceKey = this.createInstanceAndTriggerEvent(model);
        long eventSubprocKey = ((Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.EVENT_SUB_PROCESS).getFirst()).getKey();
        ENGINE.variables().ofScope(eventSubprocKey).withDocument(Map.of("y", 2)).withUpdateSemantic(VariableDocumentUpdateSemantic.LOCAL).update();
        ENGINE.job().ofInstance(processInstanceKey).withType(this.helper.getJobType()).complete();
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getName()).doesNotContain((Object[])new String[]{"y"});
    }

    @Test
    public void shouldCloseEventSubscriptions() {
        ProcessBuilder eventSubprocess = InterruptingEventSubprocessTest.withEventSubprocess(this.builder);
        eventSubprocess.eventSubProcess("message-event-subprocess", s -> ((StartEventBuilder)s.startEvent().message(m -> m.name("other-message").zeebeCorrelationKeyExpression("key"))).endEvent()).eventSubProcess("timer-event-subprocess", s -> ((StartEventBuilder)s.startEvent("other-timer").timerWithDuration("P1D")).endEvent());
        long processInstanceKey = this.createInstanceAndWaitForTask(InterruptingEventSubprocessTest.process(eventSubprocess));
        RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).withMessageName("other-message").await();
        this.triggerEventSubprocess.accept(processInstanceKey);
        ((OptionalAssert)Assertions.assertThat((Optional)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.DELETED).withProcessInstanceKey(processInstanceKey).withMessageName("other-message").findFirst()).describedAs("Expected the message subscription to be deleted", new Object[0])).isPresent();
        ((OptionalAssert)Assertions.assertThat((Optional)RecordingExporter.timerRecords((TimerIntent)TimerIntent.CANCELED).withProcessInstanceKey(processInstanceKey).withHandlerNodeId("other-timer").findFirst()).describedAs("Expected the timer to be canceled", new Object[0])).isPresent();
    }

    private static void assertEventSubprocessLifecycle(long processInstanceKey) {
        List events = ((ProcessInstanceRecordStream)((ProcessInstanceRecordStream)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).onlyEvents()).filter(r -> ((ProcessInstanceRecordValue)r.getValue()).getElementId().startsWith("event_sub_"))).limit(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_COMPLETED && ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType() == BpmnElementType.EVENT_SUB_PROCESS)).asList();
        Assertions.assertThat((List)events).extracting(new Function[]{Record::getIntent, e -> ((ProcessInstanceRecordValue)e.getValue()).getElementId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATING, "event_sub_proc"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATED, "event_sub_proc"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATING, "event_sub_start"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATED, "event_sub_start"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETING, "event_sub_start"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETED, "event_sub_start"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATING, "event_sub_end"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_ACTIVATED, "event_sub_end"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETING, "event_sub_end"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETED, "event_sub_end"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETING, "event_sub_proc"}), Assertions.tuple((Object[])new Object[]{ProcessInstanceIntent.ELEMENT_COMPLETED, "event_sub_proc"})});
    }

    private long createInstanceAndTriggerEvent(BpmnModelInstance model) {
        long processInstanceKey = this.createInstanceAndWaitForTask(model);
        this.triggerEventSubprocess.accept(processInstanceKey);
        return processInstanceKey;
    }

    private long createInstanceAndWaitForTask(BpmnModelInstance model) {
        this.currentProcess = (ProcessMetadataValue)((DeploymentRecordValue)ENGINE.deployment().withXmlResource(model).deploy().getValue()).getProcessesMetadata().get(0);
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariables(Map.of("key", 123)).create();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).exists()).describedAs("Expected job to be created", new Object[0])).isTrue();
        return processInstanceKey;
    }

    private static BpmnModelInstance process(ProcessBuilder processBuilder) {
        return processBuilder.startEvent("start_proc").serviceTask("task", t -> t.zeebeJobType(JOB_TYPE)).endEvent("end_proc").done();
    }

    private static ProcessBuilder withEventSubprocess(Function<StartEventBuilder, StartEventBuilder> builder) {
        ProcessBuilder process = Bpmn.createExecutableProcess((String)PROCESS_ID);
        builder.apply((StartEventBuilder)process.eventSubProcess("event_sub_proc").startEvent("event_sub_start").interrupting(true)).endEvent("event_sub_end");
        return process;
    }

    private static ProcessBuilder withEventSubprocessAndLocalScopeVariable(Function<StartEventBuilder, StartEventBuilder> builder) {
        ProcessBuilder process = Bpmn.createExecutableProcess((String)PROCESS_ID);
        builder.apply((StartEventBuilder)((EventSubProcessBuilder)process.eventSubProcess("event_sub_proc").zeebeInputExpression("=null", "localScope")).startEvent("event_sub_start").interrupting(true)).endEvent("event_sub_end");
        return process;
    }

    private static ProcessBuilder withEventSubprocessTask(Function<StartEventBuilder, StartEventBuilder> builder, String jobType) {
        ProcessBuilder process = Bpmn.createExecutableProcess((String)PROCESS_ID);
        builder.apply((StartEventBuilder)process.eventSubProcess("event_sub_proc").startEvent("event_sub_start").interrupting(true)).serviceTask("event_sub_task", t -> t.zeebeJobType(jobType)).endEvent("event_sub_end");
        return process;
    }
}

