package io.zeebe.broker.clustering.orchestration.state;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.orchestration.topic.TopicRecord;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.intent.TopicIntent;
import io.zeebe.util.buffer.BufferUtil;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/orchestration/state/TopicCreateProcessor.class */
public class TopicCreateProcessor implements TypedRecordProcessor<TopicRecord> {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private final Predicate<DirectBuffer> topicExists;
    private final Consumer<DirectBuffer> notifyListeners;
    private final BiConsumer<Long, TopicRecord> addTopic;
    private boolean isCreating;
    private String rejectionReason;
    private RejectionType rejectionType;

    public TopicCreateProcessor(Predicate<DirectBuffer> predicate, Consumer<DirectBuffer> consumer, BiConsumer<Long, TopicRecord> biConsumer) {
        this.topicExists = predicate;
        this.notifyListeners = consumer;
        this.addTopic = biConsumer;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<TopicRecord> typedRecord) {
        this.isCreating = false;
        TopicRecord value = typedRecord.getValue();
        DirectBuffer name = value.getName();
        if (this.topicExists.test(name)) {
            this.rejectionReason = "Topic exists already";
            this.rejectionType = RejectionType.NOT_APPLICABLE;
            LOG.warn("Rejecting topic {} creation as a topic with the same name already exists", BufferUtil.bufferAsString(name));
        } else if (value.getPartitions() < 1) {
            this.rejectionReason = "Topic must have at least one partition";
            this.rejectionType = RejectionType.BAD_VALUE;
            LOG.warn("Rejecting topic {} creation as a topic has to have at least one partition", BufferUtil.bufferAsString(name));
        } else if (value.getReplicationFactor() >= 1) {
            LOG.info("Creating topic {}", value);
            this.isCreating = true;
        } else {
            this.rejectionReason = "Topic must have at least one replica";
            this.rejectionType = RejectionType.BAD_VALUE;
            LOG.warn("Rejecting topic {} creation as a topic has to have at least one replica", BufferUtil.bufferAsString(name));
        }
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public boolean executeSideEffects(TypedRecord<TopicRecord> typedRecord, TypedResponseWriter typedResponseWriter) {
        if (!this.isCreating) {
            return typedResponseWriter.writeRejection((TypedRecord<?>) typedRecord, this.rejectionType, this.rejectionReason);
        }
        boolean writeRecord = typedResponseWriter.writeRecord(TopicIntent.CREATING, typedRecord);
        if (writeRecord) {
            this.notifyListeners.accept(typedRecord.getValue().getName());
        }
        return writeRecord;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public long writeRecord(TypedRecord<TopicRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
        return this.isCreating ? typedStreamWriter.writeFollowUpEvent(typedRecord.getKey(), TopicIntent.CREATING, typedRecord.getValue()) : typedStreamWriter.writeRejection(typedRecord, this.rejectionType, this.rejectionReason);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void updateState(TypedRecord<TopicRecord> typedRecord) {
        if (this.isCreating) {
            this.addTopic.accept(Long.valueOf(typedRecord.getKey()), typedRecord.getValue());
        }
    }
}
