package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedBatchWriter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.state.Message;
import io.zeebe.broker.subscription.message.state.MessageStateController;
import io.zeebe.broker.subscription.message.state.MessageSubscription;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/PublishMessageProcessor.class */
public class PublishMessageProcessor implements TypedRecordProcessor<MessageRecord> {
    private final MessageStateController messageStateController;
    private final SubscriptionCommandSender commandSender;
    private TypedResponseWriter responseWriter;
    private MessageRecord messageRecord;
    private List<MessageSubscription> matchingSubscriptions;

    public PublishMessageProcessor(MessageStateController messageStateController, SubscriptionCommandSender subscriptionCommandSender) {
        this.messageStateController = messageStateController;
        this.commandSender = subscriptionCommandSender;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        this.responseWriter = typedResponseWriter;
        this.messageRecord = typedRecord.getValue();
        if (!this.messageRecord.hasMessageId() || !this.messageStateController.exist(this.messageRecord.getName(), this.messageRecord.getCorrelationKey(), this.messageRecord.getMessageId())) {
            handleNewMessage(typedRecord, typedResponseWriter, typedStreamWriter, consumer);
            return;
        }
        String format = String.format("message with id '%s' is already published", BufferUtil.bufferAsString(this.messageRecord.getMessageId()));
        typedStreamWriter.writeRejection(typedRecord, RejectionType.BAD_VALUE, format);
        typedResponseWriter.writeRejectionOnCommand((TypedRecord<?>) typedRecord, RejectionType.BAD_VALUE, format);
    }

    private void handleNewMessage(TypedRecord<MessageRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        TypedBatchWriter newBatch = typedStreamWriter.newBatch();
        long addNewEvent = newBatch.addNewEvent(MessageIntent.PUBLISHED, typedRecord.getValue());
        typedResponseWriter.writeEventOnCommand(addNewEvent, MessageIntent.PUBLISHED, typedRecord.getValue(), typedRecord);
        this.matchingSubscriptions = (List) ((Map) this.messageStateController.findSubscriptions(this.messageRecord.getName(), this.messageRecord.getCorrelationKey()).stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getWorkflowInstanceKey();
        }))).values().stream().map(list -> {
            return (MessageSubscription) list.get(0);
        }).collect(Collectors.toList());
        for (MessageSubscription messageSubscription : this.matchingSubscriptions) {
            messageSubscription.setMessagePayload(this.messageRecord.getPayload());
            this.messageStateController.updateCommandSentTime(messageSubscription);
        }
        consumer.accept(this::correlateMessage);
        if (this.messageRecord.getTimeToLive() <= 0) {
            newBatch.addFollowUpEvent(addNewEvent, MessageIntent.DELETED, this.messageRecord);
            return;
        }
        Message message = new Message(addNewEvent, this.messageRecord.getName(), this.messageRecord.getCorrelationKey(), this.messageRecord.getPayload(), this.messageRecord.getMessageId(), this.messageRecord.getTimeToLive(), this.messageRecord.getTimeToLive() + ActorClock.currentTimeMillis());
        this.messageStateController.put(message);
        Iterator<MessageSubscription> it = this.matchingSubscriptions.iterator();
        while (it.hasNext()) {
            this.messageStateController.putMessageCorrelation(message.getKey(), it.next().getWorkflowInstanceKey());
        }
    }

    private boolean correlateMessage() {
        for (MessageSubscription messageSubscription : this.matchingSubscriptions) {
            if (!this.commandSender.correlateWorkflowInstanceSubscription(messageSubscription.getWorkflowInstanceKey(), messageSubscription.getElementInstanceKey(), this.messageRecord.getName(), this.messageRecord.getPayload())) {
                return false;
            }
        }
        return this.responseWriter.flush();
    }
}
