package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.camunda.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.transport.ServerTransport;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;

/* loaded from: input_file:io/camunda/zeebe/broker/bootstrap/CommandApiServiceStep.class */
/**
 * Broker startup step that brings up the command API.
 *
 * <p>Startup submits an {@link AtomixServerTransport} actor and, once that has completed, a
 * {@link CommandApiServiceImpl} actor; both are stored on the {@link BrokerStartupContext} and the
 * service is registered as a partition listener and disk space usage listener. Shutdown undoes
 * these steps in reverse order.
 *
 * <p>NOTE(review): {@code proceed(...)} is inherited from {@link AbstractBrokerStartupStep} and is
 * not visible here; it presumably runs the given callback only when the awaited future succeeded
 * and otherwise fails the step future — confirm against the base class.
 */
final class CommandApiServiceStep extends AbstractBrokerStartupStep {

    /**
     * @return the display name of this step, {@code "Command API"}
     */
    public String getName() {
        return "Command API";
    }

    /**
     * Schedules the start of the server transport (and, chained after it, the command API service)
     * on the given concurrency control.
     *
     * @param brokerStartupContext context that supplies dependencies and receives the started
     *     components
     * @param concurrencyControl used to schedule the startup work
     * @param actorFuture step future, completed with the context once the command API service is
     *     started and registered
     */
    @Override
    void startupInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> actorFuture) {
        concurrencyControl.run(() -> startServerTransport(brokerStartupContext, actorFuture));
    }

    /**
     * Shuts down the command API service (if one is registered on the context) and then the server
     * transport.
     *
     * @param brokerStartupContext context holding the components to shut down
     * @param concurrencyControl used to chain the asynchronous close operations
     * @param actorFuture step future, completed with the context once everything is closed
     */
    @Override
    void shutdownInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> actorFuture) {
        CommandApiServiceImpl commandApiService = brokerStartupContext.getCommandApiService();
        if (commandApiService == null) {
            // No service on the context: only the transport may be left to close.
            closeServerTransport(brokerStartupContext, concurrencyControl, actorFuture);
            return;
        }

        // Unregister the listeners before closing the service itself.
        brokerStartupContext.removePartitionListener(commandApiService);
        brokerStartupContext.removeDiskSpaceUsageListener(commandApiService);
        concurrencyControl.runOnCompletion(commandApiService.closeAsync(), proceed(() -> {
            brokerStartupContext.setCommandApiService(null);
            closeServerTransport(brokerStartupContext, concurrencyControl, actorFuture);
        }, actorFuture));
    }

    /**
     * Creates the Atomix server transport for this broker's node id, submits it as an actor and,
     * once submission has completed, stores it on the context and continues with
     * {@link #startCommandApiService}.
     */
    private void startServerTransport(BrokerStartupContext brokerStartupContext, ActorFuture<BrokerStartupContext> actorFuture) {
        ConcurrencyControl concurrencyControl = brokerStartupContext.getConcurrencyControl();
        BrokerInfo brokerInfo = brokerStartupContext.getBrokerInfo();
        ActorSchedulingService actorSchedulingService = brokerStartupContext.getActorSchedulingService();

        AtomixServerTransport atomixServerTransport =
                new AtomixServerTransport(brokerInfo.getNodeId(), brokerStartupContext.getApiMessagingService());
        concurrencyControl.runOnCompletion(actorSchedulingService.submitActor(atomixServerTransport), proceed(() -> {
            brokerStartupContext.setCommandApiServerTransport(atomixServerTransport);
            startCommandApiService(brokerStartupContext, atomixServerTransport, actorFuture);
        }, actorFuture));
    }

    /**
     * Creates the command API service on top of the given transport, submits it as an actor and,
     * once submission has completed, stores it on the context, registers it as partition and disk
     * space usage listener, and completes the step future.
     */
    private void startCommandApiService(BrokerStartupContext brokerStartupContext, ServerTransport serverTransport, ActorFuture<BrokerStartupContext> actorFuture) {
        ConcurrencyControl concurrencyControl = brokerStartupContext.getConcurrencyControl();
        BrokerInfo brokerInfo = brokerStartupContext.getBrokerInfo();
        BrokerCfg brokerConfiguration = brokerStartupContext.getBrokerConfiguration();
        ActorSchedulingService actorSchedulingService = brokerStartupContext.getActorSchedulingService();

        // Use a real limiter only when backpressure is enabled in the configuration.
        BackpressureCfg backpressure = brokerConfiguration.getBackpressure();
        PartitionAwareRequestLimiter limiter = backpressure.isEnabled()
                ? PartitionAwareRequestLimiter.newLimiter(backpressure)
                : PartitionAwareRequestLimiter.newNoopLimiter();

        CommandApiServiceImpl commandApiServiceImpl = new CommandApiServiceImpl(
                serverTransport,
                brokerInfo,
                limiter,
                actorSchedulingService,
                brokerConfiguration.getExperimental().getQueryApi());
        concurrencyControl.runOnCompletion(actorSchedulingService.submitActor(commandApiServiceImpl), proceed(() -> {
            brokerStartupContext.setCommandApiService(commandApiServiceImpl);
            brokerStartupContext.addPartitionListener(commandApiServiceImpl);
            brokerStartupContext.addDiskSpaceUsageListener(commandApiServiceImpl);
            actorFuture.complete(brokerStartupContext);
        }, actorFuture));
    }

    /**
     * Closes the server transport stored on the context, clears it from the context and completes
     * the step future. If no transport is stored, the future is completed immediately.
     */
    private void closeServerTransport(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> actorFuture) {
        AtomixServerTransport commandApiServerTransport = brokerStartupContext.getCommandApiServerTransport();
        if (commandApiServerTransport == null) {
            // Nothing to close. The future must still be completed here: this method is the last
            // link of the shutdown chain, so returning silently would leave the step pending.
            actorFuture.complete(brokerStartupContext);
            return;
        }

        concurrencyControl.runOnCompletion(commandApiServerTransport.closeAsync(), proceed(() -> {
            brokerStartupContext.setCommandApiServerTransport(null);
            actorFuture.complete(brokerStartupContext);
        }, actorFuture));
    }
}
