package io.zeebe.broker.workflow.deployment.distribute.processor;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.clustering.base.topology.TopologyPartitionListenerImpl;
import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.workflow.deployment.distribute.processor.state.DeploymentsStateController;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.sched.ActorControl;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;

/* loaded from: input_file:io/zeebe/broker/workflow/deployment/distribute/processor/DeploymentDistributeProcessor.class */
public class DeploymentDistributeProcessor implements TypedRecordProcessor<DeploymentRecord> {
    private final TopologyManager topologyManager;
    private final LogStreamWriterImpl logStreamWriter;
    private final ClientTransport managementApi;
    private final DeploymentsStateController deploymentsStateController;
    private final ClusterCfg clusterCfg;
    private ActorControl actor;
    private TopologyPartitionListenerImpl partitionListener;
    private DeploymentDistributor deploymentDistributor;
    private int streamProcessorId;

    public DeploymentDistributeProcessor(ClusterCfg clusterCfg, TopologyManager topologyManager, DeploymentsStateController deploymentsStateController, ClientTransport clientTransport, LogStreamWriterImpl logStreamWriterImpl) {
        this.clusterCfg = clusterCfg;
        this.deploymentsStateController = deploymentsStateController;
        this.topologyManager = topologyManager;
        this.managementApi = clientTransport;
        this.logStreamWriter = logStreamWriterImpl;
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.streamProcessorId = typedStreamProcessor.getStreamProcessorContext().getId();
        this.actor = typedStreamProcessor.getActor();
        this.partitionListener = new TopologyPartitionListenerImpl(typedStreamProcessor.getActor());
        this.topologyManager.addTopologyPartitionListener(this.partitionListener);
        this.deploymentDistributor = new DeploymentDistributor(this.clusterCfg, this.managementApi, this.partitionListener, this.deploymentsStateController, this.actor);
        this.actor.submit(this::reprocessPendingDeployments);
    }

    private void reprocessPendingDeployments() {
        this.deploymentsStateController.foreachPending((l, pendingDeploymentDistribution) -> {
            ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
            DirectBuffer deployment = pendingDeploymentDistribution.getDeployment();
            expandableArrayBuffer.putBytes(0, deployment, 0, deployment.capacity());
            distributeDeployment(l.longValue(), pendingDeploymentDistribution.getSourcePosition(), expandableArrayBuffer);
        });
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        DeploymentRecord value = typedRecord.getValue();
        long key = typedRecord.getKey();
        ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
        value.write(expandableArrayBuffer, 0);
        distributeDeployment(key, typedRecord.getPosition(), expandableArrayBuffer);
    }

    private void distributeDeployment(long j, long j2, DirectBuffer directBuffer) {
        this.actor.runOnCompletion(this.deploymentDistributor.pushDeployment(j, j2, directBuffer), (r7, th) -> {
            writeCreatingDeploymentCommand(j);
        });
    }

    private void writeCreatingDeploymentCommand(long j) {
        PendingDeploymentDistribution removePendingDeployment = this.deploymentDistributor.removePendingDeployment(j);
        DirectBuffer deployment = removePendingDeployment.getDeployment();
        long sourcePosition = removePendingDeployment.getSourcePosition();
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        deploymentRecord.wrap(deployment);
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.intent(DeploymentIntent.DISTRIBUTED).valueType(ValueType.DEPLOYMENT).recordType(RecordType.EVENT);
        this.actor.runUntilDone(() -> {
            if (this.logStreamWriter.key(j).producerId(this.streamProcessorId).sourceRecordPosition(sourcePosition).valueWriter(deploymentRecord).metadataWriter(recordMetadata).tryWrite() < 0) {
                this.actor.yield();
            } else {
                this.actor.done();
            }
        });
    }
}
