package io.zeebe.broker.workflow.processor;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.job.JobEventProcessors;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.util.CopiedTypedEvent;
import io.zeebe.broker.util.RecordStream;
import io.zeebe.broker.util.Records;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.util.TypedRecordStream;
import io.zeebe.broker.workflow.processor.timer.DueDateTimerChecker;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.model.bpmn.instance.Process;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.zeebe.protocol.impl.record.value.deployment.ResourceType;
import io.zeebe.protocol.impl.record.value.deployment.Workflow;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.timer.TimerRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceCreationRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.TimerIntent;
import io.zeebe.protocol.intent.WorkflowInstanceCreationIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.ActorControl;
import java.io.ByteArrayOutputStream;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessorRule.class */
public class WorkflowInstanceStreamProcessorRule extends ExternalResource implements StreamProcessorLifecycleAware {
    public static final int VERSION = 1;
    public static final int WORKFLOW_KEY = 123;
    public static final int DEPLOYMENT_KEY = 1;

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    private final StreamProcessorRule environmentRule;
    private SubscriptionCommandSender mockSubscriptionCommandSender;
    private TopologyManager mockTopologyManager;
    private StreamProcessorControl streamProcessor;
    private WorkflowState workflowState;
    private ZeebeState zeebeState;
    private ActorControl actor;

    public WorkflowInstanceStreamProcessorRule(StreamProcessorRule streamProcessorRule) {
        this.environmentRule = streamProcessorRule;
    }

    public SubscriptionCommandSender getMockSubscriptionCommandSender() {
        return this.mockSubscriptionCommandSender;
    }

