package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.state.Message;
import io.zeebe.broker.subscription.message.state.MessageState;
import io.zeebe.broker.subscription.message.state.MessageSubscription;
import io.zeebe.broker.subscription.message.state.MessageSubscriptionState;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/MessageCorrelator.class */
public class MessageCorrelator {
    private final DirectBuffer messagePayload = new UnsafeBuffer();
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;
    private Consumer<SideEffectProducer> sideEffect;
    private MessageSubscriptionRecord subscriptionRecord;
    private MessageSubscription subscription;

    public MessageCorrelator(MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.commandSender = subscriptionCommandSender;
    }

    public void correlateNextMessage(MessageSubscription messageSubscription, MessageSubscriptionRecord messageSubscriptionRecord, Consumer<SideEffectProducer> consumer) {
        this.subscription = messageSubscription;
        this.subscriptionRecord = messageSubscriptionRecord;
        this.sideEffect = consumer;
        this.messageState.visitMessages(messageSubscription.getMessageName(), messageSubscription.getCorrelationKey(), this::correlateMessage);
    }

    private boolean correlateMessage(Message message) {
        boolean existMessageCorrelation = this.messageState.existMessageCorrelation(message.getKey(), this.subscriptionRecord.getWorkflowInstanceKey());
        if (!existMessageCorrelation) {
            this.subscriptionState.updateToCorrelatingState(this.subscription, message.getPayload(), ActorClock.currentTimeMillis());
            this.messagePayload.wrap(message.getPayload());
            this.sideEffect.accept(this::sendCorrelateCommand);
            this.messageState.putMessageCorrelation(message.getKey(), this.subscriptionRecord.getWorkflowInstanceKey());
        }
        return existMessageCorrelation;
    }

    private boolean sendCorrelateCommand() {
        return this.commandSender.correlateWorkflowInstanceSubscription(this.subscriptionRecord.getWorkflowInstanceKey(), this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageName(), this.messagePayload);
    }
}
