package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.state.Message;
import io.zeebe.broker.subscription.message.state.MessageStateController;
import io.zeebe.broker.subscription.message.state.MessageSubscription;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/OpenMessageSubscriptionProcessor.class */
/**
 * Handles a {@link MessageSubscriptionRecord} that asks to open a message subscription.
 *
 * <p>When the state controller already reports an equal subscription, the record is rejected.
 * Otherwise the subscription is stored and an {@code OPENED} follow-up event is written. If a
 * message with the same name and correlation key is already found in the state, a correlate
 * command is registered as side effect; in every other case an acknowledge command is registered.
 */
public class OpenMessageSubscriptionProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {
    /** State that is queried for existing subscriptions / messages and receives new subscriptions. */
    private final MessageStateController messageStateController;

    /** Sender used for the open and correlate commands of a workflow instance subscription. */
    private final SubscriptionCommandSender commandSender;

    /** View that is wrapped around the payload of the found message; read when the correlate command is sent. */
    private final DirectBuffer messagePayload = new UnsafeBuffer(0, 0);

    /** Value of the record passed to the latest {@code processRecord} call; read by the command senders. */
    private MessageSubscriptionRecord subscriptionRecord;

    /**
     * @param messageStateController state holding messages and message subscriptions
     * @param subscriptionCommandSender sender for the subscription commands
     */
    public OpenMessageSubscriptionProcessor(MessageStateController messageStateController, SubscriptionCommandSender subscriptionCommandSender) {
        this.messageStateController = messageStateController;
        this.commandSender = subscriptionCommandSender;
    }

    /**
     * Opens the subscription described by the record, or rejects the record if the subscription
     * already exists.
     *
     * @param typedRecord the record to process
     * @param typedResponseWriter not used by this processor
     * @param typedStreamWriter receives either the rejection or the {@code OPENED} follow-up event
     * @param consumer receives the side effect (acknowledge or correlate command) to run
     */
    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        subscriptionRecord = typedRecord.getValue();
        final MessageSubscription candidate = buildSubscription();

        if (messageStateController.exist(candidate)) {
            // Already known: the acknowledgement is still registered, but the record itself is rejected.
            consumer.accept(this::sendAcknowledgeCommand);
            typedStreamWriter.writeRejection(typedRecord, RejectionType.NOT_APPLICABLE, "subscription is already open");
            return;
        }

        openSubscription(candidate, typedRecord, typedStreamWriter, consumer);
    }

    /** Creates a subscription from the fields of the record that is currently being processed. */
    private MessageSubscription buildSubscription() {
        return new MessageSubscription(
                subscriptionRecord.getWorkflowInstancePartitionId(),
                subscriptionRecord.getWorkflowInstanceKey(),
                subscriptionRecord.getActivityInstanceKey(),
                subscriptionRecord.getMessageName(),
                subscriptionRecord.getCorrelationKey());
    }

    /**
     * Stores a subscription that does not exist yet and writes the {@code OPENED} follow-up event.
     * Registers the correlate command when a matching message is found, the acknowledge command
     * otherwise.
     */
    private void openSubscription(MessageSubscription newSubscription, TypedRecord<MessageSubscriptionRecord> sourceRecord, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffects) {
        final Message matchingMessage = messageStateController.findFirstMessage(
                subscriptionRecord.getMessageName(),
                subscriptionRecord.getCorrelationKey());

        if (matchingMessage == null) {
            sideEffects.accept(this::sendAcknowledgeCommand);
        } else {
            // Expose the payload to sendCorrelateCommand before it is registered as side effect.
            messagePayload.wrap(matchingMessage.getPayload());
            sideEffects.accept(this::sendCorrelateCommand);
            newSubscription.setMessagePayload(matchingMessage.getPayload());
            newSubscription.setCommandSentTime(ActorClock.currentTimeMillis());
        }

        messageStateController.put(newSubscription);
        streamWriter.writeFollowUpEvent(sourceRecord.getKey(), MessageSubscriptionIntent.OPENED, subscriptionRecord);
    }

    /**
     * Sends the correlate command for the current record together with the wrapped message payload.
     *
     * @return the result reported by the command sender
     */
    private boolean sendCorrelateCommand() {
        final MessageSubscriptionRecord current = subscriptionRecord;
        return commandSender.correlateWorkflowInstanceSubscription(
                current.getWorkflowInstancePartitionId(),
                current.getWorkflowInstanceKey(),
                current.getActivityInstanceKey(),
                current.getMessageName(),
                messagePayload);
    }

    /**
     * Sends the open (acknowledge) command for the current record.
     *
     * @return the result reported by the command sender
     */
    private boolean sendAcknowledgeCommand() {
        final MessageSubscriptionRecord current = subscriptionRecord;
        return commandSender.openWorkflowInstanceSubscription(
                current.getWorkflowInstancePartitionId(),
                current.getWorkflowInstanceKey(),
                current.getActivityInstanceKey(),
                current.getMessageName());
    }
}
