package io.zeebe.broker.workflow.deployment.distribute.processor;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.topology.NodeInfo;
import io.zeebe.broker.clustering.base.topology.TopologyPartitionListenerImpl;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.system.management.deployment.PushDeploymentRequest;
import io.zeebe.broker.system.management.deployment.PushDeploymentResponse;
import io.zeebe.broker.workflow.deployment.distribute.processor.state.DeploymentsStateController;
import io.zeebe.transport.ClientResponse;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.Iterator;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2ObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/workflow/deployment/distribute/processor/DeploymentDistributor.class */
public class DeploymentDistributor {
    private static final Logger LOG = Loggers.WORKFLOW_REPOSITORY_LOGGER;
    public static final Duration PUSH_REQUEST_TIMEOUT = Duration.ofSeconds(15);
    public static final Duration PARTITION_LEADER_RESOLVE_RETRY = Duration.ofMillis(100);
    private final ClientTransport managementApi;
    private final TopologyPartitionListenerImpl partitionListener;
    private final ActorControl actor;
    private final DeploymentsStateController deploymentsStateController;
    private final IntArrayList partitionsToDistributeTo;
    private final PushDeploymentRequest pushDeploymentRequest = new PushDeploymentRequest();
    private final PushDeploymentResponse pushDeploymentResponse = new PushDeploymentResponse();
    private final transient Long2ObjectHashMap<ActorFuture<Void>> pendingDeploymentFutures = new Long2ObjectHashMap<>();

    public DeploymentDistributor(ClusterCfg clusterCfg, ClientTransport clientTransport, TopologyPartitionListenerImpl topologyPartitionListenerImpl, DeploymentsStateController deploymentsStateController, ActorControl actorControl) {
        this.managementApi = clientTransport;
        this.partitionListener = topologyPartitionListenerImpl;
        this.actor = actorControl;
        this.deploymentsStateController = deploymentsStateController;
        this.partitionsToDistributeTo = partitionsToDistributeTo(clusterCfg);
    }

    private IntArrayList partitionsToDistributeTo(ClusterCfg clusterCfg) {
        IntArrayList intArrayList = new IntArrayList();
        intArrayList.addAll(clusterCfg.getPartitionIds());
        intArrayList.removeInt(0);
        return intArrayList;
    }

    public ActorFuture<Void> pushDeployment(long j, long j2, DirectBuffer directBuffer) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.deploymentsStateController.putPendingDeployment(j, new PendingDeploymentDistribution(directBuffer, j2));
        this.pendingDeploymentFutures.put(j, completableActorFuture);
        pushDeploymentToPartitions(j);
        return completableActorFuture;
    }

    public PendingDeploymentDistribution removePendingDeployment(long j) {
        return this.deploymentsStateController.removePendingDeployment(j);
    }

    private void pushDeploymentToPartitions(long j) {
        if (!this.partitionsToDistributeTo.isEmpty()) {
            deployOnMultiplePartitions(j);
            return;
        }
        LOG.trace("No other partitions to distribute deployment.");
        LOG.trace("Deployment finished.");
        ((ActorFuture) this.pendingDeploymentFutures.remove(j)).complete((Object) null);
    }

    private void deployOnMultiplePartitions(long j) {
        LOG.trace("Distribute deployment to other partitions.");
        PendingDeploymentDistribution pendingDeployment = this.deploymentsStateController.getPendingDeployment(j);
        DirectBuffer deployment = pendingDeployment.getDeployment();
        pendingDeployment.setDistributionCount(this.partitionsToDistributeTo.size());
        this.pushDeploymentRequest.reset();
        this.pushDeploymentRequest.deployment(deployment).deploymentKey(j);
        IntArrayList intArrayList = new IntArrayList();
        intArrayList.addAll(this.partitionsToDistributeTo);
        distributeDeployment(intArrayList);
    }

    private void distributeDeployment(IntArrayList intArrayList) {
        IntArrayList distributeDeploymentToPartitions = distributeDeploymentToPartitions(intArrayList);
        if (distributeDeploymentToPartitions.isEmpty()) {
            LOG.trace("Pushed deployment to all partitions");
        } else {
            this.actor.runDelayed(PARTITION_LEADER_RESOLVE_RETRY, () -> {
                distributeDeployment(distributeDeploymentToPartitions);
            });
        }
    }

    private IntArrayList distributeDeploymentToPartitions(IntArrayList intArrayList) {
        Int2ObjectHashMap<NodeInfo> partitionLeaders = this.partitionListener.getPartitionLeaders();
        Iterator it = intArrayList.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            NodeInfo nodeInfo = (NodeInfo) partitionLeaders.get(num);
            if (nodeInfo != null) {
                it.remove();
                pushDeploymentToPartition(nodeInfo.getNodeId(), num.intValue());
            }
        }
        return intArrayList;
    }

    private void pushDeploymentToPartition(int i, int i2) {
        this.pushDeploymentRequest.partitionId(i2);
        ActorFuture sendRequestWithRetry = this.managementApi.getOutput().sendRequestWithRetry(() -> {
            return Integer.valueOf(i);
        }, directBuffer -> {
            return !this.pushDeploymentResponse.tryWrap(directBuffer);
        }, this.pushDeploymentRequest, PUSH_REQUEST_TIMEOUT);
        LOG.debug("Deployment pushed to partition {} (node id: {}).", Integer.valueOf(i2), Integer.valueOf(i));
        this.actor.runOnCompletion(sendRequestWithRetry, (clientResponse, th) -> {
            if (th == null) {
                handlePushResponse(clientResponse);
                return;
            }
            LOG.error("Error on pushing deployment to partition {}. Retry request. ", Integer.valueOf(i2), th);
            NodeInfo nodeInfo = (NodeInfo) this.partitionListener.getPartitionLeaders().get(i2);
            if (nodeInfo != null) {
                pushDeploymentToPartition(nodeInfo.getNodeId(), i2);
            } else {
                pushDeploymentToPartition(i, i2);
            }
        });
    }

    private void handlePushResponse(ClientResponse clientResponse) {
        this.pushDeploymentResponse.wrap(clientResponse.getResponseBuffer());
        long deploymentKey = this.pushDeploymentResponse.deploymentKey();
        if (this.deploymentsStateController.getPendingDeployment(deploymentKey).decrementCount() != 0) {
            LOG.trace("Deployment was pushed to partition {} successfully.", Integer.valueOf(this.pushDeploymentResponse.partitionId()));
        } else {
            LOG.debug("Deployment pushed to all partitions successfully.");
            ((ActorFuture) this.pendingDeploymentFutures.remove(deploymentKey)).complete((Object) null);
        }
    }
}
