package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.state.Message;
import io.zeebe.broker.subscription.message.state.MessageStartEventSubscriptionState;
import io.zeebe.broker.subscription.message.state.MessageState;
import io.zeebe.broker.subscription.message.state.MessageSubscriptionState;
import io.zeebe.protocol.BpmnElementType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.collections.LongArrayList;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/PublishMessageProcessor.class */
public class PublishMessageProcessor implements TypedRecordProcessor<MessageRecord> {
    public static final String ALREADY_PUBLISHED_MESSAGE = "Expected to publish a new message with id '%s', but a message with that id was already published";
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final MessageStartEventSubscriptionState startEventSubscriptionState;
    private final SubscriptionCommandSender commandSender;
    private TypedResponseWriter responseWriter;
    private MessageRecord messageRecord;
    private final LongArrayList correlatedWorkflowInstances = new LongArrayList();
    private final LongArrayList correlatedElementInstances = new LongArrayList();

    public PublishMessageProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, MessageStartEventSubscriptionState messageStartEventSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.startEventSubscriptionState = messageStartEventSubscriptionState;
        this.commandSender = subscriptionCommandSender;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        this.responseWriter = typedResponseWriter;
        this.messageRecord = typedRecord.getValue();
        if (!this.messageRecord.hasMessageId() || !this.messageState.exist(this.messageRecord.getName(), this.messageRecord.getCorrelationKey(), this.messageRecord.getMessageId())) {
            handleNewMessage(typedRecord, typedResponseWriter, typedStreamWriter, consumer);
            return;
        }
        String format = String.format(ALREADY_PUBLISHED_MESSAGE, BufferUtil.bufferAsString(this.messageRecord.getMessageId()));
        typedStreamWriter.appendRejection(typedRecord, RejectionType.ALREADY_EXISTS, format);
        typedResponseWriter.writeRejectionOnCommand((TypedRecord<?>) typedRecord, RejectionType.ALREADY_EXISTS, format);
    }

    private void handleNewMessage(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        long appendNewEvent = typedStreamWriter.appendNewEvent(MessageIntent.PUBLISHED, typedRecord.getValue());
        typedResponseWriter.writeEventOnCommand(appendNewEvent, MessageIntent.PUBLISHED, typedRecord.getValue(), typedRecord);
        this.correlatedWorkflowInstances.clear();
        this.correlatedElementInstances.clear();
        this.subscriptionState.visitSubscriptions(this.messageRecord.getName(), this.messageRecord.getCorrelationKey(), messageSubscription -> {
            long workflowInstanceKey = messageSubscription.getWorkflowInstanceKey();
            long elementInstanceKey = messageSubscription.getElementInstanceKey();
            if (messageSubscription.isCorrelating() || this.correlatedWorkflowInstances.containsLong(workflowInstanceKey)) {
                return true;
            }
            this.subscriptionState.updateToCorrelatingState(messageSubscription, this.messageRecord.getPayload(), ActorClock.currentTimeMillis());
            this.correlatedWorkflowInstances.addLong(workflowInstanceKey);
            this.correlatedElementInstances.addLong(elementInstanceKey);
            return true;
        });
        consumer.accept(this::correlateMessage);
        correlateMessageStartEvents(typedRecord, typedStreamWriter);
        if (this.messageRecord.getTimeToLive() <= 0) {
            typedStreamWriter.appendFollowUpEvent(appendNewEvent, MessageIntent.DELETED, this.messageRecord);
            return;
        }
        Message message = new Message(appendNewEvent, this.messageRecord.getName(), this.messageRecord.getCorrelationKey(), this.messageRecord.getPayload(), this.messageRecord.getMessageId(), this.messageRecord.getTimeToLive(), this.messageRecord.getTimeToLive() + ActorClock.currentTimeMillis());
        this.messageState.put(message);
        this.correlatedWorkflowInstances.forEachOrderedLong(j -> {
            this.messageState.putMessageCorrelation(message.getKey(), j);
        });
    }

    private boolean correlateMessage() {
        for (int i = 0; i < this.correlatedWorkflowInstances.size(); i++) {
            if (!this.commandSender.correlateWorkflowInstanceSubscription(this.correlatedWorkflowInstances.getLong(i), this.correlatedElementInstances.getLong(i), this.messageRecord.getName(), this.messageRecord.getPayload())) {
                return false;
            }
        }
        return this.responseWriter.flush();
    }

    private void correlateMessageStartEvents(TypedRecord<MessageRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
        this.startEventSubscriptionState.visitSubscriptionsByMessageName(typedRecord.getValue().getName(), messageStartEventSubscriptionRecord -> {
            DirectBuffer startEventId = messageStartEventSubscriptionRecord.getStartEventId();
            long workflowKey = messageStartEventSubscriptionRecord.getWorkflowKey();
            WorkflowInstanceRecord workflowInstanceRecord = new WorkflowInstanceRecord();
            workflowInstanceRecord.setWorkflowKey(workflowKey).setElementId(startEventId).setBpmnElementType(BpmnElementType.START_EVENT).setPayload(typedRecord.getValue().getPayload());
            typedStreamWriter.appendNewEvent(WorkflowInstanceIntent.EVENT_OCCURRED, workflowInstanceRecord);
        });
    }
}
