package io.zeebe.broker.workflow.processor.message;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedBatchWriter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.workflow.state.ElementInstance;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/message/CorrelateWorkflowInstanceSubscription.class */
/**
 * Record processor for {@link WorkflowInstanceSubscriptionRecord}s.
 *
 * <p>For each record it looks up the element instance referenced by the subscription and the
 * workflow that instance belongs to. If both are found and the subscription can be removed from
 * the {@link WorkflowState}, it writes a {@code CORRELATED} follow-up event for the subscription
 * and an {@code ELEMENT_COMPLETING} follow-up event for the element instance, and registers a side
 * effect that sends the acknowledge command through the {@link SubscriptionCommandSender}.
 * Otherwise the record is rejected.
 *
 * <p>The record being processed is kept in instance fields, and the registered side effect reads
 * the subscription from those fields at the moment it is invoked.
 */
public final class CorrelateWorkflowInstanceSubscription implements TypedRecordProcessor<WorkflowInstanceSubscriptionRecord> {
    /** Timeout handed, in milliseconds, to the pending-subscription checker created in {@code onOpen}. */
    public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10);
    /** Fixed rate at which the pending-subscription checker is scheduled on the actor. */
    public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);

    private final TopologyManager topologyManager;
    private final WorkflowState workflowState;
    private final SubscriptionCommandSender subscriptionCommandSender;

    // Per-record context, overwritten at the start of every processRecord call.
    private TypedRecord<WorkflowInstanceSubscriptionRecord> currentRecord;
    private WorkflowInstanceSubscriptionRecord currentSubscription;
    private TypedStreamWriter currentWriter;
    private Consumer<SideEffectProducer> currentSideEffect;

    /**
     * @param topologyManager passed to the command sender when the processor is opened
     * @param workflowState state used to look up element instances and workflows and to remove
     *     subscriptions
     * @param subscriptionCommandSender sender used for the acknowledge command and by the
     *     pending-subscription checker
     */
    public CorrelateWorkflowInstanceSubscription(TopologyManager topologyManager, WorkflowState workflowState, SubscriptionCommandSender subscriptionCommandSender) {
        this.topologyManager = topologyManager;
        this.workflowState = workflowState;
        this.subscriptionCommandSender = subscriptionCommandSender;
    }

    /**
     * Initializes the command sender with the topology manager, the processor's actor and its
     * stream, then schedules a {@link PendingWorkflowInstanceSubscriptionChecker} on that actor at
     * {@link #SUBSCRIPTION_CHECK_INTERVAL}.
     */
    @Override
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        final ActorControl actorControl = typedStreamProcessor.getActor();
        subscriptionCommandSender.init(topologyManager, actorControl, typedStreamProcessor.getEnvironment().getStream());

        final PendingWorkflowInstanceSubscriptionChecker pendingChecker =
                new PendingWorkflowInstanceSubscriptionChecker(subscriptionCommandSender, workflowState, SUBSCRIPTION_TIMEOUT.toMillis());
        actorControl.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, pendingChecker);
    }

    /** Nothing to release on close. */
    @Override
    public void onClose() {
    }

    /**
     * Stores the per-record context and correlates the subscription, or rejects the record when
     * the referenced element instance or its workflow cannot be found.
     *
     * @param typedRecord the subscription record to process
     * @param typedResponseWriter not used by this processor
     * @param typedStreamWriter writer for rejections and follow-up events
     * @param consumer receiver for the side effect that sends the acknowledge command
     */
    @Override
    public void processRecord(TypedRecord<WorkflowInstanceSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        currentRecord = typedRecord;
        currentSubscription = typedRecord.getValue();
        currentWriter = typedStreamWriter;
        currentSideEffect = consumer;

        final long activityInstanceKey = currentSubscription.getActivityInstanceKey();
        final ElementInstance activityInstance = workflowState.getElementInstanceState().getInstance(activityInstanceKey);
        if (activityInstance == null) {
            typedStreamWriter.writeRejection(typedRecord, RejectionType.NOT_APPLICABLE, "activity is not active anymore");
            return;
        }

        final boolean workflowMissing = workflowState.getWorkflowByKey(activityInstance.getValue().getWorkflowKey()) == null;
        if (workflowMissing) {
            typedStreamWriter.writeRejection(typedRecord, RejectionType.NOT_APPLICABLE, "workflow is not available");
            return;
        }

        correlateSubscription();
    }

    /**
     * Removes the current subscription from the workflow state and, on success, writes the
     * follow-up events and moves the element instance to {@code ELEMENT_COMPLETING}. If the
     * subscription could not be removed the record is rejected. The acknowledge side effect is
     * registered in both cases.
     */
    private void correlateSubscription() {
        final boolean removed = workflowState.remove(currentSubscription);
        if (removed) {
            final long activityInstanceKey = currentSubscription.getActivityInstanceKey();
            final ElementInstance activityInstance = workflowState.getElementInstanceState().getInstance(activityInstanceKey);

            // NOTE(review): the local is typed UnpackedObject although setPayload is called on it
            // and it is handed to ElementInstance#setValue - confirm against the actual return
            // type of ElementInstance#getValue.
            final UnpackedObject instanceValue = activityInstance.getValue();
            instanceValue.setPayload(currentSubscription.getPayload());

            final TypedBatchWriter batch = currentWriter.newBatch();
            batch.addFollowUpEvent(currentRecord.getKey(), WorkflowInstanceSubscriptionIntent.CORRELATED, currentSubscription);
            batch.addFollowUpEvent(activityInstanceKey, WorkflowInstanceIntent.ELEMENT_COMPLETING, instanceValue);

            currentSideEffect.accept(this::acknowledgeCorrelation);

            activityInstance.setState(WorkflowInstanceIntent.ELEMENT_COMPLETING);
            activityInstance.setValue(instanceValue);
        } else {
            currentWriter.writeRejection(currentRecord, RejectionType.NOT_APPLICABLE, "subscription is already correlated");
            currentSideEffect.accept(this::acknowledgeCorrelation);
        }
    }

    /**
     * Sends the correlate-message-subscription command for the current subscription.
     *
     * @return the result reported by the command sender
     */
    private boolean acknowledgeCorrelation() {
        final WorkflowInstanceSubscriptionRecord acknowledged = currentSubscription;
        return subscriptionCommandSender.correlateMessageSubscription(
                acknowledged.getSubscriptionPartitionId(),
                acknowledged.getWorkflowInstanceKey(),
                acknowledged.getActivityInstanceKey(),
                acknowledged.getMessageName());
    }
}
