package io.zeebe.broker.clustering.orchestration.state;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.orchestration.topic.TopicEvent;
import io.zeebe.broker.clustering.orchestration.topic.TopicState;
import io.zeebe.broker.logstreams.processor.TypedEvent;
import io.zeebe.broker.logstreams.processor.TypedEventProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.util.buffer.BufferUtil;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/orchestration/state/TopicCreateProcessor.class */
/**
 * Event processor that validates a topic creation request and moves the {@link TopicEvent} into
 * either {@link TopicState#CREATING} or {@link TopicState#CREATE_REJECTED}.
 *
 * <p>A request is rejected when a topic with the same name already exists (as reported by the
 * injected predicate), when fewer than one partition is requested, or when the replication factor
 * is below one. The checks run in that order and the first failing check wins.
 *
 * <p>The outcome of the latest {@link #processEvent(TypedEvent)} call is remembered in a mutable
 * field and consulted by {@link #executeSideEffects(TypedEvent, TypedResponseWriter)} and
 * {@link #updateState(TypedEvent)}; those methods therefore act on whichever event was processed
 * last.
 */
public class TopicCreateProcessor implements TypedEventProcessor<TopicEvent> {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;

    /** Answers whether a topic with the given name is already known. */
    private final Predicate<DirectBuffer> topicExists;

    /** Invoked with the topic name once an accepted request's response has been written. */
    private final Consumer<DirectBuffer> notifyListeners;

    /** Invoked with the event key and value of an accepted request during the state update. */
    private final BiConsumer<Long, TopicEvent> addTopic;

    /** {@code true} only if the most recently processed event passed all validation checks. */
    private boolean isCreating;

    /**
     * Creates a processor wired to the given callbacks.
     *
     * @param predicate tests whether a topic with the given name already exists
     * @param consumer receives the topic name after the response for an accepted request was written
     * @param biConsumer receives the event key and value of an accepted request on state update
     */
    public TopicCreateProcessor(Predicate<DirectBuffer> predicate, Consumer<DirectBuffer> consumer, BiConsumer<Long, TopicEvent> biConsumer) {
        topicExists = predicate;
        notifyListeners = consumer;
        addTopic = biConsumer;
    }

    /**
     * Validates the creation request carried by the event and sets the resulting state on its value.
     *
     * @param typedEvent the event whose {@link TopicEvent} value is validated and updated in place
     */
    @Override
    public void processEvent(TypedEvent<TopicEvent> typedEvent) {
        // Reset first so a previous acceptance never leaks into this event's outcome.
        isCreating = false;

        final TopicEvent topicEvent = typedEvent.getValue();
        final DirectBuffer topicName = topicEvent.getName();

        if (topicExists.test(topicName)) {
            reject(topicEvent, topicName, "Rejecting topic {} creation as a topic with the same name already exists");
            return;
        }

        if (topicEvent.getPartitions() < 1) {
            reject(topicEvent, topicName, "Rejecting topic {} creation as a topic has to have at least one partition");
            return;
        }

        if (topicEvent.getReplicationFactor() < 1) {
            reject(topicEvent, topicName, "Rejecting topic {} creation as a topic has to have at least one replication");
            return;
        }

        accept(topicEvent, topicName);
    }

    /**
     * Writes the response for the event and, if that succeeded and the request was accepted,
     * notifies the listeners with the topic name.
     *
     * @param typedEvent the event to respond to
     * @param typedResponseWriter the writer used to send the response
     * @return the result reported by the response writer
     */
    @Override
    public boolean executeSideEffects(TypedEvent<TopicEvent> typedEvent, TypedResponseWriter typedResponseWriter) {
        final boolean responseWritten = typedResponseWriter.write(typedEvent);
        if (!responseWritten) {
            return false;
        }

        if (isCreating) {
            notifyListeners.accept(typedEvent.getValue().getName());
        }
        return true;
    }

    /**
     * Writes the (possibly state-updated) event value as a follow-up event under the same key.
     *
     * @param typedEvent the processed event
     * @param typedStreamWriter the writer used to append the follow-up event
     * @return the value returned by the stream writer
     */
    @Override
    public long writeEvent(TypedEvent<TopicEvent> typedEvent, TypedStreamWriter typedStreamWriter) {
        final long eventKey = typedEvent.getKey();
        final TopicEvent topicEvent = typedEvent.getValue();
        return typedStreamWriter.writeFollowupEvent(eventKey, topicEvent);
    }

    /**
     * Registers the topic through the injected callback, but only for an accepted request.
     *
     * @param typedEvent the processed event whose key and value are handed to the callback
     */
    @Override
    public void updateState(TypedEvent<TopicEvent> typedEvent) {
        if (!isCreating) {
            return;
        }
        addTopic.accept(typedEvent.getKey(), typedEvent.getValue());
    }

    /** Logs the given warning with the topic name and marks the event as rejected. */
    private void reject(TopicEvent topicEvent, DirectBuffer topicName, String warning) {
        LOG.warn(warning, BufferUtil.bufferAsString(topicName));
        topicEvent.setState(TopicState.CREATE_REJECTED);
    }

    /** Logs the accepted request, remembers the acceptance and marks the event as creating. */
    private void accept(TopicEvent topicEvent, DirectBuffer topicName) {
        LOG.info(
                "Creating topic {} with partition count {} and replication factor {}",
                BufferUtil.bufferAsString(topicName),
                topicEvent.getPartitions(),
                topicEvent.getReplicationFactor());
        isCreating = true;
        topicEvent.setState(TopicState.CREATING);
    }
}
