package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnBehaviors;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.message.MessageCorrelateBehavior;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
import io.camunda.zeebe.engine.state.immutable.MessageStartEventSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageCorrelationRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageCorrelationCorrelateProcessor.class */
public final class MessageCorrelationCorrelateProcessor implements TypedRecordProcessor<MessageCorrelationRecord> {
    private static final String SUBSCRIPTION_NOT_FOUND = "Expected to find subscription for message with name '%s' and correlation key '%s', but none was found.";
    private final MessageCorrelateBehavior correlateBehavior;
    private final KeyGenerator keyGenerator;
    private final StateWriter stateWriter;
    private final TypedResponseWriter responseWriter;
    private final TypedRejectionWriter rejectionWriter;

    public MessageCorrelationCorrelateProcessor(Writers writers, KeyGenerator keyGenerator, EventScopeInstanceState eventScopeInstanceState, ProcessState processState, BpmnBehaviors bpmnBehaviors, MessageStartEventSubscriptionState messageStartEventSubscriptionState, MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.stateWriter = writers.state();
        this.responseWriter = writers.response();
        this.rejectionWriter = writers.rejection();
        this.keyGenerator = keyGenerator;
        this.correlateBehavior = new MessageCorrelateBehavior(messageStartEventSubscriptionState, messageState, new EventHandle(keyGenerator, eventScopeInstanceState, writers, processState, bpmnBehaviors.eventTriggerBehavior(), bpmnBehaviors.stateBehavior()), this.stateWriter, messageSubscriptionState, subscriptionCommandSender);
    }

    @Override // io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageCorrelationRecord> typedRecord) {
        RecordValue recordValue = (MessageCorrelationRecord) typedRecord.getValue();
        long nextKey = this.keyGenerator.nextKey();
        recordValue.setMessageKey(nextKey).setRequestId(typedRecord.getRequestId()).setRequestStreamId(typedRecord.getRequestStreamId());
        RecordValue timeToLive = new MessageRecord().setName(typedRecord.getValue().getName()).setCorrelationKey(typedRecord.getValue().getCorrelationKey()).setVariables(typedRecord.getValue().getVariablesBuffer()).setTenantId(typedRecord.getValue().getTenantId()).setTimeToLive(-1L);
        this.stateWriter.appendFollowUpEvent(nextKey, MessageIntent.PUBLISHED, timeToLive);
        this.stateWriter.appendFollowUpEvent(nextKey, MessageCorrelationIntent.CORRELATING, recordValue);
        Subscriptions subscriptions = new Subscriptions();
        MessageCorrelateBehavior.MessageData createMessageData = createMessageData(nextKey, recordValue);
        this.correlateBehavior.correlateToMessageEvents(createMessageData, subscriptions);
        this.correlateBehavior.correlateToMessageStartEvents(createMessageData, subscriptions);
        if (subscriptions.isEmpty()) {
            String formatted = SUBSCRIPTION_NOT_FOUND.formatted(typedRecord.getValue().getName(), typedRecord.getValue().getCorrelationKey());
            this.rejectionWriter.appendRejection(typedRecord, RejectionType.NOT_FOUND, formatted);
            this.responseWriter.writeRejectionOnCommand(typedRecord, RejectionType.NOT_FOUND, formatted);
        } else {
            subscriptions.getFirstMessageStartEventSubscription().ifPresent(subscription -> {
                recordValue.setProcessInstanceKey(subscription.getProcessInstanceKey());
                this.stateWriter.appendFollowUpEvent(nextKey, MessageCorrelationIntent.CORRELATED, recordValue);
                this.responseWriter.writeEventOnCommand(nextKey, MessageCorrelationIntent.CORRELATED, recordValue, typedRecord);
            });
        }
        this.correlateBehavior.sendCorrelateCommands(createMessageData, subscriptions);
        this.stateWriter.appendFollowUpEvent(nextKey, MessageIntent.EXPIRED, timeToLive);
    }

    private MessageCorrelateBehavior.MessageData createMessageData(long j, MessageCorrelationRecord messageCorrelationRecord) {
        return new MessageCorrelateBehavior.MessageData(j, messageCorrelationRecord.getNameBuffer(), messageCorrelationRecord.getCorrelationKeyBuffer(), messageCorrelationRecord.getVariablesBuffer(), messageCorrelationRecord.getTenantId());
    }
}
