package io.camunda.zeebe.broker.engine.impl;

import io.atomix.cluster.MemberId;
import io.atomix.core.Atomix;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.clustering.topology.TopologyPartitionListenerImpl;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequest;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentResponse;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.engine.state.mutable.MutableDeploymentState;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/engine/impl/DeploymentDistributorImpl.class */
public final class DeploymentDistributorImpl implements DeploymentDistributor {
    public static final Duration PUSH_REQUEST_TIMEOUT = Duration.ofSeconds(15);
    public static final Duration RETRY_DELAY = Duration.ofMillis(100);
    private static final Logger LOG = Loggers.PROCESS_REPOSITORY_LOGGER;
    private static final String DEPLOYMENT_PUSH_TOPIC = "deployment";
    private final PushDeploymentResponse pushDeploymentResponse = new PushDeploymentResponse();
    private final ErrorResponse errorResponse = new ErrorResponse();
    private final TopologyPartitionListenerImpl partitionListener;
    private final ActorControl actor;
    private final Atomix atomix;

    public DeploymentDistributorImpl(Atomix atomix, TopologyPartitionListenerImpl topologyPartitionListenerImpl, MutableDeploymentState mutableDeploymentState, ActorControl actorControl) {
        this.atomix = atomix;
        this.partitionListener = topologyPartitionListenerImpl;
        this.actor = actorControl;
    }

    public ActorFuture<Void> pushDeploymentToPartition(long j, int i, DirectBuffer directBuffer) {
        CompletableActorFuture<Void> completableActorFuture = new CompletableActorFuture<>();
        LOG.debug("Distribute deployment {} to partition {}.", Long.valueOf(j), Integer.valueOf(i));
        PushDeploymentRequest deploymentKey = new PushDeploymentRequest().deployment(directBuffer).deploymentKey(j);
        this.actor.runDelayed(PUSH_REQUEST_TIMEOUT, () -> {
            String deploymentResponseTopic = getDeploymentResponseTopic(deploymentKey.deploymentKey(), i);
            if (completableActorFuture.isDone()) {
                return;
            }
            LOG.warn("Failed to receive deployment response for partition {} (on topic '{}'). Retrying", Integer.valueOf(i), deploymentResponseTopic);
            sendPushDeploymentRequest(i, completableActorFuture, deploymentKey);
        });
        sendPushDeploymentRequest(i, completableActorFuture, deploymentKey);
        return completableActorFuture;
    }

    private void sendPushDeploymentRequest(int i, CompletableActorFuture<Void> completableActorFuture, PushDeploymentRequest pushDeploymentRequest) {
        Int2IntHashMap partitionLeaders = this.partitionListener.getPartitionLeaders();
        if (partitionLeaders.containsKey(i)) {
            int i2 = partitionLeaders.get(i);
            createResponseSubscription(pushDeploymentRequest.deploymentKey(), i, completableActorFuture);
            pushDeploymentToPartition(i2, i, pushDeploymentRequest);
        }
    }

    private void pushDeploymentToPartition(int i, int i2, PushDeploymentRequest pushDeploymentRequest) {
        pushDeploymentRequest.partitionId(i2);
        this.atomix.getCommunicationService().send(DEPLOYMENT_PUSH_TOPIC, pushDeploymentRequest.toBytes(), new MemberId(Integer.toString(i)), PUSH_REQUEST_TIMEOUT).whenComplete((bArr, th) -> {
            if (th != null) {
                LOG.warn("Failed to push deployment to node {} for partition {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), th});
                handleRetry(i, i2, pushDeploymentRequest);
            } else {
                UnsafeBuffer unsafeBuffer = new UnsafeBuffer(bArr);
                if (this.errorResponse.tryWrap(unsafeBuffer)) {
                    handleErrorResponseOnPushDeploymentRequest(i, i2, pushDeploymentRequest, unsafeBuffer);
                }
            }
        });
    }

    private void handleErrorResponseOnPushDeploymentRequest(int i, int i2, PushDeploymentRequest pushDeploymentRequest, DirectBuffer directBuffer) {
        this.errorResponse.wrap(directBuffer, 0, directBuffer.capacity());
        ErrorCode errorCode = this.errorResponse.getErrorCode();
        if (errorCode == ErrorCode.PARTITION_LEADER_MISMATCH) {
            LOG.debug("Received partition leader mismatch error from partition {} for deployment {}. Retrying.", Integer.valueOf(this.errorResponse.getErrorData().getInt(0, ByteOrder.LITTLE_ENDIAN)), Long.valueOf(pushDeploymentRequest.deploymentKey()));
        } else {
            if (errorCode == ErrorCode.RESOURCE_EXHAUSTED) {
                LOG.warn("Received rejected deployment push due to error of type {}: '{}'. Will be retried after a delay", errorCode.name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
                return;
            }
            LOG.warn("Received rejected deployment push due to error of type {}: '{}'", errorCode.name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
        }
        handleRetry(i, i2, pushDeploymentRequest);
    }

    private void createResponseSubscription(long j, int i, CompletableActorFuture<Void> completableActorFuture) {
        String deploymentResponseTopic = getDeploymentResponseTopic(j, i);
        if (this.atomix.getEventService().getSubscriptions(deploymentResponseTopic).isEmpty()) {
            LOG.trace("Setting up deployment subscription for topic {}", deploymentResponseTopic);
            this.atomix.getEventService().subscribe(deploymentResponseTopic, bArr -> {
                LOG.trace("Receiving deployment response on topic {}", deploymentResponseTopic);
                DirectBuffer unsafeBuffer = new UnsafeBuffer(bArr);
                if (this.pushDeploymentResponse.tryWrap(unsafeBuffer)) {
                    if (!completableActorFuture.isDone()) {
                        completableActorFuture.complete((Object) null);
                    }
                } else if (this.errorResponse.tryWrap(unsafeBuffer)) {
                    this.errorResponse.wrap(unsafeBuffer, 0, unsafeBuffer.capacity());
                    LOG.warn("Received rejected deployment push due to error of type {}: '{}'", this.errorResponse.getErrorCode().name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
                } else {
                    LOG.warn("Received unknown deployment response on topic {}", deploymentResponseTopic);
                }
                return CompletableFuture.completedFuture(null);
            });
        }
    }

    private void handleRetry(int i, int i2, PushDeploymentRequest pushDeploymentRequest) {
        LOG.trace("Retry deployment push to partition {} after {}", Integer.valueOf(i2), RETRY_DELAY);
        this.actor.runDelayed(RETRY_DELAY, () -> {
            Int2IntHashMap partitionLeaders = this.partitionListener.getPartitionLeaders();
            if (partitionLeaders.containsKey(i2)) {
                pushDeploymentToPartition(partitionLeaders.get(i2), i2, pushDeploymentRequest);
            } else {
                pushDeploymentToPartition(i, i2, pushDeploymentRequest);
            }
        });
    }

    public static String getDeploymentResponseTopic(long j, int i) {
        return String.format("deployment-response-%d-%d", Long.valueOf(j), Integer.valueOf(i));
    }
}
