/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.compensation;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.AbstractThrowEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.CallActivityBuilder;
import io.camunda.zeebe.model.bpmn.builder.CompensateEventDefinitionBuilder;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.ExclusiveGatewayBuilder;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.model.bpmn.instance.EndEvent;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.CompensationSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.BpmnEventType;
import io.camunda.zeebe.protocol.record.value.CompensationSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class CompensationEventExecutionTest {
    // Engine rule created via EngineRule.singlePartition(); static @ClassRule, so the same
    // instance is used by every test method in this class.
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    // BPMN process id passed to ofBpmnProcessId(...) when creating process instances.
    private static final String PROCESS_ID = "compensation-process";
    // NOTE(review): not referenced in the visible part of this file — presumably the id of a
    // called child process used by tests further down; confirm.
    private static final String CHILD_PROCESS_ID = "child-process";
    // Job types used with ENGINE.job().withType(...) to complete the compensable activities.
    private static final String SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY = "compensableActivity";
    private static final String SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2 = "compensableActivity2";
    // Job types used with ENGINE.job().withType(...) to complete the compensation handlers.
    private static final String SERVICE_TASK_TYPE_COMPENSATION_HANDLER = "compensationHandler";
    private static final String SERVICE_TASK_TYPE_COMPENSATION_HANDLER2 = "compensationHandler2";
    // Per-test JUnit rule. NOTE(review): presumably resets/reports the recorded records around
    // each test — confirm against RecordingExporterTestWatcher.
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    /**
     * A process built inline as start event, user task, compensation intermediate throw event
     * and end event (no compensation handler is modelled) must run to completion once the user
     * task job is completed, passing through the throw event with event type COMPENSATION.
     */
    @Test
    public void shouldExecuteAProcessWithCompensationIntermediateEvent() {
        // given
        final BpmnModelInstance process = Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .userTask()
            .intermediateThrowEvent("compensation-event", i -> i.compensateEventDefinition().compensateEventDefinitionDone())
            .endEvent()
            .done();
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                Record::getIntent,
                r -> r.getValue().getBpmnEventType())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.USER_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.UNSPECIFIED),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED, BpmnEventType.COMPENSATION),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.COMPENSATION),
                Assertions.tuple(BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.NONE),
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.UNSPECIFIED));
    }

    /**
     * A process built inline as start event, user task and compensation end event (no
     * compensation handler is modelled) must run to completion once the user task job is
     * completed, with the end event activated and completed with event type COMPENSATION.
     */
    @Test
    public void shouldExecuteAProcessWithCompensationEndEvent() {
        // given
        final BpmnModelInstance process = Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .userTask()
            .endEvent("compensation-event", e -> e.compensateEventDefinition().compensateEventDefinitionDone())
            .done();
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                Record::getIntent,
                r -> r.getValue().getBpmnEventType())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.USER_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.UNSPECIFIED),
                Assertions.tuple(BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED, BpmnEventType.COMPENSATION),
                Assertions.tuple(BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.COMPENSATION),
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED, BpmnEventType.UNSPECIFIED));
    }

    /**
     * Using /compensation/compensation-throw-event.bpmn: after the compensable activity's job
     * and the compensation handler's job are completed, the first three compensation
     * subscription records must all describe the same subscription (activity
     * "ActivityToCompensate", handler "CompensationHandler") and must go through the intents
     * CREATED, TRIGGERED, COMPLETED, with the throw event and handler instance keys filled in
     * from TRIGGERED onwards.
     */
    @Test
    public void shouldCreateAndUpdateCompensationSubscriptionForCompletedTask() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then: look up the activation records whose keys the subscription is expected to reference
        final Record<ProcessInstanceRecordValue> compensationActivityActivated =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId("ActivityToCompensate")
                .getFirst();
        final Record<ProcessInstanceRecordValue> compensationThrowEventActivated =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementType(BpmnElementType.INTERMEDIATE_THROW_EVENT)
                .withEventType(BpmnEventType.COMPENSATION)
                .getFirst();
        final Record<ProcessInstanceRecordValue> compensationHandlerActivated =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId("CompensationHandler")
                .getFirst();

        // the static properties are identical on all three subscription records
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(Record::getValue)
            .extracting(
                CompensationSubscriptionRecordValue::getTenantId,
                CompensationSubscriptionRecordValue::getProcessInstanceKey,
                CompensationSubscriptionRecordValue::getProcessDefinitionKey,
                CompensationSubscriptionRecordValue::getCompensableActivityId,
                CompensationSubscriptionRecordValue::getCompensableActivityInstanceKey,
                CompensationSubscriptionRecordValue::getCompensableActivityScopeKey,
                CompensationSubscriptionRecordValue::getCompensationHandlerId)
            .containsOnly(
                Assertions.tuple(
                    compensationActivityActivated.getValue().getTenantId(),
                    processInstanceKey,
                    compensationActivityActivated.getValue().getProcessDefinitionKey(),
                    "ActivityToCompensate",
                    compensationActivityActivated.getKey(),
                    compensationActivityActivated.getValue().getFlowScopeKey(),
                    "CompensationHandler"));

        // the dynamic properties change with the intent; before triggering they are "" / -1
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getThrowEventId(),
                r -> r.getValue().getThrowEventInstanceKey(),
                r -> r.getValue().getCompensationHandlerInstanceKey())
            .containsSequence(
                Assertions.tuple(CompensationSubscriptionIntent.CREATED, "", -1L, -1L),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationThrowEvent", compensationThrowEventActivated.getKey(), compensationHandlerActivated.getKey()),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationThrowEvent", compensationThrowEventActivated.getKey(), compensationHandlerActivated.getKey()));
    }

    /**
     * Using /compensation/compensation-throw-event.bpmn: once the compensable activity and the
     * compensation handler jobs are completed, the process instance records must show the throw
     * event being activated, the compensation boundary event going through its full lifecycle,
     * the handler being activated and completed, and only then the throw event and the process
     * completing.
     */
    @Test
    public void shouldActivateAndCompleteCompensationHandlerForIntermediateThrowEvent() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent,
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, "ActivityToCompensate"),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATING, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETING, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, PROCESS_ID));
    }

    /**
     * Using /compensation/compensation-end-event.bpmn: once the compensable activity and the
     * compensation handler jobs are completed, the process instance records must show the
     * compensation end event being activated, the compensation boundary event going through its
     * full lifecycle, the handler being activated and completed, and only then the end event and
     * the process completing.
     */
    @Test
    public void shouldActivateAndCompleteCompensationHandlerForEndEvent() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-end-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent,
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, "ActivityToCompensate"),
                Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationEndEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATING, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETING, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.BOUNDARY_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationBoundaryEvent"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationEndEvent"),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, PROCESS_ID));
    }

    /**
     * Using /compensation/multiple-compensation-throw-event.bpmn: after both compensable
     * activities and both compensation handlers have their jobs completed, the throw event must
     * complete only after both handlers completed, and a COMPLETED subscription record must exist
     * for each handler among the first six compensation subscription records.
     */
    @Test
    public void shouldActivateAndCompleteMultipleCompensationHandlerForThrowEvent() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/multiple-compensation-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then: element lifecycle, in relative order
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent,
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler2"),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, PROCESS_ID));

        // and: both subscriptions are completed
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(6L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"));
    }

    /**
     * Using /compensation/multiple-compensation-end-event.bpmn: after both compensable
     * activities and both compensation handlers have their jobs completed, the compensation end
     * event must complete only after both handlers completed, followed by the process.
     */
    @Test
    public void shouldActivateAndCompleteMultipleCompensationHandlerForEndEvent() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/multiple-compensation-end-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent,
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationEndEvent"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationHandler2"),
                Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED, "CompensationEndEvent"),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED, PROCESS_ID));
    }

    /**
     * Using /compensation/compensation-throw-event.bpmn: the process instance is cancelled right
     * after the compensable activity's job is completed (the handler job is never completed).
     * The throw event, the compensation handler and the process must all end up TERMINATED, and
     * the subscription must go from TRIGGERED to DELETED.
     */
    @Test
    public void shouldTerminateCompensationHandler() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();

        // when
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();

        // then: element lifecycle, in relative order
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceTerminated())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent,
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_TERMINATED, "CompensationThrowEvent"),
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_TERMINATED, "CompensationHandler"),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_TERMINATED, PROCESS_ID));

        // and: the triggered subscription is deleted
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));
    }

    /**
     * Using /compensation/compensation-no-throw-event.bpmn: after the user task job is
     * completed, a DELETED compensation subscription record must be present among the first two
     * compensation subscription records of the instance.
     */
    @Test
    public void shouldDeleteAllSubscriptionsWhenProcessIsCompletedWithoutTriggerCompensationHandler() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-no-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(2L))
            .extracting(Record::getValueType, Record::getIntent)
            .contains(
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED));
    }

    /**
     * Using /compensation/compensation-no-throw-event-terminate.bpmn: after the user task job is
     * completed and the process instance is cancelled, a DELETED compensation subscription
     * record must be present among the first two compensation subscription records of the
     * instance.
     */
    @Test
    public void shouldDeleteAllSubscriptionsWhenProcessIsTerminatedWithoutTriggerCompensationHandler() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-no-throw-event-terminate.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // when
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(2L))
            .extracting(Record::getValueType, Record::getIntent)
            .contains(
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED));
    }

    /**
     * Using /compensation/compensation-embedded-subprocess.bpmn: after both compensable
     * activities and both compensation handlers have their jobs completed, TRIGGERED
     * subscription records for "CompensationHandler" and "CompensationHandler2" must be present
     * among the first eight compensation subscription records.
     */
    @Test
    public void shouldTriggerCompensationHandlerInSubprocesses() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-embedded-subprocess.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(8L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .contains(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler2"));
    }

    /**
     * Using /compensation/compensation-single-embedded-subprocess.bpmn: after the jobs of type
     * "compensableActivity", "completableActivity" and "NotActivableTask" are completed, DELETED
     * subscription records for the compensable activities "embedded-subprocess" and
     * "ActivityToCompensate" must be present among the first four compensation subscription
     * records.
     */
    @Test
    public void shouldNotTriggerCompensationIfSubprocessIsNotCompleted() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-single-embedded-subprocess.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("completableActivity").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("NotActivableTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(4L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensableActivityId())
            .contains(
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "embedded-subprocess"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "ActivityToCompensate"));
    }

    /**
     * Using /compensation/compensation-embedded-subprocess-parent.bpmn: both compensable
     * activities are completed but only the job of "compensationHandler2" is completed. Among
     * the first five compensation subscription records, the subscription for
     * "CompensationHandler2" must be COMPLETED and, later, the one for "CompensationHandler"
     * must be DELETED.
     */
    @Test
    public void shouldNotTriggerCompensationOnParentScope() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-embedded-subprocess-parent.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(5L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));
    }

    /**
     * Using /compensation/subprocess-after-compensation-activity.bpmn: after the jobs of type
     * "compensableActivity", "B" and "compensationHandler" are completed, the process instance
     * must complete, with a compensation service task and the compensation throw event completed
     * before the process.
     */
    @Test
    public void shouldNotCreateSubprocessSubscriptionWithoutChildSubscription() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/subprocess-after-compensation-activity.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then: the expected records appear in this relative order (other records may interleave)
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(BpmnElementType.INTERMEDIATE_THROW_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Using /compensation/compensation-multi-instance.bpmn: three "compensableActivity" jobs and
     * then three "compensationHandler" jobs are completed via {@code completeJobs}. Among the
     * first twenty compensation subscription records there must be three TRIGGERED followed by
     * three COMPLETED records for "CompensationHandler", and the process instance must complete.
     */
    @Test
    public void shouldInvokeCompensationHandlerTheSameAmountAsExecuted() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-multi-instance.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        this.completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        this.completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSATION_HANDLER, 3);

        // then: one trigger and one completion per executed activity instance
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(20L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        // and: the process instance completes
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Using /compensation/compensation-throw-error.bpmn: three "compensableActivity" jobs are
     * completed, then the jobs of type "activity" are activated in one batch; the second job of
     * that batch throws an error while the others are completed. After the
     * "compensationHandler" job is completed, a TRIGGERED followed by a COMPLETED subscription
     * record for "CompensationHandler" must be present among the first eight compensation
     * subscription records, and the process instance must complete.
     */
    @Test
    public void shouldTriggerOnlyCorrectHandlerForMultiInstance() {
        // given
        final BpmnModelInstance process = this.createModelFromClasspathResource("/compensation/compensation-throw-error.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        this.completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);

        // when: 1-based position within the activated batch; only the second job throws an error
        final AtomicInteger jobCount = new AtomicInteger(1);
        ENGINE.jobs().withType("activity").activate().getValue().getJobKeys().forEach(jobKey -> {
            if (jobCount.get() == 2) {
                ENGINE.job().withKey(jobKey).throwError();
            } else {
                ENGINE.job().withKey(jobKey).complete();
            }
            jobCount.getAndIncrement();
        });
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(8L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        // and: the process instance completes
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-throw-event.bpmn}, completes one compensable
     * activity job and one compensation handler job, and asserts that the process element is
     * completed. It then checks that the first six compensation subscription records contain a
     * {@code TRIGGERED} record with an empty handler id followed by a {@code COMPLETED} record for
     * {@code "CompensationHandler"}.
     */
    @Test
    public void shouldTriggerCompensationHandlerOnlyOnce() {
        BpmnModelInstance process =
            createModelFromClasspathResource("/compensation/compensation-multi-throw-event.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY)
            .complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER)
            .complete();

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getBpmnElementType(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));

        Assertions.assertThat((Stream) RecordingExporter.compensationSubscriptionRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limit(6L))
            .extracting(new Function[] {
                Record::getIntent,
                rec -> ((CompensationSubscriptionRecordValue) rec.getValue()).getCompensationHandlerId()
            })
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, ""),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));
    }

    /**
     * Deploys {@code /compensation/compensation-subprocess-multi-handler.bpmn}, completes both
     * compensable activity jobs and both compensation handler jobs, and asserts that the first nine
     * compensation subscription records contain {@code COMPLETED} records for
     * {@code "CompensationHandler"}, {@code "CompensationHandler2"} and an empty handler id, in that
     * order. Finally asserts that the process element is completed.
     */
    @Test
    public void shouldCompleteSubprocessAfterAllCompensationHandlerAreCompleted() {
        BpmnModelInstance process =
            createModelFromClasspathResource("/compensation/compensation-subprocess-multi-handler.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // Activities first, then their handlers, in the same order.
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY)
            .complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2)
            .complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER)
            .complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2)
            .complete();

        Assertions.assertThat((Stream) RecordingExporter.compensationSubscriptionRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limit(9L))
            .extracting(new Function[] {
                Record::getIntent,
                rec -> ((CompensationSubscriptionRecordValue) rec.getValue()).getCompensationHandlerId()
            })
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, ""));

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getBpmnElementType(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-subprocess.bpmn}, completes the compensable
     * activity job followed by the jobs of type {@code "activity2"} and {@code "activity"}, and
     * asserts that the first six compensation subscription records contain a {@code CREATED} record
     * followed by a {@code DELETED} record for {@code "CompensationHandler"}. Finally asserts that
     * the process element is completed.
     */
    @Test
    public void shouldNotTriggerCompensationHandlerIfTheParentSubprocessIsNotCompleted() {
        BpmnModelInstance process =
            createModelFromClasspathResource("/compensation/compensation-multi-subprocess.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY)
            .complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("activity2").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("activity").complete();

        Assertions.assertThat((Stream) RecordingExporter.compensationSubscriptionRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limit(6L))
            .extracting(new Function[] {
                Record::getIntent,
                rec -> ((CompensationSubscriptionRecordValue) rec.getValue()).getCompensationHandlerId()
            })
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.CREATED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getBpmnElementType(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Builds a process with a parallel fork: one branch runs {@code subprocess-1}, which contains
     * the multi-instance {@code subprocess-2} (input collection {@code [1,2,3]}) with service task
     * {@code A} and its compensation handler {@code Undo-A}, followed by service task {@code B};
     * the other branch runs service task {@code C} followed by a compensation throw event.
     *
     * <p>Job {@code C} is completed before job {@code B}. The test asserts the expected ordering of
     * {@code B} and the throw event, that the process completes, and that {@code Undo-A} is never
     * activated.
     */
    @Test
    public void shouldNotTriggerHandlersForMultiInstanceInsideNotCompletedSubprocess() {
        Consumer<SubProcessBuilder> subprocessLevel2 = sub ->
            ((SubProcessBuilder) sub.multiInstance(
                    multi -> multi.zeebeInputCollectionExpression("[1,2,3]")))
                .embeddedSubProcess()
                .startEvent()
                .serviceTask("A", taskBuilder -> taskBuilder.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A"));

        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .subProcess("subprocess-1", sub -> sub
                .embeddedSubProcess()
                .startEvent()
                .subProcess("subprocess-2", subprocessLevel2)
                .serviceTask("B", taskBuilder -> taskBuilder.zeebeJobType("B"))
                .endEvent())
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("C", taskBuilder -> taskBuilder.zeebeJobType("C"))
            .intermediateThrowEvent("compensation-throw-event")
            .compensateEventDefinition()
            .compensateEventDefinitionDone()
            .connectTo("join")
            .endEvent()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        completeJobs(processInstanceKey, "A", 3);
        // C (and with it the throw event) runs while B is still active.
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
                .extracting(new Function[] {
                    rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                    Record::getIntent
                })
                .containsSubsequence(
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process with a parallel fork: one branch runs the multi-instance {@code subprocess}
     * (input collection {@code [1,2,3]}) containing {@code subprocess-2} (service task {@code A}
     * with compensation handler {@code Undo-A}) followed by service task {@code B}; the other branch
     * runs service task {@code C} followed by a compensation throw event.
     *
     * <p>Only the first created {@code B} job is completed before {@code C}; the remaining
     * {@code B} jobs are completed afterwards. The test asserts the expected record ordering, that
     * the process completes, and that {@code Undo-A} is never activated.
     */
    @Test
    public void shouldNotTriggerHandlersForSubprocessIfParentMultiInstancesSubprocessesAreNotCompleted() {
        Consumer<SubProcessBuilder> subprocessLevel2 = sub -> sub
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", taskBuilder -> taskBuilder.zeebeJobType("A"))
            .boundaryEvent()
            .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A"));

        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .subProcess("subprocess", sub ->
                ((SubProcessBuilder) sub.multiInstance(
                        multi -> multi.zeebeInputCollectionExpression("[1,2,3]")))
                    .embeddedSubProcess()
                    .startEvent()
                    .subProcess("subprocess-2", subprocessLevel2)
                    .serviceTask("B", taskBuilder -> taskBuilder.zeebeJobType("B")))
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("C", taskBuilder -> taskBuilder.zeebeJobType("C"))
            .intermediateThrowEvent("compensation-throw-event")
            .compensateEventDefinition()
            .compensateEventDefinitionDone()
            .connectTo("join")
            .endEvent()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        completeJobs(processInstanceKey, "A", 3);

        List<Long> taskBJobKeys = ((JobRecordStream) RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType("B")
                .limit(3L))
            .map(Record::getKey)
            .toList();

        // Complete one B, then C (which reaches the throw event), then every remaining B.
        ENGINE.job().withKey(taskBJobKeys.getFirst()).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        for (int index = 1; index < taskBJobKeys.size(); index++) {
            ENGINE.job().withKey(taskBJobKeys.get(index)).complete();
        }

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
                .extracting(new Function[] {
                    rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                    Record::getIntent
                })
                .containsSubsequence(
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process whose multi-instance {@code subprocess} (input collection {@code [1,2,3]})
     * runs service task {@code A} with compensation handler {@code Undo-A}, then a compensation
     * throw event, then an end event that maps {@code loopCounter} to {@code completed}.
     *
     * <p>The three {@code Undo-A} jobs are completed in the order second, third, first. The test
     * asserts that each {@code Undo-A} completion is followed by a throw-event completion before
     * the process completes, and that the {@code completed} variable takes the values
     * {@code 2, 3, 1} in that order.
     */
    @Test
    public void shouldCompleteThrowEventThatTriggeredCompensationHandler() {
        Consumer<SubProcessBuilder> subprocessBuilder = sub ->
            ((SubProcessBuilder) sub.multiInstance(
                    multi -> multi.zeebeInputCollectionExpression("[1,2,3]")))
                .embeddedSubProcess()
                .startEvent()
                .serviceTask("A", taskBuilder ->
                    ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                        .boundaryEvent()
                        .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .intermediateThrowEvent(
                    "compensation-throw-event", AbstractThrowEventBuilder::compensateEventDefinition)
                .endEvent()
                .zeebeOutputExpression("loopCounter", "completed");

        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .subProcess("subprocess", subprocessBuilder)
            .endEvent()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        completeJobs(processInstanceKey, "A", 3);

        List<Long> undoJobKeys = ((JobRecordStream) RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType("Undo-A")
                .limit(3L))
            .map(Record::getKey)
            .toList();

        // Deliberately out of creation order: second, third, then first handler job.
        for (int index : new int[] {1, 2, 0}) {
            ENGINE.job().withKey(undoJobKeys.get(index)).complete();
        }

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED));

        Assertions.assertThat((Stream) RecordingExporter.records()
                .limitToProcessInstance(processInstanceKey)
                .variableRecords())
            .extracting(Record::getValue)
            .extracting(new Function[] {VariableRecordValue::getName, VariableRecordValue::getValue})
            .containsSubsequence(
                Assertions.tuple("loopCounter", "1"),
                Assertions.tuple("loopCounter", "2"),
                Assertions.tuple("loopCounter", "3"),
                Assertions.tuple("completed", "2"),
                Assertions.tuple("completed", "3"),
                Assertions.tuple("completed", "1"));
    }

    /**
     * Builds a process with service task {@code A}, whose compensation handler {@code Undo-A} has
     * the input mapping {@code x + 1 -> y}, followed by a compensation end event. The instance is
     * created with {@code x = 1}.
     *
     * <p>Asserts that a variable is created between the handler's activating and activated records,
     * and that variable {@code y} has the value {@code "2"} in the scope of the {@code Undo-A}
     * element instance.
     */
    @Test
    public void shouldApplyInputMappingsOfCompensationHandler() {
        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler ->
                        ((ServiceTaskBuilder) handler.serviceTask("Undo-A").zeebeJobType("Undo-A"))
                            .zeebeInputExpression("x + 1", "y")))
            .endEvent()
            .compensateEventDefinition()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("x", 1)
            .create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        Assertions.assertThat((Stream) RecordingExporter.records()
                .limitToProcessInstance(processInstanceKey))
            .extracting(new Function[] {Record::getValueType, Record::getIntent})
            .containsSubsequence(
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED),
                Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(ValueType.VARIABLE, VariableIntent.CREATED),
                Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATED));

        Record handlerActivated = (Record) RecordingExporter
            .processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .withProcessInstanceKey(processInstanceKey)
            .withElementId("Undo-A")
            .getFirst();
        long compensationHandlerInstanceKey = handlerActivated.getKey();

        Record variableCreated = (Record) RecordingExporter
            .variableRecords(VariableIntent.CREATED)
            .withProcessInstanceKey(processInstanceKey)
            .withName("y")
            .getFirst();

        // The mapped variable must live in the handler's own scope.
        io.camunda.zeebe.protocol.record.Assertions
            .assertThat((VariableRecordValue) variableCreated.getValue())
            .hasScopeKey(compensationHandlerInstanceKey)
            .hasValue("2");
    }

    /**
     * Builds a process with service task {@code A}, whose compensation handler {@code Undo-A} has
     * the output mapping {@code x + 1 -> y}, followed by a compensation end event. The
     * {@code Undo-A} job is completed with {@code x = 1}.
     *
     * <p>Asserts that a variable is created between the completing and completed records before the
     * subscription is completed, and that variable {@code y} has the value {@code "2"} with the
     * process instance key as its scope key.
     */
    @Test
    public void shouldApplyOutputMappingsOfCompensationHandler() {
        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler ->
                        ((ServiceTaskBuilder) handler.serviceTask("Undo-A").zeebeJobType("Undo-A"))
                            .zeebeOutputExpression("x + 1", "y")))
            .endEvent()
            .compensateEventDefinition()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType("Undo-A")
            .withVariable("x", 1)
            .complete();

        Assertions.assertThat((Stream) RecordingExporter.records()
                .limitToProcessInstance(processInstanceKey))
            .extracting(new Function[] {Record::getValueType, Record::getIntent})
            .containsSubsequence(
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED),
                Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETING),
                Assertions.tuple(ValueType.VARIABLE, VariableIntent.CREATED),
                Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.COMPLETED));

        Record variableCreated = (Record) RecordingExporter
            .variableRecords(VariableIntent.CREATED)
            .withProcessInstanceKey(processInstanceKey)
            .withName("y")
            .getFirst();

        io.camunda.zeebe.protocol.record.Assertions
            .assertThat((VariableRecordValue) variableCreated.getValue())
            .hasScopeKey(processInstanceKey)
            .hasValue("2");
    }

    /**
     * Builds a process with an embedded {@code subprocess} that has the input mapping
     * {@code 0 -> local} and contains service task {@code A} with compensation handler
     * {@code Undo-A}, ending in a compensation end event. The {@code Undo-A} job is completed with
     * the variables {@code local = 1} and {@code global = 2}.
     *
     * <p>Asserts that the variable records are exactly: {@code local = "0"} and
     * {@code local = "1"} in the subprocess scope, then {@code global = "2"} with the process
     * instance key as scope key.
     */
    @Test
    public void shouldPropagateVariablesOfCompensationHandler() {
        SubProcessBuilder subprocessScope = (SubProcessBuilder) Bpmn
            .createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .subProcess("subprocess")
            .zeebeInputExpression("0", "local");

        BpmnModelInstance process = subprocessScope
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .endEvent()
            .compensateEventDefinition()
            .subProcessDone()
            .endEvent()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType("Undo-A")
            .withVariables(Map.ofEntries(Map.entry("local", 1), Map.entry("global", 2)))
            .complete();

        Record subprocessActivated = (Record) RecordingExporter
            .processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .withProcessInstanceKey(processInstanceKey)
            .withElementId("subprocess")
            .getFirst();
        long subprocessInstanceKey = subprocessActivated.getKey();

        Assertions.assertThat((Stream) RecordingExporter.records()
                .limitToProcessInstance(processInstanceKey)
                .variableRecords())
            .extracting(Record::getValue)
            .extracting(new Function[] {
                VariableRecordValue::getScopeKey,
                VariableRecordValue::getName,
                VariableRecordValue::getValue
            })
            .containsExactly(
                Assertions.tuple(subprocessInstanceKey, "local", "0"),
                Assertions.tuple(subprocessInstanceKey, "local", "1"),
                Assertions.tuple(processInstanceKey, "global", "2"));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity.bpmn}, invokes the
     * {@code completeJobs} helper for the compensable activity (count 3) and completes one
     * compensation handler job. Asserts that the first three compensation subscription records
     * contain a {@code TRIGGERED} record followed by a {@code COMPLETED} record for
     * {@code "CompensationHandler"}, and that the process element is completed.
     */
    @Test
    public void shouldTriggerCompensationForMultiInstanceActivityOnlyOnce() {
        BpmnModelInstance process =
            createModelFromClasspathResource("/compensation/compensation-multi-instance-activity.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER)
            .complete();

        Assertions.assertThat((Stream) RecordingExporter.compensationSubscriptionRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limit(3L))
            .extracting(new Function[] {
                Record::getIntent,
                rec -> ((CompensationSubscriptionRecordValue) rec.getValue()).getCompensationHandlerId()
            })
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getBpmnElementType(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity-parallel.bpmn}, invokes the
     * {@code completeJobs} helper for the compensable activity (count 3) and for the job type
     * {@code "activity"} (count 1), then completes one compensation handler job. Asserts that the
     * first three compensation subscription records contain {@code CREATED}, {@code TRIGGERED} and
     * {@code COMPLETED} for {@code "CompensationHandler"} in that order, and that the process
     * element is completed.
     */
    @Test
    public void shouldTriggerCompensationAfterAllMultiInstanceActivitiesAreCompleted() {
        BpmnModelInstance process = createModelFromClasspathResource(
            "/compensation/compensation-multi-instance-activity-parallel.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        completeJobs(processInstanceKey, "activity", 1);
        ENGINE.job()
            .ofInstance(processInstanceKey)
            .withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER)
            .complete();

        Assertions.assertThat((Stream) RecordingExporter.compensationSubscriptionRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limit(3L))
            .extracting(new Function[] {
                Record::getIntent,
                rec -> ((CompensationSubscriptionRecordValue) rec.getValue()).getCompensationHandlerId()
            })
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.CREATED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getBpmnElementType(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity-parallel.bpmn} and collects
     * the keys of the first three created compensable-activity jobs. Only the first of them is
     * completed before the {@code completeJobs} helper is invoked for the job type
     * {@code "activity"} (count 1); the remaining ones are completed afterwards.
     *
     * <p>Asserts that the records of the process instance contain a compensation subscription
     * {@code CREATED}, then {@code DELETED}, then a process instance {@code ELEMENT_COMPLETED}, and
     * that no compensation subscription {@code TRIGGERED} record exists.
     */
    @Test
    public void shouldNotTriggerCompensationIfMultiInstanceActivitiesAreNotCompleted() {
        BpmnModelInstance process = createModelFromClasspathResource(
            "/compensation/compensation-multi-instance-activity-parallel.bpmn");
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        List<Long> activityJobKeys = ((JobRecordStream) RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY)
                .limit(3L))
            .map(Record::getKey)
            .toList();

        // One compensable job first, then "activity", then the rest of the compensable jobs.
        ENGINE.job().withKey(activityJobKeys.getFirst()).complete();
        completeJobs(processInstanceKey, "activity", 1);
        for (int index = 1; index < activityJobKeys.size(); index++) {
            ENGINE.job().withKey(activityJobKeys.get(index)).complete();
        }

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.records()
                    .limitToProcessInstance(processInstanceKey))
                .extracting(new Function[] {Record::getValueType, Record::getIntent})
                .containsSubsequence(
                    Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.CREATED),
                    Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED),
                    Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .doesNotContain(
                Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED));
    }

    /**
     * Builds a process that forks into service tasks {@code A} and {@code B} (each with its own
     * compensation handler, {@code Undo-A} and {@code Undo-B}), joins them, and then reaches an
     * intermediate compensation throw event whose {@code activityRef} is {@code A}.
     *
     * <p>After completing the jobs {@code A}, {@code B} and {@code Undo-A}, asserts that
     * {@code Undo-A} completes between activation and completion of the throw event, that the
     * process completes, and that {@code Undo-B} is never activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityOnIntermediateThrowEvent() {
        CompensateEventDefinitionBuilder compensateDefinition = (CompensateEventDefinitionBuilder) Bpmn
            .createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("B", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("B"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
            .connectTo("join")
            .intermediateThrowEvent("compensation-throw-event")
            .compensateEventDefinition()
            .activityRef("A");
        BpmnModelInstance process = compensateDefinition.done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
                .extracting(new Function[] {
                    rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                    Record::getIntent
                })
                .containsSubsequence(
                    Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process that forks into service tasks {@code A} and {@code B} (each with its own
     * compensation handler, {@code Undo-A} and {@code Undo-B}), joins them, and then reaches a
     * compensation end event whose {@code activityRef} is {@code A}.
     *
     * <p>After completing the jobs {@code A}, {@code B} and {@code Undo-A}, asserts that
     * {@code Undo-A} completes between activation and completion of the end event, that the
     * process completes, and that {@code Undo-B} is never activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityOnEndEvent() {
        CompensateEventDefinitionBuilder compensateDefinition = (CompensateEventDefinitionBuilder) Bpmn
            .createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("B", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("B"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
            .connectTo("join")
            .endEvent("compensation-throw-event")
            .compensateEventDefinition()
            .activityRef("A");
        BpmnModelInstance process = compensateDefinition.done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
                .extracting(new Function[] {
                    rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                    Record::getIntent
                })
                .containsSubsequence(
                    Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process in which the start event sets {@code iteration} to {@code 0}, service task
     * {@code A} (with compensation handler {@code Undo-A}) sets it to {@code iteration + 1}, and
     * the exclusive gateway {@code loop} has a default flow back to {@code A} and a flow with the
     * condition {@code iteration > 1} to a compensation throw event referencing {@code A}.
     *
     * <p>Invokes the {@code completeJobs} helper for {@code A} and {@code Undo-A} with a count of 2
     * each, and asserts that two {@code A} completions are followed by two {@code Undo-A}
     * completions between activation and completion of the throw event, before the process
     * completes.
     */
    @Test
    public void shouldTriggerCompensationForActivityTheSameAmountAsExecuted() {
        StartEventBuilder start = (StartEventBuilder) Bpmn
            .createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .zeebeOutputExpression("0", "iteration");

        ExclusiveGatewayBuilder loopGateway = (ExclusiveGatewayBuilder) start
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                        .zeebeOutputExpression("iteration + 1", "iteration"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .exclusiveGateway("loop")
            .defaultFlow();

        BpmnModelInstance process = loopGateway
            .connectTo("A")
            .moveToNode("loop")
            .conditionExpression("iteration > 1")
            .intermediateThrowEvent(
                "compensation-throw-event",
                thrower -> thrower.compensateEventDefinition().activityRef("A"))
            .endEvent()
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        completeJobs(processInstanceKey, "A", 2);
        completeJobs(processInstanceKey, "Undo-A", 2);

        Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(processInstanceKey)
                .limitToProcessInstanceCompleted())
            .extracting(new Function[] {
                rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                Record::getIntent
            })
            .containsSubsequence(
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Builds a process with service task {@code A} (compensation handler {@code Undo-A}) followed
     * by a parallel gateway that leads to two intermediate compensation throw events,
     * {@code compensation-throw-event-1} and {@code compensation-throw-event-2}, both referencing
     * activity {@code A}.
     *
     * <p>After completing one {@code A} job and one {@code Undo-A} job, asserts the expected
     * ordering of the throw-event and handler records up to process completion, and that
     * {@code Undo-A} is activated exactly once.
     */
    @Test
    public void shouldNotTriggerCompensationForActivityAgain() {
        BpmnModelInstance process = Bpmn.createExecutableProcess((String) PROCESS_ID)
            .startEvent()
            .serviceTask("A", taskBuilder ->
                ((ServiceTaskBuilder) taskBuilder.zeebeJobType("A"))
                    .boundaryEvent()
                    .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .parallelGateway("fork")
            .intermediateThrowEvent(
                "compensation-throw-event-1",
                thrower -> thrower.compensateEventDefinition().activityRef("A"))
            .moveToNode("fork")
            .intermediateThrowEvent(
                "compensation-throw-event-2",
                thrower -> thrower.compensateEventDefinition().activityRef("A"))
            .done();

        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        ((AbstractListAssert) Assertions.assertThat((Stream) RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
                .extracting(new Function[] {
                    rec -> ((ProcessInstanceRecordValue) rec.getValue()).getElementId(),
                    Record::getIntent
                })
                .containsSubsequence(
                    Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                    Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_COMPLETED),
                    Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)))
            .containsOnlyOnce(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Deploys a process that forks into service task "A" (with compensation handler "Undo-A") and,
     * on the other branch, service task "B" followed by a compensation throw event referencing "A".
     * Completes the "B" job before the "A" job, then asserts that the throw event is activated and
     * completed between the activation and completion of "A", and that no "Undo-A" activation is
     * recorded.
     */
    @Test
    public void shouldNotTriggerCompensationForActivityIfActive() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .moveToNode("fork")
            .serviceTask("B", task -> task.zeebeJobType("B"))
            .intermediateThrowEvent("compensation-throw-event", event -> event.compensateEventDefinition().activityRef("A"))
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] handlerActivation = {
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        elementIntents.doesNotContain((Object[])handlerActivation);
    }

    /**
     * Deploys a process that forks into an embedded subprocess "subprocess" (containing task "A"
     * with handler "Undo-A") and a task "B" (with handler "Undo-B"); both branches join before a
     * compensation throw event that references "subprocess". Completes the "A", "B" and "Undo-A"
     * jobs, then asserts the expected element/intent subsequence and that no "Undo-B" activation
     * is recorded.
     */
    @Test
    public void shouldTriggerCompensationForSubprocess() {
        // given
        CompensateEventDefinitionBuilder throwDefinition = (CompensateEventDefinitionBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .subProcess("subprocess")
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .subProcessDone()
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("B", task -> ((ServiceTaskBuilder)task.zeebeJobType("B"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-B").zeebeJobType("Undo-B")))
            .connectTo("join")
            .intermediateThrowEvent("compensation-throw-event")
            .compensateEventDefinition()
            .activityRef("subprocess");
        BpmnModelInstance model = throwDefinition.done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        ENGINE.job().ofInstance(instanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] unrelatedHandlerActivation = {
            Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        // "B" lives outside the referenced subprocess, so its handler must not show up.
        elementIntents.doesNotContain((Object[])unrelatedHandlerActivation);
    }

    /**
     * Same layout as the single-instance subprocess case, except that "subprocess" is a
     * multi-instance activity over the input collection {@code [1,2,3]}. Completes three "A" jobs,
     * one "B" job and three "Undo-A" jobs, then asserts that three "Undo-A" completions are
     * recorded between activation and completion of the throw event and that no "Undo-B"
     * activation is recorded.
     */
    @Test
    public void shouldTriggerCompensationForMultiInstanceSubprocess() {
        // given
        SubProcessBuilder multiInstanceSubprocess = (SubProcessBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .subProcess("subprocess")
            .multiInstance(m -> m.zeebeInputCollectionExpression("[1,2,3]"));
        CompensateEventDefinitionBuilder throwDefinition = (CompensateEventDefinitionBuilder)multiInstanceSubprocess
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .subProcessDone()
            .parallelGateway("join")
            .moveToNode("fork")
            .serviceTask("B", task -> ((ServiceTaskBuilder)task.zeebeJobType("B"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-B").zeebeJobType("Undo-B")))
            .connectTo("join")
            .intermediateThrowEvent("compensation-throw-event")
            .compensateEventDefinition()
            .activityRef("subprocess");
        BpmnModelInstance model = throwDefinition.done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        completeJobs(instanceKey, "A", 3);
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        completeJobs(instanceKey, "Undo-A", 3);

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] unrelatedHandlerActivation = {
            Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        elementIntents.doesNotContain((Object[])unrelatedHandlerActivation);
    }

    /**
     * Deploys a process where an embedded subprocess "subprocess" (containing task "A" with handler
     * "Undo-A") is followed by a parallel gateway that forks into two compensation throw events,
     * both referencing "subprocess". Completes one "A" job and one "Undo-A" job, then asserts the
     * expected element/intent subsequence and that the "Undo-A" activation is recorded only once.
     */
    @Test
    public void shouldNotTriggerCompensationForSubprocessAgain() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .subProcess("subprocess")
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .subProcessDone()
            .parallelGateway("fork")
            .intermediateThrowEvent("compensation-throw-event-1", event -> event.compensateEventDefinition().activityRef("subprocess"))
            .moveToNode("fork")
            .intermediateThrowEvent("compensation-throw-event-2", event -> event.compensateEventDefinition().activityRef("subprocess"))
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] handlerActivation = {
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        elementIntents.containsOnlyOnce((Object[])handlerActivation);
    }

    /**
     * Deploys a process that forks into an embedded subprocess "subprocess" (containing task "A"
     * with handler "Undo-A") and, on the other branch, task "B" followed by a compensation throw
     * event referencing "subprocess". Completes the "B" job before the "A" job, then asserts that
     * the throw event is activated and completed before "A" and "subprocess" complete, and that no
     * "Undo-A" activation is recorded.
     */
    @Test
    public void shouldNotTriggerCompensationForSubprocessIfActive() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .subProcess("subprocess")
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .subProcessDone()
            .moveToNode("fork")
            .serviceTask("B", task -> task.zeebeJobType("B"))
            .intermediateThrowEvent("compensation-throw-event", event -> event.compensateEventDefinition().activityRef("subprocess"))
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] handlerActivation = {
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        elementIntents.doesNotContain((Object[])handlerActivation);
    }

    /**
     * Deploys a process where task "A" has an embedded subprocess (tasks "B" then "C") as its
     * compensation handler and the process ends with a compensation end event. Completes the "A",
     * "B" and "C" jobs, then asserts the expected subsequence of (element type, event type, intent)
     * records: the handler subprocess is activated and completed inside the end event's lifecycle.
     */
    @Test
    public void shouldInvokeSubprocessCompensationHandler() {
        // given
        Consumer<BoundaryEventBuilder> subprocessHandler = compensation -> compensation.subProcess()
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("B", t -> t.zeebeJobType("B"))
            .serviceTask("C", t -> t.zeebeJobType("C"))
            .endEvent();
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(subprocessHandler))
            .endEvent()
            .compensateEventDefinition()
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        ENGINE.job().ofInstance(instanceKey).withType("C").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a process where an embedded subprocess "subprocess" (containing task "A") carries a
     * compensation boundary event with handler task "B", followed by a compensation end event.
     * Completes the "A" and "B" jobs, then asserts the expected subsequence of
     * (element type, event type, intent) records: the handler service task completes inside the
     * end event's lifecycle, after the subprocess has completed.
     */
    @Test
    public void shouldCompensateSubprocess() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .subProcess("subprocess", subprocess -> subprocess.embeddedSubProcess()
                .startEvent()
                .serviceTask("A", task -> task.zeebeJobType("A"))
                .endEvent())
            .boundaryEvent()
            .compensation(compensation -> compensation.serviceTask("B").zeebeJobType("B"))
            .moveToActivity("subprocess")
            .endEvent()
            .compensateEventDefinition()
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a process where the embedded subprocess "subprocess" contains task "A" with its own
     * handler "Undo-A", and the subprocess itself carries a compensation boundary event with
     * handler "B"; the process ends with the compensation end event "compensation-throw-event".
     * Completes the "A", "Undo-A" and "B" jobs, then asserts the expected element/intent
     * subsequence, in which "B" is activated before "Undo-A" and completed after it.
     */
    @Test
    public void shouldCompensateSubprocessWithInnerCompensationHandler() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .subProcess("subprocess", subprocess -> subprocess.embeddedSubProcess()
                .startEvent()
                .serviceTask("A", task -> task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .boundaryEvent()
            .compensation(compensation -> compensation.serviceTask("B").zeebeJobType("B"))
            .moveToActivity("subprocess")
            .endEvent("compensation-throw-event")
            .compensateEventDefinition()
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("Undo-A").complete();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a parent process where task "A" has a call activity "Undo-A" (targeting
     * {@code CHILD_PROCESS_ID}) as its compensation handler, followed by a compensation end event,
     * together with a trivial child process (start event then end event). Completes the "A" job,
     * then asserts the expected subsequence of (process id, element type, event type, intent)
     * records: the child process is activated and completed within the handler call activity.
     */
    @Test
    public void shouldInvokeCallActivityCompensationHandler() {
        // given
        BpmnModelInstance parentModel = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(compensation -> compensation.callActivity("Undo-A").zeebeProcessId(CHILD_PROCESS_ID)))
            .endEvent()
            .compensateEventDefinition()
            .done();
        BpmnModelInstance childModel = Bpmn.createExecutableProcess((String)CHILD_PROCESS_ID)
            .startEvent()
            .endEvent()
            .done();
        ENGINE.deployment().withXmlResource(parentModel).withXmlResource(childModel).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(PROCESS_ID, BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(PROCESS_ID, BpmnElementType.CALL_ACTIVITY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(CHILD_PROCESS_ID, BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(CHILD_PROCESS_ID, BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, BpmnElementType.CALL_ACTIVITY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        // Records are not filtered by process instance key here because the child instance is inspected too.
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().limit(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnProcessId(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a parent process where call activity "A" (targeting {@code CHILD_PROCESS_ID}) has a
     * compensation handler task "Undo-A", followed by a compensation end event, together with a
     * trivial child process (start event then end event). Completes the "Undo-A" job, then asserts
     * the expected subsequence of (element type, event type, intent) records for the parent
     * instance.
     */
    @Test
    public void shouldCompensateCallActivity() {
        // given
        BpmnModelInstance parentModel = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .callActivity("A", callActivity -> ((CallActivityBuilder)callActivity.zeebeProcessId(CHILD_PROCESS_ID))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .endEvent()
            .compensateEventDefinition()
            .done();
        BpmnModelInstance childModel = Bpmn.createExecutableProcess((String)CHILD_PROCESS_ID)
            .startEvent()
            .endEvent()
            .done();
        ENGINE.deployment().withXmlResource(parentModel).withXmlResource(childModel).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(BpmnElementType.CALL_ACTIVITY, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a parent process that forks into call activity "A" (targeting
     * {@code CHILD_PROCESS_ID}, with handler "Undo-A") and, on the other branch, task "B" followed
     * by a compensation throw event that sets no activity reference; the child process contains a
     * single task "A". Completes the parent's "B" job first and the child's "A" job afterwards,
     * then asserts that the throw event is activated and completed between the activation and
     * completion of call activity "A", and that no "Undo-A" activation is recorded.
     */
    @Test
    public void shouldNotTriggerCompensationIfCallActivityIsActive() {
        // given
        BpmnModelInstance parentModel = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .callActivity("A", callActivity -> ((CallActivityBuilder)callActivity.zeebeProcessId(CHILD_PROCESS_ID))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .parallelGateway("join")
            .endEvent()
            .moveToNode("fork")
            .serviceTask("B", task -> task.zeebeJobType("B"))
            .intermediateThrowEvent("compensation-throw-event", AbstractThrowEventBuilder::compensateEventDefinition)
            .connectTo("join")
            .endEvent()
            .done();
        BpmnModelInstance childModel = Bpmn.createExecutableProcess((String)CHILD_PROCESS_ID)
            .startEvent()
            .serviceTask("A", task -> task.zeebeJobType("A"))
            .endEvent()
            .done();
        ENGINE.deployment().withXmlResource(parentModel).withXmlResource(childModel).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("B").complete();
        // Look up the child instance spawned by the call activity via its parent key.
        Record childActivated = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .withParentProcessInstanceKey(instanceKey)
            .withElementType(BpmnElementType.PROCESS)
            .getFirst();
        long childInstanceKey = childActivated.getKey();
        ENGINE.job().ofInstance(childInstanceKey).withType("A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] handlerActivation = {
            Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert elementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        elementIntents.doesNotContain((Object[])handlerActivation);
    }

    /**
     * Deploys a parent process where call activity "A" (targeting {@code CHILD_PROCESS_ID}) has
     * handler "Undo-A" and the process ends with the compensation end event
     * "compensation-throw-event"; the child process contains task "B" with its own handler
     * "Undo-B". Completes the child's "B" job and the parent's "Undo-A" job, then asserts the
     * expected (process id, element id, intent) subsequence and that no "Undo-B" activation is
     * recorded for the child process.
     */
    @Test
    public void shouldNotTriggerCompensationForChildProcess() {
        // given
        BpmnModelInstance parentModel = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .callActivity("A", callActivity -> ((CallActivityBuilder)callActivity.zeebeProcessId(CHILD_PROCESS_ID))
                .boundaryEvent()
                .compensation(compensation -> compensation.serviceTask("Undo-A").zeebeJobType("Undo-A")))
            .endEvent("compensation-throw-event")
            .compensateEventDefinition()
            .done();
        BpmnModelInstance childModel = Bpmn.createExecutableProcess((String)CHILD_PROCESS_ID)
            .startEvent()
            .serviceTask("B", task -> task.zeebeJobType("B"))
            .boundaryEvent()
            .compensation(compensation -> compensation.serviceTask("Undo-B").zeebeJobType("Undo-B"))
            .done();
        ENGINE.deployment().withXmlResource(parentModel).withXmlResource(childModel).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        // Look up the child instance spawned by the call activity via its parent key.
        Record childActivated = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .withParentProcessInstanceKey(instanceKey)
            .withElementType(BpmnElementType.PROCESS)
            .getFirst();
        long childInstanceKey = childActivated.getKey();
        ENGINE.job().ofInstance(childInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(instanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(CHILD_PROCESS_ID, "B", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, "A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, "compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(PROCESS_ID, "Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, "compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Tuple[] childHandlerActivation = {
            Assertions.tuple(CHILD_PROCESS_ID, "Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED)
        };
        AbstractListAssert processElementIntents = (AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().limit(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnProcessId(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
        processElementIntents.doesNotContain((Object[])childHandlerActivation);
    }

    /**
     * Deploys a process where task "A" has a multi-instance service task "Undo-A" (input collection
     * {@code [1,2,3]}) as its compensation handler and the process ends with the compensation end
     * event "compensation-throw-event". Completes one "A" job and three "Undo-A" jobs, then asserts
     * the expected (element id, element type, event type, intent) subsequence: three inner
     * service-task completions wrapped by the multi-instance body's activation and completion.
     */
    @Test
    public void shouldInvokeMultiInstanceActivityCompensationHandler() {
        // given
        Consumer<BoundaryEventBuilder> multiInstanceHandler = compensation -> ((ServiceTaskBuilder)compensation.serviceTask("Undo-A").zeebeJobType("Undo-A"))
            .multiInstance()
            .zeebeInputCollectionExpression("[1,2,3]");
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(multiInstanceHandler))
            .endEvent("compensation-throw-event")
            .compensateEventDefinition()
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        completeJobs(instanceKey, "Undo-A", 3);

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple("compensation-throw-event", BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", BpmnElementType.MULTI_INSTANCE_BODY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("Undo-A", BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", BpmnElementType.SERVICE_TASK, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("Undo-A", BpmnElementType.MULTI_INSTANCE_BODY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple("compensation-throw-event", BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(PROCESS_ID, BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys a process where task "A" has a multi-instance embedded subprocess (input collection
     * {@code [1,2,3]}, containing task "Undo-A") as its compensation handler, followed by a
     * compensation end event. Completes one "A" job and three "Undo-A" jobs, then asserts the
     * expected (element type, event type, intent) subsequence: three subprocess completions wrapped
     * by the multi-instance body's activation and completion.
     */
    @Test
    public void shouldInvokeMultiInstanceSubprocessCompensationHandler() {
        // given
        Consumer<BoundaryEventBuilder> multiInstanceHandler = compensation -> ((ServiceTaskBuilder)((SubProcessBuilder)compensation.subProcess().multiInstance(m -> m.zeebeInputCollectionExpression("[1,2,3]")))
            .embeddedSubProcess()
            .startEvent()
            .serviceTask("Undo-A")
            .zeebeJobType("Undo-A"))
            .endEvent();
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
            .startEvent()
            .serviceTask("A", task -> ((ServiceTaskBuilder)task.zeebeJobType("A"))
                .boundaryEvent()
                .compensation(multiInstanceHandler))
            .endEvent()
            .compensateEventDefinition()
            .done();
        ENGINE.deployment().withXmlResource(model).deploy();

        // when
        long instanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(instanceKey).withType("A").complete();
        completeJobs(instanceKey, "Undo-A", 3);

        // then
        Tuple[] expectedSequence = {
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.MULTI_INSTANCE_BODY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.MULTI_INSTANCE_BODY, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED),
            Assertions.tuple(BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(instanceKey).limitToProcessInstanceCompleted())
            .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
            .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Deploys the model read from {@code /compensation/compensation-subprocess-terminated.bpmn}, completes the
     * job of type {@code SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY}, and checks the exported records: a
     * compensation subscription is created, then deleted, then a process instance element completes, and no
     * compensation subscription is ever triggered.
     */
    @Test
    public void shouldDeleteSubscriptionForTerminatedSubprocess() {
        // given
        BpmnModelInstance model = this.createModelFromClasspathResource("/compensation/compensation-subprocess-terminated.bpmn");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.CREATED}),
                Assertions.tuple((Object[])new Object[]{ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED}),
                Assertions.tuple((Object[])new Object[]{ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey))
                .extracting(new Function[]{Record::getValueType, Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Deploys {@code /compensation/compensation-subprocess-terminated-multiscope.bpmn} and completes the jobs
     * {@code A}, {@code B}, {@code C} and {@code undoA} in that order. The first seven compensation
     * subscription records must contain, in order: subscriptions created for undoA, undoB and undoC, the
     * undoB and undoC subscriptions deleted, and the undoA subscription triggered and completed. The process
     * itself must also complete.
     */
    @Test
    public void shouldDeleteSubscriptionInTerminatedSubprocessScope() {
        // given
        BpmnModelInstance model = this.createModelFromClasspathResource("/compensation/compensation-subprocess-terminated-multiscope.bpmn");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when: the jobs are completed strictly in this order
        for (String jobType : new String[]{"A", "B", "C", "undoA"}) {
            ENGINE.job().ofInstance(processInstanceKey).withType(jobType).complete();
        }

        // then
        Tuple[] expectedSubscriptions = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.CREATED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.CREATED, "undoB"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.CREATED, "undoC"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.DELETED, "undoB"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.DELETED, "undoC"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.TRIGGERED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.COMPLETED, "undoA"})
        };
        Assertions.assertThat((Stream)RecordingExporter.compensationSubscriptionRecords().withProcessInstanceKey(processInstanceKey).limit(7L))
                .extracting(new Function[]{Record::getIntent, rec -> ((CompensationSubscriptionRecordValue)rec.getValue()).getCompensationHandlerId()})
                .containsSubsequence((Object[])expectedSubscriptions);

        Tuple[] expectedCompletion = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), Record::getIntent})
                .containsSubsequence((Object[])expectedCompletion);
    }

    /**
     * Deploys {@code /compensation/compensation-multiinstance-subprocess-terminated.bpmn} and completes two
     * jobs of type {@code A}. The first six compensation subscription records must contain, in order, two
     * created/triggered pairs for undoA followed by two deletions for undoA. The process itself must also
     * complete.
     */
    @Test
    public void shouldDeleteSubscriptionInTerminatedSubprocessMultiInstanceScope() {
        // given
        BpmnModelInstance model = this.createModelFromClasspathResource("/compensation/compensation-multiinstance-subprocess-terminated.bpmn");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        this.completeJobs(processInstanceKey, "A", 2);

        // then
        Tuple[] expectedSubscriptions = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.CREATED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.TRIGGERED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.CREATED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.TRIGGERED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.DELETED, "undoA"}),
                Assertions.tuple((Object[])new Object[]{CompensationSubscriptionIntent.DELETED, "undoA"})
        };
        Assertions.assertThat((Stream)RecordingExporter.compensationSubscriptionRecords().withProcessInstanceKey(processInstanceKey).limit(6L))
                .extracting(new Function[]{Record::getIntent, rec -> ((CompensationSubscriptionRecordValue)rec.getValue()).getCompensationHandlerId()})
                .containsSubsequence((Object[])expectedSubscriptions);

        Tuple[] expectedCompletion = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), Record::getIntent})
                .containsSubsequence((Object[])expectedCompletion);
    }

    /**
     * Builds a process with service tasks A (compensation handler Undo-A) and B, plus an event subprocess with
     * an error start event whose end event has a compensate event definition. Completes A, throws error
     * {@code "error"} from B and completes Undo-A, then verifies that the event subprocess activates, the
     * compensation end event activates, a compensation service task completes, and the end event, event
     * subprocess and process complete in that order.
     */
    @Test
    public void shouldTriggerCompensationFromEventSubprocess() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart.endEvent().compensateEventDefinition();
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> t.zeebeJobType("B"))
                .done();
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").withErrorCode("error").throwError();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{BpmnElementType.END_EVENT, BpmnEventType.COMPENSATION, ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{BpmnElementType.EVENT_SUB_PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, BpmnEventType.UNSPECIFIED, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnElementType(), rec -> ((ProcessInstanceRecordValue)rec.getValue()).getBpmnEventType(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Builds a process that forks into an embedded subprocess and a service task C (compensation handler
     * Undo-C). The subprocess holds tasks A (handler Undo-A) and B plus an event subprocess with an error
     * start event whose end event {@code compensation-throw-event} has a compensate event definition.
     * Completes A and C, throws error {@code "error"} from B and completes Undo-A, then verifies the ordered
     * records and that Undo-C is never activated.
     */
    @Test
    public void shouldTriggerCompensationFromEventSubprocessInsideSubprocess() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart.endEvent("compensation-throw-event").compensateEventDefinition();
        };
        Consumer<SubProcessBuilder> subprocessBuilder = sub -> sub.embeddedSubProcess()
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> t.zeebeJobType("B"));
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .startEvent()
                .parallelGateway("fork")
                .subProcess("subprocess", subprocessBuilder)
                .parallelGateway("join")
                .moveToNode("fork")
                .serviceTask("C", t -> ((ServiceTaskBuilder)t.zeebeJobType("C")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-C").zeebeJobType("Undo-C")))
                .connectTo("join")
                .endEvent()
                .done();
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").withErrorCode("error").throwError();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-C", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Builds a process with tasks A (compensation handler Undo-A) and B, plus an event subprocess with an
     * error start event that runs task C (handler Undo-C) before the end event
     * {@code compensation-throw-event}, which has a compensate event definition. Completes A, throws error
     * {@code "error"} from B, completes C, Undo-A and Undo-C, then verifies that both handlers complete
     * between activation and completion of the throw event, followed by completion of the event subprocess
     * and the process.
     */
    @Test
    public void shouldTriggerCompensationFromEventSubprocessWithCompensationHandlerInside() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart
                    .serviceTask("C", t -> ((ServiceTaskBuilder)t.zeebeJobType("C")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-C").zeebeJobType("Undo-C")))
                    .endEvent("compensation-throw-event")
                    .compensateEventDefinition();
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> t.zeebeJobType("B"))
                .done();
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").withErrorCode("error").throwError();
        for (String jobType : new String[]{"C", "Undo-A", "Undo-C"}) {
            ENGINE.job().ofInstance(processInstanceKey).withType(jobType).complete();
        }

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"Undo-C", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence);
    }

    /**
     * Builds a process with tasks A (compensation handler Undo-A) and B, plus an event subprocess with an
     * error start event that runs task C (handler Undo-C) and then an embedded subprocess. That subprocess
     * runs task D (handler Undo-D) and ends in {@code compensation-throw-event} with a compensate event
     * definition. Completes A, throws error {@code "error"} from B, completes C, D and Undo-D, then verifies
     * that Undo-D completes inside the throw event while Undo-A and Undo-C are never activated.
     */
    @Test
    public void shouldNotTriggerCompensationFromEventSubprocessInsideSubprocess() {
        // given
        Consumer<SubProcessBuilder> subprocessBuilder = sub -> sub.embeddedSubProcess()
                .startEvent()
                .serviceTask("D", t -> ((ServiceTaskBuilder)t.zeebeJobType("D")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-D").zeebeJobType("Undo-D")))
                .endEvent("compensation-throw-event")
                .compensateEventDefinition();
        Consumer<EventSubProcessBuilder> eventSubprocessBuilder = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart
                    .serviceTask("C", t -> ((ServiceTaskBuilder)t.zeebeJobType("C")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-C").zeebeJobType("Undo-C")))
                    .subProcess("subprocess", subprocessBuilder);
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", eventSubprocessBuilder)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> t.zeebeJobType("B"))
                .done();
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").withErrorCode("error").throwError();
        for (String jobType : new String[]{"C", "D", "Undo-D"}) {
            ENGINE.job().ofInstance(processInstanceKey).withType(jobType).complete();
        }

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-D", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-C", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Builds a process with tasks A (compensation handler Undo-A), B (handler Undo-B) and C, plus an event
     * subprocess with an error start event ending in {@code compensation-throw-event}. After the model is
     * built, that end event receives a compensate event definition with activity reference {@code "A"}.
     * Completes A and B, throws error {@code "error"} from C and completes Undo-A, then verifies the ordered
     * records and that Undo-B is never activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityFromEventSubprocess() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart.endEvent("compensation-throw-event");
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> ((ServiceTaskBuilder)t.zeebeJobType("B")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                .serviceTask("C", t -> t.zeebeJobType("C"))
                .done();
        // The activity reference is attached to the end event after the model has been built.
        EndEvent throwEvent = (EndEvent)model.getModelElementById("compensation-throw-event");
        throwEvent.builder().compensateEventDefinition().activityRef("A");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").withErrorCode("error").throwError();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Builds a process with task A (compensation handler Undo-A) followed by an embedded subprocess. The
     * subprocess holds tasks B (handler Undo-B), C (handler Undo-C) and D, plus an event subprocess with an
     * error start event ending in {@code compensation-throw-event}. After the model is built, that end event
     * receives a compensate event definition with activity reference {@code "B"}. Completes A, B and C,
     * throws error {@code "error"} from D and completes Undo-B, then verifies the ordered records and that
     * Undo-A and Undo-C are never activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityFromEventSubprocessInsideSubprocess() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart.endEvent("compensation-throw-event");
        };
        Consumer<SubProcessBuilder> subprocessBuilder = sub -> sub.embeddedSubProcess()
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("B", t -> ((ServiceTaskBuilder)t.zeebeJobType("B")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                .serviceTask("C", t -> ((ServiceTaskBuilder)t.zeebeJobType("C")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-C").zeebeJobType("Undo-C")))
                .serviceTask("D", t -> t.zeebeJobType("D"));
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .subProcess("subprocess", subprocessBuilder)
                .endEvent()
                .done();
        // The activity reference is attached to the end event after the model has been built.
        EndEvent throwEvent = (EndEvent)model.getModelElementById("compensation-throw-event");
        throwEvent.builder().compensateEventDefinition().activityRef("B");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        for (String jobType : new String[]{"A", "B", "C"}) {
            ENGINE.job().ofInstance(processInstanceKey).withType(jobType).complete();
        }
        ENGINE.job().ofInstance(processInstanceKey).withType("D").withErrorCode("error").throwError();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-B").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-B", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-C", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Builds a process with tasks A (compensation handler Undo-A) and B, plus an event subprocess with an
     * error start event that runs task C (handler Undo-C) before the end event
     * {@code compensation-throw-event}. After the model is built, that end event receives a compensate event
     * definition with activity reference {@code "C"}. Completes A, throws error {@code "error"} from B,
     * completes C and Undo-C, then verifies the ordered records and that Undo-A is never activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityInsideEventSubprocess() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder errorStart = (StartEventBuilder)esp.startEvent().error();
            errorStart
                    .serviceTask("C", t -> ((ServiceTaskBuilder)t.zeebeJobType("C")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-C").zeebeJobType("Undo-C")))
                    .endEvent("compensation-throw-event");
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .serviceTask("B", t -> t.zeebeJobType("B"))
                .done();
        // The activity reference is attached to the end event after the model has been built.
        EndEvent throwEvent = (EndEvent)model.getModelElementById("compensation-throw-event");
        throwEvent.builder().compensateEventDefinition().activityRef("C");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").withErrorCode("error").throwError();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-C").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"Undo-C", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Builds a process with task A (compensation handler Undo-A) and an event subprocess whose start event is
     * configured with {@code interrupting(false)} and signal {@code "signal"}, ending in
     * {@code compensation-throw-event}. After the model is built, that end event receives a compensate event
     * definition with activity reference {@code "A"}. The signal is broadcast before the job of A is
     * completed. The records must show the event subprocess running to completion between activation and
     * completion of A, and Undo-A must never be activated.
     */
    @Test
    public void shouldNotTriggerCompensationForActivityFromEventSubprocessIfActive() {
        // given
        Consumer<EventSubProcessBuilder> compensationEventSubprocess = esp -> {
            StartEventBuilder nonInterruptingStart = (StartEventBuilder)esp.startEvent().interrupting(false);
            StartEventBuilder signalStart = (StartEventBuilder)nonInterruptingStart.signal("signal");
            signalStart.endEvent("compensation-throw-event");
        };
        BpmnModelInstance model = Bpmn.createExecutableProcess((String)PROCESS_ID)
                .eventSubProcess("event-subprocess", compensationEventSubprocess)
                .startEvent()
                .serviceTask("A", t -> ((ServiceTaskBuilder)t.zeebeJobType("A")).boundaryEvent().compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .done();
        // The activity reference is attached to the end event after the model has been built.
        EndEvent throwEvent = (EndEvent)model.getModelElementById("compensation-throw-event");
        throwEvent.builder().compensateEventDefinition().activityRef("A");
        ENGINE.deployment().withXmlResource(model).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when: the signal is broadcast before the job of A is completed
        ENGINE.signal().withSignalName("signal").broadcast();
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();

        // then
        Tuple[] expectedSequence = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"A", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED}),
                Assertions.tuple((Object[])new Object[]{"compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"event-subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{"A", ProcessInstanceIntent.ELEMENT_COMPLETED}),
                Assertions.tuple((Object[])new Object[]{PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED})
        };
        Tuple[] forbidden = new Tuple[]{
                Assertions.tuple((Object[])new Object[]{"Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED})
        };
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted())
                .extracting(new Function[]{rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId(), Record::getIntent})
                .containsSubsequence((Object[])expectedSequence))
                .doesNotContain((Object[])forbidden);
    }

    /**
     * Reads a BPMN model from a resource that is looked up relative to this test class.
     *
     * @param classpath resource name handed to {@link Class#getResourceAsStream(String)}
     * @return the model parsed from the resource stream
     * @throws IllegalArgumentException if no resource can be found under {@code classpath}
     * @throws java.io.UncheckedIOException if closing the resource stream fails
     */
    private BpmnModelInstance createModelFromClasspathResource(String classpath) {
        // try-with-resources releases the stream once the model has been read; a null resource is skipped on close.
        try (InputStream resourceAsStream = this.getClass().getResourceAsStream(classpath)) {
            // getResourceAsStream returns null for a missing resource; fail here with the offending path
            // instead of handing null to the parser.
            if (resourceAsStream == null) {
                throw new IllegalArgumentException("Classpath resource not found: " + classpath);
            }
            return Bpmn.readModelFromStream((InputStream)resourceAsStream);
        } catch (java.io.IOException e) {
            // Only the implicit close() can raise a checked IOException in this block.
            throw new java.io.UncheckedIOException("Failed to close classpath resource: " + classpath, e);
        }
    }

    /**
     * Completes jobs of one type that were created for a process instance.
     *
     * @param processInstanceKey key of the process instance the job records are filtered by
     * @param jobType job type the job records are filtered by
     * @param number maximum number of created-job records taken from the exporter stream
     */
    private void completeJobs(long processInstanceKey, String jobType, int number) {
        JobRecordStream createdJobs = (JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType(jobType)
                .limit((long)number);
        // Kept as a single stream pipeline (not collected first) so each key is completed as forEach receives it.
        createdJobs
                .map(Record::getKey)
                .forEach(createdJobKey -> ENGINE.job().withKey((long)createdJobKey).complete());
    }
}

