package io.zeebe.broker.clustering.orchestration.state;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.orchestration.topic.TopicEvent;
import io.zeebe.broker.clustering.orchestration.topic.TopicState;
import io.zeebe.broker.logstreams.processor.TypedEvent;
import io.zeebe.broker.logstreams.processor.TypedEventProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.util.buffer.BufferUtil;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/orchestration/state/TopicCreatedProcessor.class */
/**
 * Typed event processor for {@link TopicEvent}s that either marks the event as
 * {@link TopicState#CREATED} or, when the injected predicate reports that the topic name was
 * already created, marks it as {@link TopicState#CREATE_COMPLETE_REJECTED}.
 *
 * <p>The decision taken in {@link #processEvent(TypedEvent)} is kept in an instance field and
 * read by {@link #executeSideEffects(TypedEvent, TypedResponseWriter)} and
 * {@link #updateState(TypedEvent)}, so those methods act on the outcome of the most recent
 * {@code processEvent} call.
 */
public class TopicCreatedProcessor implements TypedEventProcessor<TopicEvent> {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;

    /** Answers whether a topic with the given name has already been created. */
    private final Predicate<DirectBuffer> alreadyCreatedCheck;

    /** Invoked with the topic name after an accepted event. */
    private final Consumer<DirectBuffer> listenerNotifier;

    /** Invoked with the event key and value after an accepted event. */
    private final BiConsumer<Long, TopicEvent> topicStateUpdater;

    /** Outcome of the last {@code processEvent} call: {@code true} if the event was accepted. */
    private boolean accepted;

    /**
     * Creates a processor wired to the given callbacks.
     *
     * @param predicate tested with the topic name; {@code true} means the topic already exists
     * @param consumer receives the topic name of each accepted event as a side effect
     * @param biConsumer receives the key and value of each accepted event on state update
     */
    public TopicCreatedProcessor(Predicate<DirectBuffer> predicate, Consumer<DirectBuffer> consumer, BiConsumer<Long, TopicEvent> biConsumer) {
        this.alreadyCreatedCheck = predicate;
        this.listenerNotifier = consumer;
        this.topicStateUpdater = biConsumer;
    }

    /**
     * Decides whether the event is accepted and sets the resulting state on the event value.
     *
     * @param typedEvent the event whose value is inspected and mutated
     */
    @Override
    public void processEvent(TypedEvent<TopicEvent> typedEvent) {
        final TopicEvent topicEvent = typedEvent.getValue();
        final DirectBuffer topicName = topicEvent.getName();

        if (this.alreadyCreatedCheck.test(topicName)) {
            // Record the rejection first so the later lifecycle methods skip their work.
            this.accepted = false;
            LOG.warn("Rejecting topic create complete as topic {} was already created", BufferUtil.bufferAsString(topicName));
            topicEvent.setState(TopicState.CREATE_COMPLETE_REJECTED);
            return;
        }

        this.accepted = true;
        topicEvent.setState(TopicState.CREATED);
    }

    /**
     * Passes the topic name to the listener callback when the event was accepted.
     *
     * @param typedEvent the event that was processed
     * @param typedResponseWriter not used by this processor
     * @return always {@code true}
     */
    @Override
    public boolean executeSideEffects(TypedEvent<TopicEvent> typedEvent, TypedResponseWriter typedResponseWriter) {
        if (this.accepted) {
            final DirectBuffer topicName = typedEvent.getValue().getName();
            this.listenerNotifier.accept(topicName);
        }
        return true;
    }

    /**
     * Writes the event value as a follow-up event under the same key, for accepted and rejected
     * events alike.
     *
     * @param typedEvent the event that was processed
     * @param typedStreamWriter writer used for the follow-up event
     * @return the value returned by the writer
     */
    @Override
    public long writeEvent(TypedEvent<TopicEvent> typedEvent, TypedStreamWriter typedStreamWriter) {
        final long eventKey = typedEvent.getKey();
        final TopicEvent topicEvent = typedEvent.getValue();
        return typedStreamWriter.writeFollowupEvent(eventKey, topicEvent);
    }

    /**
     * Hands the event key and value to the state-update callback when the event was accepted.
     *
     * @param typedEvent the event that was processed
     */
    @Override
    public void updateState(TypedEvent<TopicEvent> typedEvent) {
        if (!this.accepted) {
            return;
        }
        // The long key is boxed to Long for the BiConsumer.
        this.topicStateUpdater.accept(typedEvent.getKey(), typedEvent.getValue());
    }
}
