package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
import io.camunda.zeebe.engine.state.immutable.MessageStartEventSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageStartEventSubscriptionRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessagePublishProcessor.class */
/**
 * Processes message publish commands.
 *
 * <p>A command carrying a message id for which {@link MessageState#exist} reports an existing
 * message (same name, correlation key and id) is rejected with
 * {@link RejectionType#ALREADY_EXISTS}. Any other command results in a
 * {@link MessageIntent#PUBLISHED} event, followed by correlation of the message to matching
 * message subscriptions and message start event subscriptions.
 *
 * <p>NOTE: the record currently being processed is kept in mutable instance fields
 * ({@code responseWriter}, {@code messageRecord}, {@code messageKey}) which are also read by the
 * registered side effect, so one instance must not process records concurrently.
 */
public final class MessagePublishProcessor implements TypedRecordProcessor<MessageRecord> {
    private static final String ALREADY_PUBLISHED_MESSAGE = "Expected to publish a new message with id '%s', but a message with that id was already published";
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final MessageStartEventSubscriptionState startEventSubscriptionState;
    private final SubscriptionCommandSender commandSender;
    private final KeyGenerator keyGenerator;
    private final StateWriter stateWriter;
    private final EventHandle eventHandle;
    /** Subscriptions collected while processing the current record; cleared for every record. */
    private final Subscriptions correlatingSubscriptions = new Subscriptions();
    private TypedResponseWriter responseWriter;
    private MessageRecord messageRecord;
    private long messageKey;

