package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.state.MessageState;
import io.zeebe.broker.subscription.message.state.MessageSubscription;
import io.zeebe.broker.subscription.message.state.MessageSubscriptionState;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.util.buffer.BufferUtil;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/OpenMessageSubscriptionProcessor.class */
public class OpenMessageSubscriptionProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {
    public static final String SUBSCRIPTION_ALREADY_OPENED_MESSAGE = "Expected to open a new message subscription for element with key '%d' and message name '%s', but there is already a message subscription for that element key and message name opened";
    private final MessageCorrelator messageCorrelator;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;
    private MessageSubscriptionRecord subscriptionRecord;

    public OpenMessageSubscriptionProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.subscriptionState = messageSubscriptionState;
        this.commandSender = subscriptionCommandSender;
        this.messageCorrelator = new MessageCorrelator(messageState, messageSubscriptionState, subscriptionCommandSender);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        this.subscriptionRecord = typedRecord.getValue();
        if (!this.subscriptionState.existSubscriptionForElementInstance(this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageName())) {
            handleNewSubscription(typedRecord, typedStreamWriter, consumer);
        } else {
            consumer.accept(this::sendAcknowledgeCommand);
            typedStreamWriter.appendRejection(typedRecord, RejectionType.INVALID_STATE, String.format(SUBSCRIPTION_ALREADY_OPENED_MESSAGE, Long.valueOf(this.subscriptionRecord.getElementInstanceKey()), BufferUtil.bufferAsString(this.subscriptionRecord.getMessageName())));
        }
    }

    private void handleNewSubscription(TypedRecord<MessageSubscriptionRecord> typedRecord, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        MessageSubscription messageSubscription = new MessageSubscription(this.subscriptionRecord.getWorkflowInstanceKey(), this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageName(), this.subscriptionRecord.getCorrelationKey(), this.subscriptionRecord.shouldCloseOnCorrelate());
        consumer.accept(this::sendAcknowledgeCommand);
        this.subscriptionState.put(messageSubscription);
        this.messageCorrelator.correlateNextMessage(messageSubscription, this.subscriptionRecord, consumer);
        typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), MessageSubscriptionIntent.OPENED, this.subscriptionRecord);
    }

    private boolean sendAcknowledgeCommand() {
        return this.commandSender.openWorkflowInstanceSubscription(this.subscriptionRecord.getWorkflowInstanceKey(), this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageName(), this.subscriptionRecord.shouldCloseOnCorrelate());
    }
}
