package io.zeebe.broker.workflow.processor;

import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.subscription.message.processor.MessageObserver;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.util.TypedRecordStream;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.BpmnElementType;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.timer.TimerRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.TimerIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessorTest.class */
/**
 * Tests the workflow instance stream processor through {@link WorkflowInstanceStreamProcessorRule}.
 *
 * <p>Most tests follow the same pattern: block the stream processor after a chosen record, write
 * one or more competing commands while it is blocked, unblock it, and then assert on the records
 * (or on the mocked {@link SubscriptionCommandSender}) that resulted.
 */
public class WorkflowInstanceStreamProcessorTest {
    private static final String PROCESS_ID = "process";

    /** start -> service task "task" (job type "taskType") -> end, with named sequence flows. */
    private static final BpmnModelInstance SERVICE_TASK_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent("start")
                    .sequenceFlowId("flow1")
                    .serviceTask("task", task -> task.zeebeTaskType("taskType"))
                    .sequenceFlowId("flow2")
                    .endEvent("end")
                    .done();

    /** The service task "task" wrapped in the embedded sub process "subProcess". */
    private static final BpmnModelInstance SUB_PROCESS_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .subProcess("subProcess")
                    .embeddedSubProcess()
                    .startEvent()
                    .serviceTask("task", task -> task.zeebeTaskType("taskType"))
                    .endEvent()
                    .subProcessDone()
                    .endEvent()
                    .done();

    /** start -> message catch event "catch-event" for message "order canceled". */
    private static final BpmnModelInstance MESSAGE_CATCH_EVENT_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .intermediateCatchEvent(
                            "catch-event",
                            catchEvent ->
                                    catchEvent.message(
                                            message -> message.name("order canceled").zeebeCorrelationKey("orderId")))
                    .done();

    /** Service task "task1" with the interrupting timer boundary event "timer1" (PT1S). */
    private static final BpmnModelInstance TIMER_BOUNDARY_EVENT_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .serviceTask("task1", task -> task.zeebeTaskType("type"))
                    .boundaryEvent("timer1")
                    .cancelActivity(true)
                    .timerWithDuration("PT1S")
                    .endEvent("timer1End")
                    .moveToActivity("task1")
                    .endEvent("end")
                    .done();

    @Rule
    public Timeout timeoutRule = new Timeout(2, TimeUnit.MINUTES);

    // Not annotated individually: both rules are applied in a fixed order through the chain below.
    public StreamProcessorRule envRule = new StreamProcessorRule();
    public WorkflowInstanceStreamProcessorRule streamProcessorRule = new WorkflowInstanceStreamProcessorRule(envRule);

    @Rule
    public RuleChain chain = RuleChain.outerRule(envRule).around(streamProcessorRule);

    private StreamProcessorControl streamProcessor;

    @Before
    public void setUp() {
        streamProcessor = streamProcessorRule.getStreamProcessor();
    }