    protected void before() throws Exception {
        this.mockSubscriptionCommandSender = (SubscriptionCommandSender) Mockito.mock(SubscriptionCommandSender.class);
        this.mockTopologyManager = (TopologyManager) Mockito.mock(TopologyManager.class);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.openMessageSubscription(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.correlateMessageSubscription(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.closeMessageSubscription(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class)))).thenReturn(true);
        this.streamProcessor = this.environmentRule.runStreamProcessor((typedEventStreamProcessorBuilder, zeebeDb) -> {
            this.zeebeState = new ZeebeState(zeebeDb);
            this.workflowState = this.zeebeState.getWorkflowState();
            WorkflowEventProcessors.addWorkflowProcessors(typedEventStreamProcessorBuilder, this.zeebeState, this.mockSubscriptionCommandSender, this.mockTopologyManager, new DueDateTimerChecker(this.workflowState), 1);
            JobEventProcessors.addJobProcessors(typedEventStreamProcessorBuilder, this.zeebeState);
            typedEventStreamProcessorBuilder.withListener(this);
            return typedEventStreamProcessorBuilder.build();
        });
    }

    public StreamProcessorControl getStreamProcessor() {
        return this.streamProcessor;
    }

    public void deploy(BpmnModelInstance bpmnModelInstance, int i, int i2) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Bpmn.writeModelToStream(byteArrayOutputStream, bpmnModelInstance);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(byteArrayOutputStream.toByteArray());
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        DirectBuffer wrapString = BufferUtil.wrapString("resourceName");
        Process process = (Process) bpmnModelInstance.getModelElementsByType(Process.class).iterator().next();
        ((DeploymentResource) deploymentRecord.resources().add()).setResource(unsafeBuffer).setResourceName(wrapString).setResourceType(ResourceType.BPMN_XML);
        ((Workflow) deploymentRecord.workflows().add()).setKey(123L).setResourceName(wrapString).setBpmnProcessId(BufferUtil.wrapString(process.getId())).setVersion(i2);
        this.actor.call(() -> {
            return Boolean.valueOf(this.workflowState.putDeployment(i, deploymentRecord));
        }).join();
    }

    public void deploy(BpmnModelInstance bpmnModelInstance) {
        deploy(bpmnModelInstance, 1, 1);
    }

    public TypedRecord<WorkflowInstanceRecord> createAndReceiveWorkflowInstance(Function<WorkflowInstanceCreationRecord, WorkflowInstanceCreationRecord> function) {
        TypedRecord<WorkflowInstanceCreationRecord> createWorkflowInstance = createWorkflowInstance(function);
        return awaitAndGetFirstWorkflowInstanceRecord(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_ACTIVATING && typedRecord.getKey() == createWorkflowInstance.getValue().getInstanceKey();
        });
    }

    public TypedRecord<WorkflowInstanceCreationRecord> createWorkflowInstance(Function<WorkflowInstanceCreationRecord, WorkflowInstanceCreationRecord> function) {
        long writeCommand = this.environmentRule.writeCommand(WorkflowInstanceCreationIntent.CREATE, (UnpackedObject) function.apply(new WorkflowInstanceCreationRecord()));
        return awaitAndGetFirstRecord(ValueType.WORKFLOW_INSTANCE_CREATION, (BiFunction<CopiedTypedEvent, TypedRecord<BiFunction>, Boolean>) (copiedTypedEvent, typedRecord) -> {
            return Boolean.valueOf(copiedTypedEvent.getSourceEventPosition() == writeCommand && typedRecord.getMetadata().getIntent() == WorkflowInstanceCreationIntent.CREATED);
        }, (BiFunction) new WorkflowInstanceCreationRecord());
    }

    public void completeFirstJob() {
        TypedRecord<JobRecord> awaitAndGetFirstRecordInState = awaitAndGetFirstRecordInState(JobIntent.CREATE);
        this.environmentRule.writeEvent(this.environmentRule.writeEvent(JobIntent.CREATED, awaitAndGetFirstRecordInState.getValue()), JobIntent.COMPLETED, awaitAndGetFirstRecordInState.getValue());
    }

    public TypedRecord<WorkflowInstanceRecord> awaitAndGetFirstWorkflowInstanceRecord(Predicate<TypedRecord<WorkflowInstanceRecord>> predicate) {
        return awaitAndGetFirstRecord(ValueType.WORKFLOW_INSTANCE, predicate, WorkflowInstanceRecord.class);
    }

    public <T extends UnpackedObject> TypedRecord<T> awaitAndGetFirstRecord(ValueType valueType, Predicate<TypedRecord<T>> predicate, Class<T> cls) {
        return (TypedRecord) ((Optional) TestUtil.doRepeatedly(() -> {
            return ((RecordStream) this.environmentRule.events().filter(loggedEvent -> {
                return Records.isRecordOfType(loggedEvent, valueType);
            })).map(loggedEvent2 -> {
                return CopiedTypedEvent.toTypedEvent(loggedEvent2, cls);
            }).filter(predicate).findFirst();
        }).until((v0) -> {
            return v0.isPresent();
        })).orElse(null);
    }

    public <T extends UnpackedObject> TypedRecord<T> awaitAndGetFirstRecord(ValueType valueType, BiFunction<CopiedTypedEvent, TypedRecord<T>, Boolean> biFunction, T t) {
        return (TypedRecord) ((Optional) TestUtil.doRepeatedly(() -> {
            return ((RecordStream) this.environmentRule.events().filter(loggedEvent -> {
                return Records.isRecordOfType(loggedEvent, valueType);
            })).map(loggedEvent2 -> {
                return new CopiedTypedEvent(loggedEvent2, t);
            }).filter(copiedTypedEvent -> {
                return ((Boolean) biFunction.apply(copiedTypedEvent, copiedTypedEvent)).booleanValue();
            }).findFirst();
        }).until((v0) -> {
            return v0.isPresent();
        })).orElse(null);
    }

    private TypedRecord<WorkflowInstanceRecord> awaitAndGetFirstRecordInState(WorkflowInstanceIntent workflowInstanceIntent) {
        awaitFirstRecordInState(workflowInstanceIntent);
        return (TypedRecord) this.environmentRule.events().onlyWorkflowInstanceRecords().withIntent(workflowInstanceIntent).findFirst().get();
    }

    private TypedRecord<JobRecord> awaitAndGetFirstRecordInState(JobIntent jobIntent) {
        awaitFirstRecordInState(jobIntent);
        return (TypedRecord) this.environmentRule.events().onlyJobRecords().withIntent(jobIntent).findFirst().get();
    }

    private void awaitFirstRecordInState(Intent intent) {
        TestUtil.waitUntil(() -> {
            return this.environmentRule.events().withIntent(intent).findFirst().isPresent();
        });
    }

    public TypedRecord<WorkflowInstanceSubscriptionRecord> awaitAndGetFirstSubscriptionRejection() {
        TestUtil.waitUntil(() -> {
            return this.environmentRule.events().onlyWorkflowInstanceSubscriptionRecords().onlyRejections().findFirst().isPresent();
        });
        return (TypedRecord) this.environmentRule.events().onlyWorkflowInstanceSubscriptionRecords().onlyRejections().findFirst().get();
    }

    public TypedRecord<WorkflowInstanceRecord> awaitElementInState(String str, WorkflowInstanceIntent workflowInstanceIntent) {
        DirectBuffer wrapString = BufferUtil.wrapString(str);
        return (TypedRecord) ((Optional) TestUtil.doRepeatedly(() -> {
            return ((TypedRecordStream) this.environmentRule.events().onlyWorkflowInstanceRecords().withIntent(workflowInstanceIntent).filter(typedRecord -> {
                return wrapString.equals(typedRecord.getValue().getElementId());
            })).findFirst();
        }).until(optional -> {
            return Boolean.valueOf(optional.isPresent());
        })).get();
    }

    public TypedRecord<TimerRecord> awaitTimerInState(String str, TimerIntent timerIntent) {
        DirectBuffer wrapString = BufferUtil.wrapString(str);
        Supplier supplier = () -> {
            return ((TypedRecordStream) this.environmentRule.events().onlyTimerRecords().filter(typedRecord -> {
                return typedRecord.getValue().getHandlerNodeId().equals(wrapString);
            })).withIntent(timerIntent);
        };
        TestUtil.waitUntil(() -> {
            return ((TypedRecordStream) supplier.get()).findFirst().isPresent();
        });
        return (TypedRecord) ((TypedRecordStream) supplier.get()).findFirst().get();
    }

    public TypedRecord<JobRecord> awaitJobInState(String str, JobIntent jobIntent) {
        DirectBuffer wrapString = BufferUtil.wrapString(str);
        Supplier supplier = () -> {
            return ((TypedRecordStream) this.environmentRule.events().onlyJobRecords().filter(typedRecord -> {
                return typedRecord.getValue().getHeaders().getElementId().equals(wrapString);
            })).withIntent(jobIntent);
        };
        TestUtil.waitUntil(() -> {
            return ((TypedRecordStream) supplier.get()).findFirst().isPresent();
        });
        return (TypedRecord) ((TypedRecordStream) supplier.get()).findFirst().get();
    }

    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.actor = typedStreamProcessor.getActor();
    }

    public void onRecovered(TypedStreamProcessor typedStreamProcessor) {
    }

    public void onClose() {
        this.actor = null;
    }
}
