package io.zeebe.broker.workflow.gateway;

import io.zeebe.broker.topic.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.util.TypedRecordStream;
import io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessorRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.model.bpmn.instance.ExclusiveGateway;
import io.zeebe.model.bpmn.instance.ParallelGateway;
import io.zeebe.model.bpmn.instance.Process;
import io.zeebe.model.bpmn.instance.SequenceFlow;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/workflow/gateway/ParallelGatewayStreamProcessorTest.class */
/**
 * Stream-processor level test for parallel gateway handling in workflow instances.
 *
 * <p>The test environment is assembled from a {@link StreamProcessorRule} wrapped around a
 * {@link WorkflowInstanceStreamProcessorRule}; both are applied through a single {@link RuleChain}.
 */
public class ParallelGatewayStreamProcessorTest {
    public static final String PROCESS_ID = "process";
    public static final DirectBuffer PROCESS_ID_BUFFER = BufferUtil.wrapString(PROCESS_ID);

    // Not annotated with @Rule themselves: they are applied via the chain below, outer rule first.
    public StreamProcessorRule envRule = new StreamProcessorRule();
    public WorkflowInstanceStreamProcessorRule streamProcessorRule = new WorkflowInstanceStreamProcessorRule(envRule);

    @Rule
    public RuleChain chain = RuleChain.outerRule(envRule).around(streamProcessorRule);

    private StreamProcessorControl streamProcessor;

    /** Fetches the stream processor control handle from the rule before each test. */
    @Before
    public void setUp() {
        streamProcessor = streamProcessorRule.getStreamProcessor();
    }

    /**
     * Deploys a process in which one branch of the fork leads into the "join" gateway and the
     * other branch runs through a service task to its own end event. After the job is completed
     * and an END_EVENT_OCCURRED record has been processed, no ELEMENT_COMPLETING record whose
     * activity id equals the process id may exist.
     */
    @Test
    public void shouldNotCompleteScopeWhenATokenWaitsAtAGateway() {
        // given
        BpmnModelInstance model = Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .parallelGateway("fork")
            .sequenceFlowId("flowToJoin")
            .parallelGateway("join")
            .endEvent()
            .moveToNode("fork")
            .serviceTask("waitState", taskBuilder -> taskBuilder.zeebeTaskType("type"))
            .sequenceFlowId("flowToEnd")
            .endEvent()
            .done();
        addIncomingFlowFromDetachedGateway(model, "join");

        streamProcessorRule.deploy(model);
        streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        streamProcessorRule.awaitElementInState("flowToJoin", WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);

        // when
        streamProcessor.blockAfterWorkflowInstanceRecord(
            record -> record.getMetadata().getIntent() == WorkflowInstanceIntent.END_EVENT_OCCURRED);
        streamProcessorRule.completeFirstJob();
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // then
        Assertions.assertThat(
            ((TypedRecordStream) envRule.events()
                .onlyWorkflowInstanceRecords()
                .withIntent(WorkflowInstanceIntent.ELEMENT_COMPLETING)
                .filter(record -> PROCESS_ID_BUFFER.equals(record.getValue().getActivityId())))
                .findFirst())
            .isNotPresent();
    }

    /**
     * Creates a new exclusive gateway plus a new sequence flow, adds both to the model's first
     * process element, and wires the flow from the new gateway into the parallel gateway with the
     * given id. Nothing leads into the new gateway.
     */
    private static void addIncomingFlowFromDetachedGateway(BpmnModelInstance model, String joinId) {
        // NOTE(review): presumably this extra incoming flow is never taken, keeping the join
        // from activating - confirm against the gateway activation logic.
        ExclusiveGateway detachedGateway = model.newInstance(ExclusiveGateway.class);
        ParallelGateway joinGateway = model.getModelElementById(joinId);
        SequenceFlow extraFlow = model.newInstance(SequenceFlow.class);

        Process container = (Process) model.getModelElementsByType(Process.class).iterator().next();
        container.addChildElement(detachedGateway);
        container.addChildElement(extraFlow);

        extraFlow.setSource(detachedGateway);
        detachedGateway.getOutgoing().add(extraFlow);
        extraFlow.setTarget(joinGateway);
        joinGateway.getIncoming().add(extraFlow);
    }
}