    /**
     * Writes two CANCEL commands for the same instance while the processor is blocked and expects
     * the first workflow instance rejection to be a CANCEL rejection with the "not found" reason.
     */
    @Test
    public void shouldRejectCancellationInDirectSuccession() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.CREATE);

        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveProcessInstance();
        awaitProcessorBlocked();

        // when
        cancel(workflowInstance);
        cancel(workflowInstance);
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        LifecycleAssert.assertThat(statesOf(PROCESS_ID)).compliesWithCompleteLifecycle();

        final TypedRecord<WorkflowInstanceRecord> rejection =
                envRule.events()
                        .onlyWorkflowInstanceRecords()
                        .onlyRejections()
                        .findFirst()
                        .orElseThrow(() -> new AssertionError("Expected a rejected workflow instance command, but found none"));

        Assertions.assertThat(rejection.getMetadata().getIntent()).isEqualTo(WorkflowInstanceIntent.CANCEL);
        Assertions.assertThat(BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason()))
                .isEqualTo("Expected to cancel a workflow instance with key '" + workflowInstance.getKey() + "', but no such workflow was found");
    }

    /** Cancels the instance while the processor is blocked after the start event was completed. */
    @Test
    public void shouldCancelActivityInStateReady() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(isForElement("start", WorkflowInstanceIntent.ELEMENT_COMPLETED));

        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveProcessInstance();
        awaitProcessorBlocked();

        // when
        cancel(workflowInstance);
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        assertProcessAndTaskTerminatedCleanly();
    }

    /** Cancels the instance while the processor is blocked on the service task's COMPLETING record. */
    @Test
    public void shouldCancelScopeBeforeTakingSequenceFlow() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(
                element ->
                        element.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_COMPLETING
                                && element.getValue().getBpmnElementType() == BpmnElementType.SERVICE_TASK);

        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveProcessInstance();
        streamProcessorRule.completeFirstJob();
        awaitProcessorBlocked();

        // when
        cancel(workflowInstance);
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        assertProcessAndTaskTerminatedCleanly();
    }

    /** Cancels the instance while the processor is blocked after the job COMPLETED event. */
    @Test
    public void shouldCancelActivityInStateCompleting() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.COMPLETED);

        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveProcessInstance();
        streamProcessorRule.completeFirstJob();
        awaitProcessorBlocked();

        // when
        cancel(workflowInstance);
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        assertProcessAndTaskTerminatedCleanly();
    }

    /**
     * Writes a CANCEL command and completes the job while the processor is blocked, then checks the
     * lifecycles of the process, the sub process and the task.
     */
    @Test
    public void shouldCancelAndCompleteJobConcurrentlyInSubProcess() {
        // given
        streamProcessorRule.deploy(SUB_PROCESS_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.CREATE);

        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveProcessInstance();
        awaitProcessorBlocked();

        // when
        cancel(workflowInstance);
        streamProcessorRule.completeFirstJob();
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        final List<TypedRecord<WorkflowInstanceRecord>> processStates = statesOf(PROCESS_ID);
        final List<TypedRecord<WorkflowInstanceRecord>> subProcessStates = statesOf("subProcess");
        final List<TypedRecord<WorkflowInstanceRecord>> taskStates = statesOf("task");

        LifecycleAssert.assertThat(processStates)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED);
        LifecycleAssert.assertThat(subProcessStates).compliesWithCompleteLifecycle();
        LifecycleAssert.assertThat(taskStates).compliesWithCompleteLifecycle();
    }

    /**
     * Advances the clock past the subscription check interval plus timeout while the catch event is
     * activated and expects the open-subscription command to be sent twice.
     */
    @Test
    public void shouldRetryToOpenMessageSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(isForElement("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        createInstanceWithOrderId();
        awaitProcessorBlocked();

        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        // when
        envRule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
        streamProcessor.unblock();

        // then
        Mockito.verify(streamProcessorRule.getMockSubscriptionCommandSender(), Mockito.timeout(5000L).times(2))
                .openMessageSubscription(
                        0,
                        catchEvent.getValue().getWorkflowInstanceKey(),
                        catchEvent.getKey(),
                        BufferUtil.wrapString("order canceled"),
                        BufferUtil.wrapString("order-123"),
                        true);
    }

    /** Writes the same OPEN command twice and expects an INVALID_STATE rejection. */
    @Test
    public void shouldRejectDuplicatedOpenWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(isForElement("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        createInstanceWithOrderId();
        awaitProcessorBlocked();

        final WorkflowInstanceSubscriptionRecord subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        streamProcessor.unblock();

        // then
        assertFirstSubscriptionRejection(WorkflowInstanceSubscriptionIntent.OPEN, RejectionType.INVALID_STATE);
    }

    /**
     * Writes the same CORRELATE command twice after the subscription was opened and expects a
     * NOT_FOUND rejection, with the correlate command still being sent twice.
     */
    @Test
    public void shouldRejectDuplicatedCorrelateWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        createInstanceWithOrderId();

        final WorkflowInstanceSubscriptionRecord subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        awaitProcessorBlocked();

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);
        streamProcessor.unblock();

        // then
        assertFirstSubscriptionRejection(WorkflowInstanceSubscriptionIntent.CORRELATE, RejectionType.NOT_FOUND);

        Mockito.verify(streamProcessorRule.getMockSubscriptionCommandSender(), Mockito.timeout(5000L).times(2))
                .correlateMessageSubscription(
                        ArgumentMatchers.eq(subscription.getSubscriptionPartitionId()),
                        ArgumentMatchers.eq(subscription.getWorkflowInstanceKey()),
                        ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                        ArgumentMatchers.<DirectBuffer>any());
    }

    /** Writes a CORRELATE command after the instance was terminated and expects INVALID_STATE. */
    @Test
    public void shouldRejectCorrelateWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveInstanceWithOrderId();

        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        final WorkflowInstanceSubscriptionRecord subscription = subscriptionRecordForEvent(catchEvent);

        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        awaitProcessorBlocked();

        // The CANCEL command is keyed by the instance but carries the catch event's record value.
        envRule.writeCommand(workflowInstance.getKey(), WorkflowInstanceIntent.CANCEL, catchEvent.getValue());
        streamProcessor.unblock();
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);

        // then
        assertFirstSubscriptionRejection(WorkflowInstanceSubscriptionIntent.CORRELATE, RejectionType.INVALID_STATE);
    }

    /**
     * Advances the clock past the subscription check interval plus timeout after the instance was
     * terminated and expects the close-subscription command to be sent twice.
     */
    @Test
    public void shouldRetryToCloseMessageSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        final TypedRecord<WorkflowInstanceRecord> workflowInstance = createAndReceiveInstanceWithOrderId();

        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        final WorkflowInstanceSubscriptionRecord subscription = subscriptionRecordForEvent(catchEvent);

        envRule.writeCommand(workflowInstance.getKey(), WorkflowInstanceIntent.CANCEL, catchEvent.getValue());
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        // when
        envRule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
        streamProcessor.unblock();

        // then
        Mockito.verify(streamProcessorRule.getMockSubscriptionCommandSender(), Mockito.timeout(5000L).times(2))
                .closeMessageSubscription(
                        subscription.getSubscriptionPartitionId(),
                        subscription.getWorkflowInstanceKey(),
                        subscription.getElementInstanceKey(),
                        subscription.getMessageName());
    }

    /** Writes the same CLOSE command twice and expects a NOT_FOUND rejection. */
    @Test
    public void shouldRejectDuplicatedCloseWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        createInstanceWithOrderId();

        final WorkflowInstanceSubscriptionRecord subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        awaitProcessorBlocked();

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CLOSE, subscription);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CLOSE, subscription);
        streamProcessor.unblock();

        // then
        assertFirstSubscriptionRejection(WorkflowInstanceSubscriptionIntent.CLOSE, RejectionType.NOT_FOUND);
    }

    /**
     * Writes a job COMPLETED event followed by a timer TRIGGER command while the processor is
     * blocked; expects the TRIGGER to be rejected and no workflow instance event for "timer1".
     */
    @Test
    public void shouldNotTriggerBoundaryEventIfTaskIsCompleted() {
        // given
        streamProcessorRule.deploy(TIMER_BOUNDARY_EVENT_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.CREATED);

        streamProcessorRule.createWorkflowInstance(creation -> creation.setBpmnProcessId(PROCESS_ID));
        awaitProcessorBlocked();

        final TypedRecord<TimerRecord> timer = streamProcessorRule.awaitTimerInState("timer1", TimerIntent.CREATED);
        final TypedRecord<JobRecord> job = streamProcessorRule.awaitJobInState("task1", JobIntent.CREATED);

        // when
        envRule.writeEvent(job.getKey(), JobIntent.COMPLETED, job.getValue());
        envRule.writeCommand(timer.getKey(), TimerIntent.TRIGGER, timer.getValue());
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(envRule.events().onlyTimerRecords().collect(Collectors.toList()))
                .extracting(timerRecord -> timerRecord.getMetadata())
                .extracting(metadata -> Assertions.tuple(metadata.getRecordType(), metadata.getIntent()))
                .containsExactly(
                        Assertions.tuple(RecordType.COMMAND, TimerIntent.CREATE),
                        Assertions.tuple(RecordType.EVENT, TimerIntent.CREATED),
                        Assertions.tuple(RecordType.COMMAND, TimerIntent.TRIGGER),
                        Assertions.tuple(RecordType.COMMAND_REJECTION, TimerIntent.TRIGGER));

        Assertions.assertThat(envRule.events().onlyWorkflowInstanceRecords().onlyEvents().collect(Collectors.toList()))
                .noneMatch(event -> event.getValue().getElementId().equals(BufferUtil.wrapString("timer1")));
    }

    /**
     * Triggers two boundary timers of the same task back to back and expects only the "timer1"
     * path to reach ELEMENT_COMPLETING after the task.
     */
    @Test
    public void shouldIgnoreSecondConsecutiveBoundaryEventTrigger() {
        // given
        final BpmnModelInstance workflow =
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask("task", task -> task.zeebeTaskType("type"))
                        .boundaryEvent("timer1")
                        .timerWithDuration("PT1S")
                        .endEvent("timer1End")
                        .moveToActivity("task")
                        .boundaryEvent("timer2")
                        .timerWithDuration("PT2S")
                        .endEvent("timer2End")
                        .done();
        streamProcessorRule.deploy(workflow);

        streamProcessor.blockAfterWorkflowInstanceRecord(
                element ->
                        element.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_ACTIVATED
                                && element.getValue().getElementId().equals(BufferUtil.wrapString("task")));

        streamProcessorRule.createWorkflowInstance(creation -> creation.setBpmnProcessId(PROCESS_ID));
        awaitProcessorBlocked();

        final TypedRecord<TimerRecord> firstTimer = streamProcessorRule.awaitTimerInState("timer1", TimerIntent.CREATED);
        final TypedRecord<TimerRecord> secondTimer = streamProcessorRule.awaitTimerInState("timer2", TimerIntent.CREATED);

        // when
        envRule.writeCommand(firstTimer.getKey(), TimerIntent.TRIGGER, firstTimer.getValue());
        envRule.writeCommand(secondTimer.getKey(), TimerIntent.TRIGGER, secondTimer.getValue());
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(
                        envRule.events()
                                .onlyWorkflowInstanceRecords()
                                .skipUntil(element -> element.getValue().getElementId().equals(BufferUtil.wrapString("task")))
                                .withIntent(WorkflowInstanceIntent.ELEMENT_COMPLETING)
                                .map(element -> element.getValue())
                                .map(value -> value.getElementId())
                                .map(BufferUtil::bufferAsString))
                .containsExactly("timer1", "timer1End", PROCESS_ID);
    }

    /** Creates an instance of {@link #PROCESS_ID} without variables and returns its record. */
    private TypedRecord<WorkflowInstanceRecord> createAndReceiveProcessInstance() {
        return streamProcessorRule.createAndReceiveWorkflowInstance(creation -> creation.setBpmnProcessId(PROCESS_ID));
    }

    /** Creates an instance with the variable {@code orderId = "order-123"} and returns its record. */
    private TypedRecord<WorkflowInstanceRecord> createAndReceiveInstanceWithOrderId() {
        return streamProcessorRule.createAndReceiveWorkflowInstance(
                creation -> creation.setBpmnProcessId(PROCESS_ID).setVariables(MsgPackUtil.asMsgPack("orderId", "order-123")));
    }

    /** Creates an instance with the variable {@code orderId = "order-123"}, ignoring the result. */
    private void createInstanceWithOrderId() {
        streamProcessorRule.createWorkflowInstance(
                creation -> creation.setBpmnProcessId(PROCESS_ID).setVariables(MsgPackUtil.asMsgPack("orderId", "order-123")));
    }

    /** Waits until the stream processor reports that it is blocked. */
    private void awaitProcessorBlocked() {
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());
    }

    /** Writes a CANCEL command using the key and value of the given workflow instance record. */
    private void cancel(TypedRecord<WorkflowInstanceRecord> workflowInstance) {
        envRule.writeCommand(workflowInstance.getKey(), WorkflowInstanceIntent.CANCEL, workflowInstance.getValue());
    }

    /** Collects the records returned by {@code onlyStatesOf} for the given element id. */
    private List<TypedRecord<WorkflowInstanceRecord>> statesOf(String elementId) {
        return envRule.events().onlyStatesOf(elementId).collect(Collectors.toList());
    }

    /**
     * Asserts that the process lifecycle is complete and ends with ELEMENT_TERMINATED, that the
     * lifecycle of "task" is complete, and that no flow is evaluated after the process terminates.
     */
    private void assertProcessAndTaskTerminatedCleanly() {
        final List<TypedRecord<WorkflowInstanceRecord>> allRecords =
                envRule.events().onlyWorkflowInstanceRecords().collect(Collectors.toList());
        final List<TypedRecord<WorkflowInstanceRecord>> processStates = statesOf(PROCESS_ID);
        final List<TypedRecord<WorkflowInstanceRecord>> taskStates = statesOf("task");

        LifecycleAssert.assertThat(processStates)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED);
        LifecycleAssert.assertThat(taskStates).compliesWithCompleteLifecycle();
        WorkflowInstanceAssert.assertThat(allRecords).doesNotEvaluateFlowAfterTerminatingElement(PROCESS_ID);
    }

    /** Awaits the first subscription rejection and checks its intent and rejection type. */
    private void assertFirstSubscriptionRejection(
            WorkflowInstanceSubscriptionIntent expectedIntent, RejectionType expectedRejectionType) {
        final TypedRecord<WorkflowInstanceSubscriptionRecord> rejection =
                streamProcessorRule.awaitAndGetFirstSubscriptionRejection();

        Assertions.assertThat(rejection.getMetadata().getIntent()).isEqualTo(expectedIntent);
        Assertions.assertThat(rejection.getMetadata().getRejectionType()).isEqualTo(expectedRejectionType);
    }

    /** Matches workflow instance records whose element id equals {@code elementId}. */
    private Predicate<TypedRecord<WorkflowInstanceRecord>> isForElement(String elementId) {
        // Wrap once up front instead of on every predicate evaluation.
        final DirectBuffer elementIdBuffer = BufferUtil.wrapString(elementId);
        return element -> elementIdBuffer.equals(element.getValue().getElementId());
    }

    /** Matches workflow instance records with the given element id and intent. */
    private Predicate<TypedRecord<WorkflowInstanceRecord>> isForElement(String elementId, WorkflowInstanceIntent intent) {
        return isForElement(elementId).and(element -> element.getMetadata().getIntent() == intent);
    }

    /**
     * Builds a subscription record for the message "order canceled" on partition 0, taking the
     * workflow instance key from the record value and the element instance key from the record key.
     */
    private WorkflowInstanceSubscriptionRecord subscriptionRecordForEvent(TypedRecord<WorkflowInstanceRecord> typedRecord) {
        return new WorkflowInstanceSubscriptionRecord()
                .setSubscriptionPartitionId(0)
                .setWorkflowInstanceKey(typedRecord.getValue().getWorkflowInstanceKey())
                .setElementInstanceKey(typedRecord.getKey())
                .setMessageName(BufferUtil.wrapString("order canceled"));
    }
}