    /**
     * Wires the processor with the state and writers it operates on. The event scope instance
     * state, process state and event trigger behavior are only handed on to the internally
     * created {@link EventHandle}.
     */
    public MessagePublishProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, MessageStartEventSubscriptionState messageStartEventSubscriptionState, EventScopeInstanceState eventScopeInstanceState, SubscriptionCommandSender subscriptionCommandSender, KeyGenerator keyGenerator, Writers writers, ProcessState processState, EventTriggerBehavior eventTriggerBehavior) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.startEventSubscriptionState = messageStartEventSubscriptionState;
        this.commandSender = subscriptionCommandSender;
        this.keyGenerator = keyGenerator;
        this.stateWriter = writers.state();
        this.eventHandle = new EventHandle(keyGenerator, eventScopeInstanceState, writers, processState, eventTriggerBehavior);
    }

    /**
     * Handles a single publish command: rejects it when a message with the same id was already
     * published, otherwise publishes and correlates it.
     *
     * @param typedRecord the publish command
     * @param typedResponseWriter writer for the response to the command; also kept for the side effect
     * @param typedStreamWriter writer used to append the rejection
     * @param consumer receives the side effect that sends the correlate commands
     */
    @Override // io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        this.responseWriter = typedResponseWriter;
        this.messageRecord = typedRecord.mo22getValue();
        this.correlatingSubscriptions.clear();

        // The state lookup is only performed for messages that actually carry an id.
        boolean alreadyPublished = this.messageRecord.hasMessageId()
                && this.messageState.exist(this.messageRecord.getNameBuffer(), this.messageRecord.getCorrelationKeyBuffer(), this.messageRecord.getMessageIdBuffer());
        if (!alreadyPublished) {
            handleNewMessage(typedRecord, typedResponseWriter, consumer);
            return;
        }

        String rejectionReason = String.format(ALREADY_PUBLISHED_MESSAGE, BufferUtil.bufferAsString(this.messageRecord.getMessageIdBuffer()));
        typedStreamWriter.appendRejection(typedRecord, RejectionType.ALREADY_EXISTS, rejectionReason);
        typedResponseWriter.writeRejectionOnCommand(typedRecord, RejectionType.ALREADY_EXISTS, rejectionReason);
    }

    /**
     * Publishes a message that is not a duplicate: assigns a key and a deadline, writes the
     * PUBLISHED event and response, correlates the message and registers the side effect that
     * sends the correlate commands.
     */
    private void handleNewMessage(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, Consumer<SideEffectProducer> consumer) {
        this.messageKey = this.keyGenerator.nextKey();

        // Deadline = command timestamp + time to live. The addition saturates so that a huge
        // time to live cannot wrap around into a negative (i.e. long past) deadline.
        this.messageRecord.setDeadline(saturatedAdd(typedRecord.getTimestamp(), this.messageRecord.getTimeToLive()));

        this.stateWriter.appendFollowUpEvent(this.messageKey, MessageIntent.PUBLISHED, typedRecord.mo22getValue());
        typedResponseWriter.writeEventOnCommand(this.messageKey, MessageIntent.PUBLISHED, typedRecord.mo22getValue(), typedRecord);

        correlateToSubscriptions(this.messageKey, this.messageRecord);
        correlateToMessageStartEvents(this.messageRecord);
        consumer.accept(this::sendCorrelateCommand);

        if (this.messageRecord.getTimeToLive() <= 0) {
            // A message without a positive time to live is expired right after publishing
            // (presumably so that it cannot be correlated again later - confirm with state appliers).
            this.stateWriter.appendFollowUpEvent(this.messageKey, MessageIntent.EXPIRED, this.messageRecord);
        }
    }

    /**
     * Adds two {@code long} values, clamping the result to {@link Long#MAX_VALUE} or
     * {@link Long#MIN_VALUE} instead of wrapping around on overflow.
     */
    private static long saturatedAdd(long left, long right) {
        long sum = left + right;
        // Overflow happened iff both operands have the same sign and the sum's sign differs.
        if (((left ^ sum) & (right ^ sum)) < 0) {
            return left < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /**
     * Visits the message subscriptions for the message's name and correlation key and writes a
     * CORRELATING event for each one that is not already correlating and whose process id has not
     * been collected yet for this message.
     */
    private void correlateToSubscriptions(long key, MessageRecord messageRecord) {
        this.subscriptionState.visitSubscriptions(messageRecord.getNameBuffer(), messageRecord.getCorrelationKeyBuffer(), messageSubscription -> {
            boolean skip = messageSubscription.isCorrelating()
                    || this.correlatingSubscriptions.contains(messageSubscription.getRecord().getBpmnProcessIdBuffer());
            if (!skip) {
                MessageSubscriptionRecord correlatingSubscription = messageSubscription.getRecord().setMessageKey(key).setVariables(messageRecord.getVariablesBuffer());
                this.stateWriter.appendFollowUpEvent(messageSubscription.getKey(), MessageSubscriptionIntent.CORRELATING, correlatingSubscription);
                this.correlatingSubscriptions.add(correlatingSubscription);
            }
            // Always true: every subscription is visited (presumably "continue" - confirm with the visitor contract).
            return true;
        });
    }

    /**
     * Visits the message start event subscriptions for the message's name and triggers the start
     * event for each process whose id has not been collected yet for this message, provided the
     * correlation key is empty or no active process instance exists for that process id and
     * correlation key.
     */
    private void correlateToMessageStartEvents(MessageRecord messageRecord) {
        this.startEventSubscriptionState.visitSubscriptionsByMessageName(messageRecord.getNameBuffer(), messageStartEventSubscription -> {
            MessageStartEventSubscriptionRecord record = messageStartEventSubscription.getRecord();
            DirectBuffer bpmnProcessIdBuffer = record.getBpmnProcessIdBuffer();
            DirectBuffer correlationKeyBuffer = messageRecord.getCorrelationKeyBuffer();
            if (this.correlatingSubscriptions.contains(bpmnProcessIdBuffer)) {
                return;
            }
            if (correlationKeyBuffer.capacity() == 0 || !this.messageState.existActiveProcessInstance(bpmnProcessIdBuffer, correlationKeyBuffer)) {
                this.correlatingSubscriptions.add(record);
                this.eventHandle.triggerMessageStartEvent(messageStartEventSubscription.getKey(), record, this.messageKey, messageRecord);
            }
        });
    }

    /**
     * Side effect: sends a correlate command for the collected subscriptions and then flushes the
     * response writer. The flush is only attempted when visiting the subscriptions returned
     * {@code true}.
     *
     * @return {@code true} if both the subscription visit and the flush returned {@code true}
     */
    private boolean sendCorrelateCommand() {
        return this.correlatingSubscriptions.visitSubscriptions(subscription -> {
            return this.commandSender.correlateProcessMessageSubscription(subscription.getProcessInstanceKey(), subscription.getElementInstanceKey(), subscription.getBpmnProcessId(), this.messageRecord.getNameBuffer(), this.messageKey, this.messageRecord.getVariablesBuffer(), this.messageRecord.getCorrelationKeyBuffer());
        }) && this.responseWriter.flush();
    }
}
