package io.zeebe.broker.workflow;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.workflow.gateway.ParallelGatewayStreamProcessorTest;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.protocol.clientapi.ExecuteCommandResponseDecoder;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.value.deployment.ResourceType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.broker.protocol.clientapi.TestPartitionClient;
import io.zeebe.test.util.MsgPackUtil;
import java.io.File;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Files;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/workflow/WorkflowInstanceFunctionalTest.class */
/**
 * Functional tests that deploy small BPMN (and one YAML) workflows to an embedded broker, create
 * workflow instances through the client API and assert on the records the broker publishes.
 *
 * <p>NOTE(review): the element ids, payload keys and job types used below are borrowed from
 * constants of unrelated test classes ({@code TypedStreamProcessorTest.STREAM_NAME}, {@code
 * TestJarExporter.FOO}, {@code ParallelGatewayStreamProcessorTest.PROCESS_ID}). The exclusive
 * gateway tests evaluate conditions on {@code $.foo}, so the payload key passed to {@code
 * MsgPackUtil.asMsgPack} presumably has to be {@code "foo"} - confirm the constant values and
 * consider replacing them with local literals.
 */
public class WorkflowInstanceFunctionalTest {
    private static final String PROP_JOB_TYPE = "type";
    private static final String PROP_JOB_RETRIES = "retries";

    /** Local alias for the id of the process deployed by most tests (value unchanged). */
    private static final String PROCESS_ID = ParallelGatewayStreamProcessorTest.PROCESS_ID;

