package io.zeebe.broker.clustering.orchestration.topic;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.api.CreatePartitionRequest;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.NodeInfo;
import io.zeebe.broker.clustering.base.topology.PartitionInfo;
import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.clustering.base.topology.TopologyPartitionListener;
import io.zeebe.broker.clustering.orchestration.NodeSelector;
import io.zeebe.broker.clustering.orchestration.id.IdGenerator;
import io.zeebe.broker.clustering.orchestration.state.KnownTopics;
import io.zeebe.broker.clustering.orchestration.state.KnownTopicsListener;
import io.zeebe.broker.clustering.orchestration.state.TopicInfo;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.msgpack.value.IntegerValue;
import io.zeebe.msgpack.value.ValueArray;
import io.zeebe.protocol.intent.TopicIntent;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.sched.Actor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/orchestration/topic/TopicCreationService.class */
public class TopicCreationService extends Actor implements Service<TopicCreationService>, KnownTopicsListener, TopologyPartitionListener {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    public static final Duration TIMER_RATE = Duration.ofSeconds(1);
    public static final Duration PENDING_TIMEOUT = Duration.ofMinutes(1);
    private KnownTopics knownTopics;
    private TopologyManager topologyManager;
    private TypedStreamWriter streamWriter;
    private IdGenerator idGenerator;
    private NodeSelector nodeSelector;
    private ClientTransport clientTransport;
    private final Injector<KnownTopics> stateInjector = new Injector<>();
    private final Injector<TopologyManager> topologyManagerInjector = new Injector<>();
    private final Injector<Partition> leaderSystemPartitionInjector = new Injector<>();
    private final Injector<IdGenerator> idGeneratorInjector = new Injector<>();
    private final Injector<NodeSelector> nodeOrchestratingServiceInjector = new Injector<>();
    private final Injector<ClientTransport> managementClientApiInjector = new Injector<>();
    private Set<String> pendingTopicCreationRequests = new HashSet();
    private Set<String> pendingTopicCompletions = new HashSet();

    public void start(ServiceStartContext serviceStartContext) {
        this.knownTopics = (KnownTopics) this.stateInjector.getValue();
        this.topologyManager = (TopologyManager) this.topologyManagerInjector.getValue();
        this.idGenerator = (IdGenerator) this.idGeneratorInjector.getValue();
        this.nodeSelector = (NodeSelector) this.nodeOrchestratingServiceInjector.getValue();
        this.clientTransport = (ClientTransport) this.managementClientApiInjector.getValue();
        this.streamWriter = new TypedStreamEnvironment(((Partition) this.leaderSystemPartitionInjector.getValue()).getLogStream(), null).buildStreamWriter();
        this.knownTopics.registerTopicListener(this);
        this.topologyManager.addTopologyPartitionListener(this);
        serviceStartContext.async(serviceStartContext.getScheduler().submitActor(this));
    }

    public void stop(ServiceStopContext serviceStopContext) {
        serviceStopContext.async(this.actor.close());
    }

    public String getName() {
        return "create-topic";
    }

    protected void onActorStarted() {
        this.actor.runAtFixedRate(TIMER_RATE, this::checkCurrentState);
    }

    @Override // io.zeebe.broker.clustering.orchestration.state.KnownTopicsListener
    public void topicAdded(String str) {
        this.actor.run(() -> {
            checkCurrentState(str);
        });
    }

    @Override // io.zeebe.broker.clustering.base.topology.TopologyPartitionListener
    public void onPartitionUpdated(PartitionInfo partitionInfo, NodeInfo nodeInfo) {
        this.actor.run(() -> {
            checkCurrentState(partitionInfo.getTopicName());
        });
    }

    private void checkCurrentState() {
        checkCurrentState(null);
    }

    private void checkCurrentState(String str) {
        this.actor.runOnCompletion(this.topologyManager.query(ClusterPartitionState::computeCurrentState), (clusterPartitionState, th) -> {
            if (th == null) {
                computeStateDifferences(clusterPartitionState, str);
            } else {
                LOG.error("Unable to compute current cluster topic state from topology", th);
            }
        });
    }

    private void computeStateDifferences(ClusterPartitionState clusterPartitionState, String str) {
        this.actor.runOnCompletion(this.knownTopics.queryTopics(iterable -> {
            return computePendingTopics(iterable, clusterPartitionState);
        }), (list, th) -> {
            if (th != null) {
                LOG.error("Failed to compute the topic partitions to create");
                return;
            }
            Iterator it = list.iterator();
            while (it.hasNext()) {
                PendingTopic pendingTopic = (PendingTopic) it.next();
                String topicName = pendingTopic.getTopicName();
                if (str == null || str.equals(topicName)) {
                    if (pendingTopic.getMissingPartitions() > 0) {
                        if (!this.pendingTopicCreationRequests.contains(topicName)) {
                            LOG.debug("Creating {} partitions for topic {}", Integer.valueOf(pendingTopic.getMissingPartitions()), topicName);
                            for (int i = 0; i < pendingTopic.getMissingPartitions(); i++) {
                                createPartition(pendingTopic);
                            }
                            this.pendingTopicCreationRequests.add(topicName);
                            this.actor.runDelayed(PENDING_TIMEOUT, () -> {
                                this.pendingTopicCreationRequests.remove(topicName);
                            });
                        }
                    } else if (!this.pendingTopicCompletions.contains(topicName)) {
                        int partitionCount = pendingTopic.getPartitionCount();
                        int replicationFactor = pendingTopic.getReplicationFactor();
                        TopicRecord topicRecord = new TopicRecord();
                        topicRecord.setName(pendingTopic.getTopicNameBuffer());
                        topicRecord.setPartitions(partitionCount);
                        topicRecord.setReplicationFactor(replicationFactor);
                        ValueArray<IntegerValue> partitionIds = topicRecord.getPartitionIds();
                        pendingTopic.getPartitionIds().forEach(num -> {
                            ((IntegerValue) partitionIds.add()).setValue(num.intValue());
                        });
                        this.actor.runUntilDone(() -> {
                            writeEvent(pendingTopic.getKey(), TopicIntent.CREATE_COMPLETE, topicRecord);
                        });
                        this.pendingTopicCreationRequests.remove(topicName);
                        this.pendingTopicCompletions.add(topicName);
                        this.actor.runDelayed(PENDING_TIMEOUT, () -> {
                            this.pendingTopicCompletions.remove(topicName);
                        });
                        LOG.debug("Topic {} with {} partition(s) and replication factor {} created", new Object[]{topicName, Integer.valueOf(partitionCount), Integer.valueOf(replicationFactor)});
                    }
                }
            }
        });
    }

    private List<PendingTopic> computePendingTopics(Iterable<TopicInfo> iterable, ClusterPartitionState clusterPartitionState) {
        ArrayList arrayList = new ArrayList();
        for (TopicInfo topicInfo : iterable) {
            if (!topicInfo.getPartitionIds().iterator().hasNext()) {
                List list = (List) clusterPartitionState.getPartitions(topicInfo.getTopicNameBuffer()).stream().map((v0) -> {
                    return v0.getPartitionId();
                }).collect(Collectors.toList());
                arrayList.add(new PendingTopic(topicInfo.getTopicNameBuffer(), topicInfo.getPartitionCount(), topicInfo.getReplicationFactor(), list, topicInfo.getPartitionCount() - list.size(), topicInfo.getKey()));
            }
        }
        return arrayList;
    }

    private void createPartition(PendingTopic pendingTopic) {
        this.actor.runOnCompletion(this.idGenerator.nextId(), (num, th) -> {
            if (th == null) {
                sendCreatePartitionRequest(pendingTopic, num);
            } else {
                LOG.error("Failed to get new partition id for topic {}", pendingTopic.getTopicName(), th);
            }
        });
    }

    private void sendCreatePartitionRequest(PendingTopic pendingTopic, Integer num) {
        this.actor.runOnCompletion(this.nodeSelector.getNextSocketAddress(new PartitionInfo(pendingTopic.getTopicNameBuffer(), num.intValue(), pendingTopic.getReplicationFactor())), (nodeInfo, th) -> {
            if (th == null) {
                sendCreatePartitionRequest(pendingTopic, num, nodeInfo);
            } else {
                LOG.error("Problem in resolving next node address to create partition {} for topic {}", new Object[]{num, pendingTopic.getTopicName(), th});
            }
        });
    }

    private void sendCreatePartitionRequest(PendingTopic pendingTopic, Integer num, NodeInfo nodeInfo) {
        CreatePartitionRequest replicationFactor = new CreatePartitionRequest().topicName(pendingTopic.getTopicNameBuffer()).partitionId(num.intValue()).replicationFactor(pendingTopic.getReplicationFactor());
        LOG.debug("Send create partition request for topic {} to node {} with partitionId={}", new Object[]{pendingTopic.getTopicName(), nodeInfo.getManagementApiAddress(), num});
        this.actor.runOnCompletion(this.clientTransport.getOutput().sendRequest(this.clientTransport.registerRemoteAddress(nodeInfo.getManagementApiAddress()), replicationFactor), (clientResponse, th) -> {
            if (th == null) {
                LOG.info("Partition {} for topic {} created on node {}", new Object[]{num, pendingTopic.getTopicName(), nodeInfo.getManagementApiAddress()});
            } else {
                LOG.warn("Failed to create partition {} for topic {} on node {}", new Object[]{num, pendingTopic.getTopicName(), nodeInfo.getManagementApiAddress(), th});
            }
        });
    }

    private void writeEvent(long j, TopicIntent topicIntent, TopicRecord topicRecord) {
        if (this.streamWriter.writeFollowUpEvent(j, topicIntent, topicRecord) >= 0) {
            this.actor.done();
        } else {
            this.actor.yield();
        }
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public TopicCreationService m45get() {
        return this;
    }

    public Injector<KnownTopics> getStateInjector() {
        return this.stateInjector;
    }

    public Injector<TopologyManager> getTopologyManagerInjector() {
        return this.topologyManagerInjector;
    }

    public Injector<Partition> getLeaderSystemPartitionInjector() {
        return this.leaderSystemPartitionInjector;
    }

    public Injector<IdGenerator> getIdGeneratorInjector() {
        return this.idGeneratorInjector;
    }

    public Injector<NodeSelector> getNodeOrchestratingServiceInjector() {
        return this.nodeOrchestratingServiceInjector;
    }

    public Injector<ClientTransport> getManagementClientApiInjector() {
        return this.managementClientApiInjector;
    }
}