    /** Embedded broker under test; constructed with an empty array of consumers. */
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);

    /** Client API rule that resolves the broker's client address through {@link #brokerRule}. */
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Applies the broker rule as the outer rule and the client API rule inside it. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    private TestPartitionClient testClient;

    /** Obtains the partition client from the client API rule before each test. */
    @Before
    public void init() {
        testClient = apiRule.partition();
    }

    /** The START_EVENT_OCCURRED record follows the create response and carries the instance data. */
    @Test
    public void shouldStartWorkflowInstanceAtNoneStartEvent() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent(TypedStreamProcessorTest.STREAM_NAME)
                        .endEvent()
                        .done());

        ExecuteCommandResponse response = testClient.createWorkflowInstanceWithResponse(PROCESS_ID);
        SubscribedRecord createCommand =
                testClient.receiveFirstWorkflowInstanceCommand(WorkflowInstanceIntent.CREATE);
        SubscribedRecord startEvent =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.START_EVENT_OCCURRED);
        long workflowInstanceKey = response.key();

        Assertions.assertThat(response.sourceRecordPosition()).isEqualTo(createCommand.position());
        Assertions.assertThat(response.position()).isGreaterThan(createCommand.position());
        Assertions.assertThat(startEvent.key()).isGreaterThan(0L).isNotEqualTo(workflowInstanceKey);
        Assertions.assertThat(startEvent.position()).isGreaterThan(response.position());
        assertWorkflowInstanceValue(
                startEvent, PROCESS_ID, workflowInstanceKey, TypedStreamProcessorTest.STREAM_NAME);
    }

    /** The SEQUENCE_FLOW_TAKEN record points back at the START_EVENT_OCCURRED record. */
    @Test
    public void shouldTakeSequenceFlowFromStartEvent() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .sequenceFlowId(TypedStreamProcessorTest.STREAM_NAME)
                        .endEvent()
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord startEvent =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.START_EVENT_OCCURRED);
        SubscribedRecord flowTaken =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);

        Assertions.assertThat(flowTaken.key()).isGreaterThan(0L).isNotEqualTo(workflowInstanceKey);
        Assertions.assertThat(flowTaken.sourceRecordPosition()).isEqualTo(startEvent.position());
        assertWorkflowInstanceValue(
                flowTaken, PROCESS_ID, workflowInstanceKey, TypedStreamProcessorTest.STREAM_NAME);
    }

    /** The END_EVENT_OCCURRED record points back at the SEQUENCE_FLOW_TAKEN record. */
    @Test
    public void shouldOccureEndEvent() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .endEvent(TypedStreamProcessorTest.STREAM_NAME)
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord flowTaken =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);
        SubscribedRecord endEvent =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.END_EVENT_OCCURRED);

        Assertions.assertThat(endEvent.key()).isGreaterThan(0L).isNotEqualTo(workflowInstanceKey);
        Assertions.assertThat(endEvent.sourceRecordPosition()).isEqualTo(flowTaken.position());
        assertWorkflowInstanceValue(
                endEvent, PROCESS_ID, workflowInstanceKey, TypedStreamProcessorTest.STREAM_NAME);
    }

    /** The process element reaches ELEMENT_COMPLETED after the end event occurred. */
    @Test
    public void shouldCompleteWorkflowInstance() {
        testClient.deploy(Bpmn.createExecutableProcess(PROCESS_ID).startEvent().endEvent().done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord endEvent =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.END_EVENT_OCCURRED);
        SubscribedRecord processCompleted =
                testClient.receiveElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        assertProcessCompletedAfter(processCompleted, endEvent, workflowInstanceKey);
    }

    /** A start event without outgoing sequence flow still lets the process element complete. */
    @Test
    public void shouldConsumeTokenIfEventHasNoOutgoingSequenceflow() {
        testClient.deploy(Bpmn.createExecutableProcess(PROCESS_ID).startEvent().done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord startEvent =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.START_EVENT_OCCURRED);
        SubscribedRecord processCompleted =
                testClient.receiveElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        assertProcessCompletedAfter(processCompleted, startEvent, workflowInstanceKey);
    }

    /** A service task without outgoing sequence flow still lets the process element complete. */
    @Test
    public void shouldConsumeTokenIfActivityHasNoOutgoingSequenceflow() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                                TypedStreamProcessorTest.STREAM_NAME,
                                task -> {
                                    task.zeebeTaskType(TestJarExporter.FOO);
                                })
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        testClient.completeJobOfType(TestJarExporter.FOO);
        SubscribedRecord taskCompleted =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        SubscribedRecord processCompleted =
                testClient.receiveElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        assertProcessCompletedAfter(processCompleted, taskCompleted, workflowInstanceKey);
    }

    /** The task's ELEMENT_ACTIVATED record points back at its ELEMENT_READY record. */
    @Test
    public void shouldActivateServiceTask() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                                TypedStreamProcessorTest.STREAM_NAME,
                                task -> {
                                    task.zeebeTaskType(TestJarExporter.FOO);
                                })
                        .endEvent()
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord taskReady =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord taskActivated =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        Assertions.assertThat(taskActivated.key()).isGreaterThan(0L).isNotEqualTo(workflowInstanceKey);
        Assertions.assertThat(taskActivated.sourceRecordPosition()).isEqualTo(taskReady.position());
        assertWorkflowInstanceValue(
                taskActivated, PROCESS_ID, workflowInstanceKey, TypedStreamProcessorTest.STREAM_NAME);
    }

    /** Activating the service task yields a job CREATE command with the configured type/retries. */
    @Test
    public void shouldCreateTaskWhenServiceTaskIsActivated() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                                TypedStreamProcessorTest.STREAM_NAME,
                                task -> {
                                    task.zeebeTaskType(TestJarExporter.FOO).zeebeTaskRetries(5);
                                })
                        .endEvent()
                        .done());

        testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord taskActivated =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord createJobCommand = testClient.receiveFirstJobCommand(JobIntent.CREATE);

        // The command carries the decoder's "null" key value rather than a real key.
        Assertions.assertThat(createJobCommand.key())
                .isEqualTo(ExecuteCommandResponseDecoder.keyNullValue());
        Assertions.assertThat(createJobCommand.sourceRecordPosition())
                .isEqualTo(taskActivated.position());
        Assertions.assertThat(createJobCommand.value())
                .containsEntry(PROP_JOB_TYPE, TestJarExporter.FOO)
                .containsEntry(PROP_JOB_RETRIES, 5L);
    }

    /** The job CREATE command carries workflow instance headers and the task's custom headers. */
    @Test
    public void shouldCreateJobWithWorkflowInstanceAndCustomHeaders() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                                TypedStreamProcessorTest.STREAM_NAME,
                                task -> {
                                    task.zeebeTaskType(TestJarExporter.FOO)
                                            .zeebeTaskHeader("a", "b")
                                            .zeebeTaskHeader("c", "d");
                                })
                        .endEvent()
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        SubscribedRecord createJobCommand = testClient.receiveFirstJobCommand(JobIntent.CREATE);

        // The nested header maps are only known as Object values of the record, hence the casts.
        @SuppressWarnings("unchecked")
        Map<String, Object> headers = (Map<String, Object>) createJobCommand.value().get("headers");
        Assertions.assertThat(headers)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("bpmnProcessId", PROCESS_ID)
                .containsEntry("workflowDefinitionVersion", 1L)
                .containsEntry("activityId", TypedStreamProcessorTest.STREAM_NAME)
                .containsKey("activityInstanceKey");

        @SuppressWarnings("unchecked")
        Map<String, Object> customHeaders =
                (Map<String, Object>) createJobCommand.value().get("customHeaders");
        Assertions.assertThat(customHeaders).containsEntry("a", "b").containsEntry("c", "d");
    }

    /** Completing the job drives the task through ELEMENT_COMPLETING to ELEMENT_COMPLETED. */
    @Test
    public void shouldCompleteServiceTaskWhenTaskIsCompleted() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                                TypedStreamProcessorTest.STREAM_NAME,
                                task -> {
                                    task.zeebeTaskType(TestJarExporter.FOO);
                                })
                        .endEvent()
                        .done());

        long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        testClient.completeJobOfType(TestJarExporter.FOO);
        SubscribedRecord taskActivated =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord jobCompleted = testClient.receiveFirstJobEvent(JobIntent.COMPLETED);
        SubscribedRecord taskCompleting =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord taskCompleted =
                testClient.receiveElementInState(
                        TypedStreamProcessorTest.STREAM_NAME, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(taskCompleting.sourceRecordPosition()).isEqualTo(jobCompleted.position());
        Assertions.assertThat(taskCompleted.key()).isEqualTo(taskActivated.key());
        Assertions.assertThat(taskCompleted.sourceRecordPosition())
                .isEqualTo(taskCompleting.position());
        assertWorkflowInstanceValue(
                taskCompleted, PROCESS_ID, workflowInstanceKey, TypedStreamProcessorTest.STREAM_NAME);
    }

    /** Payload values 4, 8 and 12 end in the end events "a", "b" and "c" respectively. */
    @Test
    public void shouldSpitOnExclusiveGateway() {
        testClient.deploy(
                Bpmn.createExecutableProcess("workflow")
                        .startEvent()
                        .exclusiveGateway("xor")
                        .sequenceFlowId("s1")
                        .condition("$.foo < 5")
                        .endEvent("a")
                        .moveToLastGateway()
                        .sequenceFlowId("s2")
                        .condition("$.foo >= 5 && $.foo < 10")
                        .endEvent("b")
                        .moveToLastExclusiveGateway()
                        .defaultFlow()
                        .sequenceFlowId("s3")
                        .endEvent("c")
                        .done());

        long firstInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 4));
        long secondInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 8));
        long thirdInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 12));

        assertEndEventOccurred(firstInstanceKey, "a");
        assertEndEventOccurred(secondInstanceKey, "b");
        assertEndEventOccurred(thirdInstanceKey, "c");
    }

    /** Each instance takes exactly one of the two branches into the joining gateway. */
    @Test
    public void shouldJoinOnExclusiveGateway() {
        testClient.deploy(
                Bpmn.createExecutableProcess("workflow")
                        .startEvent()
                        .exclusiveGateway("split")
                        .sequenceFlowId("s1")
                        .condition("$.foo < 5")
                        .exclusiveGateway("joinRequest")
                        .moveToLastExclusiveGateway()
                        .defaultFlow()
                        .sequenceFlowId("s2")
                        .connectTo("joinRequest")
                        .endEvent("end")
                        .done());

        long firstInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 4));
        long secondInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 8));
        testClient.receiveElementInState(
                firstInstanceKey, "workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        testClient.receiveElementInState(
                secondInstanceKey, "workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(takenSequenceFlowIds(firstInstanceKey))
                .contains("s1")
                .doesNotContain("s2");
        Assertions.assertThat(takenSequenceFlowIds(secondInstanceKey))
                .contains("s2")
                .doesNotContain("s1");
    }

    /** Both gateway activations point back at the sequence flow record that led to them. */
    @Test
    public void shouldSetSourceRecordPositionCorrectOnJoinXor() {
        testClient.deploy(
                Bpmn.createExecutableProcess("workflow")
                        .startEvent()
                        .exclusiveGateway("split")
                        .sequenceFlowId("s1")
                        .condition("$.foo < 5")
                        .exclusiveGateway("joinRequest")
                        .moveToLastExclusiveGateway()
                        .defaultFlow()
                        .sequenceFlowId("s2")
                        .connectTo("joinRequest")
                        .endEvent("end")
                        .done());

        // First instance: the streams are read unfiltered, as no other instance exists yet.
        long firstInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 4));
        testClient.receiveElementInState(
                firstInstanceKey, "workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        List<SubscribedRecord> firstFlows =
                testClient
                        .receiveEvents()
                        .withIntent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
                        .limit(3L)
                        .collect(Collectors.toList());
        List<SubscribedRecord> firstGateways =
                testClient
                        .receiveEvents()
                        .withIntent(WorkflowInstanceIntent.GATEWAY_ACTIVATED)
                        .limit(2L)
                        .collect(Collectors.toList());
        assertGatewaysFollowFlows(firstGateways, firstFlows, "s1");

        // Second instance: records must be filtered by key to skip those of the first instance.
        long secondInstanceKey =
                testClient.createWorkflowInstance(
                        "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 8));
        testClient.receiveElementInState(
                secondInstanceKey, "workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        List<SubscribedRecord> secondFlows =
                eventsOfInstance(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, secondInstanceKey, 3L);
        List<SubscribedRecord> secondGateways =
                eventsOfInstance(WorkflowInstanceIntent.GATEWAY_ACTIVATED, secondInstanceKey, 2L);
        assertGatewaysFollowFlows(secondGateways, secondFlows, "s2");
    }

    /** Pins the exact intent sequence of the first 14 workflow instance records. */
    @Test
    public void testWorkflowInstanceStatesWithServiceTask() {
        testClient.deploy(
                Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent("a")
                        .serviceTask(
                                "b",
                                task -> {
                                    task.zeebeTaskType(TypedStreamProcessorTest.STREAM_NAME);
                                })
                        .endEvent("c")
                        .done());

        testClient.createWorkflowInstance(PROCESS_ID);
        testClient.completeJobOfType(TypedStreamProcessorTest.STREAM_NAME);

        List<SubscribedRecord> workflowRecords =
                testClient.receiveRecords().ofTypeWorkflowInstance().limit(14L).collect(Collectors.toList());
        Assertions.assertThat(workflowRecords)
                .extracting(record -> record.intent())
                .containsExactly(
                        WorkflowInstanceIntent.CREATE,
                        WorkflowInstanceIntent.CREATED,
                        WorkflowInstanceIntent.ELEMENT_READY,
                        WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                        WorkflowInstanceIntent.START_EVENT_OCCURRED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                        WorkflowInstanceIntent.ELEMENT_READY,
                        WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                        WorkflowInstanceIntent.ELEMENT_COMPLETING,
                        WorkflowInstanceIntent.ELEMENT_COMPLETED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                        WorkflowInstanceIntent.END_EVENT_OCCURRED,
                        WorkflowInstanceIntent.ELEMENT_COMPLETING,
                        WorkflowInstanceIntent.ELEMENT_COMPLETED);
    }

    /** Pins the exact intent sequence of the first 11 workflow instance records. */
    @Test
    public void testWorkflowInstanceStatesWithExclusiveGateway() {
        testClient.deploy(
                Bpmn.createExecutableProcess("workflow")
                        .startEvent()
                        .exclusiveGateway("xor")
                        .sequenceFlowId("s1")
                        .condition("$.foo < 5")
                        .endEvent("a")
                        .moveToLastExclusiveGateway()
                        .defaultFlow()
                        .sequenceFlowId("s2")
                        .endEvent("b")
                        .done());

        testClient.createWorkflowInstance(
                "workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 4));

        List<SubscribedRecord> workflowRecords =
                testClient.receiveRecords().ofTypeWorkflowInstance().limit(11L).collect(Collectors.toList());
        Assertions.assertThat(workflowRecords)
                .extracting(record -> record.intent())
                .containsExactly(
                        WorkflowInstanceIntent.CREATE,
                        WorkflowInstanceIntent.CREATED,
                        WorkflowInstanceIntent.ELEMENT_READY,
                        WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                        WorkflowInstanceIntent.START_EVENT_OCCURRED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                        WorkflowInstanceIntent.GATEWAY_ACTIVATED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                        WorkflowInstanceIntent.END_EVENT_OCCURRED,
                        WorkflowInstanceIntent.ELEMENT_COMPLETING,
                        WorkflowInstanceIntent.ELEMENT_COMPLETED);
    }

    /**
     * Deploys the YAML workflow from the test classpath, completes its two jobs and expects the
     * process element to complete.
     *
     * @throws URISyntaxException if the classpath resource URL cannot be converted to a URI
     */
    @Test
    public void shouldCreateAndCompleteInstanceOfYamlWorkflow() throws URISyntaxException {
        File yamlFile = new File(getClass().getResource("/workflows/simple-workflow.yaml").toURI());
        String yaml = Files.contentOf(yamlFile, StandardCharsets.UTF_8);

        Map<String, Object> deploymentResource = new HashMap<>();
        deploymentResource.put("resource", yaml.getBytes(StandardCharsets.UTF_8));
        deploymentResource.put("resourceType", ResourceType.YAML_WORKFLOW);
        deploymentResource.put("resourceName", "simple-workflow.yaml");

        long deploymentKey =
                ((ExecuteCommandRequestBuilder)
                                apiRule
                                        .createCmdRequest()
                                        .partitionId(0)
                                        .type(ValueType.DEPLOYMENT, DeploymentIntent.CREATE)
                                        .command()
                                        .put("resources", Collections.singletonList(deploymentResource))
                                        .done())
                        .sendAndAwait()
                        .key();
        // Wait until the deployment has been created before starting an instance of it.
        testClient.receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);

        long workflowInstanceKey = testClient.createWorkflowInstance("yaml-workflow");
        testClient.completeJobOfType(TypedStreamProcessorTest.STREAM_NAME);
        testClient.completeJobOfType(TestJarExporter.FOO);

        SubscribedRecord processCompleted =
                testClient.receiveElementInState("yaml-workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        Assertions.assertThat(processCompleted.key()).isEqualTo(workflowInstanceKey);
        assertWorkflowInstanceValue(
                processCompleted, "yaml-workflow", workflowInstanceKey, "yaml-workflow");
    }

    /**
     * Deploys the same process 25 times, creates an instance, restarts the broker with purged
     * snapshots and expects that a further instance can still be created.
     */
    @Test
    public void shouldReprocessWorkflowInstanceRecordsWhenWorkflowIsTemporarilyUnavailable() {
        for (int deployment = 0; deployment < 25; deployment++) {
            testClient.deploy(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                            .startEvent()
                            .serviceTask(
                                    TypedStreamProcessorTest.STREAM_NAME,
                                    task -> {
                                        task.zeebeTaskType(TestJarExporter.FOO);
                                    })
                            .endEvent()
                            .done());
        }
        testClient.createWorkflowInstance(PROCESS_ID);

        brokerRule.stopBroker();
        brokerRule.purgeSnapshots();
        brokerRule.startBroker();

        Assertions.assertThat(testClient.createWorkflowInstance(PROCESS_ID)).isGreaterThan(0L);
    }

    /** Asserts the four workflow instance entries every workflow instance record value carries. */
    private static void assertWorkflowInstanceValue(
            SubscribedRecord record, String bpmnProcessId, long workflowInstanceKey, String activityId) {
        Assertions.assertThat(record.value())
                .containsEntry("bpmnProcessId", bpmnProcessId)
                .containsEntry("version", 1L)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityId", activityId);
    }

    /**
     * Asserts that the completed record of the {@link #PROCESS_ID} process element is keyed by the
     * workflow instance key and was written after {@code predecessor}.
     */
    private static void assertProcessCompletedAfter(
            SubscribedRecord processCompleted, SubscribedRecord predecessor, long workflowInstanceKey) {
        Assertions.assertThat(processCompleted.key()).isEqualTo(workflowInstanceKey);
        Assertions.assertThat(processCompleted.position()).isGreaterThan(predecessor.position());
        assertWorkflowInstanceValue(processCompleted, PROCESS_ID, workflowInstanceKey, PROCESS_ID);
    }

    /** Asserts that the first END_EVENT_OCCURRED record of the instance names the given end event. */
    private void assertEndEventOccurred(long workflowInstanceKey, String endEventId) {
        SubscribedRecord endEvent =
                testClient.receiveFirstWorkflowInstanceEvent(
                        workflowInstanceKey, WorkflowInstanceIntent.END_EVENT_OCCURRED);
        Assertions.assertThat(endEvent.value()).containsEntry("activityId", endEventId);
    }

    /**
     * Asserts that the first two gateway activations each reference the first two taken sequence
     * flows as their source record, and that the second taken flow has the expected id.
     */
    private static void assertGatewaysFollowFlows(
            List<SubscribedRecord> gateways, List<SubscribedRecord> flows, String expectedSecondFlowId) {
        Assertions.assertThat(gateways.get(0).sourceRecordPosition())
                .isEqualTo(flows.get(0).position());
        Assertions.assertThat(flows.get(1).value().get("activityId")).isEqualTo(expectedSecondFlowId);
        Assertions.assertThat(gateways.get(1).sourceRecordPosition())
                .isEqualTo(flows.get(1).position());
    }

    /** Returns whether the record's value names the given workflow instance key. */
    private static boolean belongsToInstance(SubscribedRecord record, long workflowInstanceKey) {
        return ((Long) record.value().get("workflowInstanceKey")).longValue() == workflowInstanceKey;
    }

    /** Collects the first {@code limit} events with the given intent that belong to the instance. */
    private List<SubscribedRecord> eventsOfInstance(
            WorkflowInstanceIntent intent, long workflowInstanceKey, long limit) {
        return testClient
                .receiveEvents()
                .withIntent(intent)
                .filter(record -> belongsToInstance(record, workflowInstanceKey))
                .limit(limit)
                .collect(Collectors.toList());
    }

    /** Returns the activity ids of the first three sequence flows taken by the instance. */
    private List<String> takenSequenceFlowIds(long workflowInstanceKey) {
        return testClient
                .receiveEvents()
                .withIntent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
                .filter(record -> belongsToInstance(record, workflowInstanceKey))
                .limit(3L)
                .map(record -> (String) record.value().get("activityId"))
                .collect(Collectors.toList());
    }
}
