package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.jobstream.JobStreamMetrics;
import io.camunda.zeebe.broker.jobstream.JobStreamService;
import io.camunda.zeebe.broker.jobstream.RemoteJobStreamErrorHandlerService;
import io.camunda.zeebe.broker.jobstream.RemoteJobStreamer;
import io.camunda.zeebe.broker.jobstream.YieldingJobStreamErrorHandler;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.TransportFactory;
import io.camunda.zeebe.transport.stream.api.RemoteStreamService;

/* loaded from: input_file:io/camunda/zeebe/broker/bootstrap/JobStreamServiceStep.class */
/**
 * Broker startup step that creates the job stream service and registers it with the broker.
 *
 * <p>On startup it submits the remote job stream error handler actor, starts the remote stream
 * server and, once both succeeded, publishes a {@link JobStreamService} on the
 * {@link BrokerStartupContext}. On shutdown it undoes those registrations and closes the service.
 */
public final class JobStreamServiceStep extends AbstractBrokerStartupStep {
    /**
     * Starts the error handler actor and the remote stream server, then registers the resulting
     * {@link JobStreamService}.
     *
     * @param brokerStartupContext context the services are read from and the result is stored in
     * @param concurrencyControl passed on to {@code RemoteStreamService#start}
     * @param actorFuture completed with the context on success, or exceptionally with the first
     *     error reported by either asynchronous start
     */
    @Override // io.camunda.zeebe.broker.bootstrap.AbstractBrokerStartupStep
    void startupInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> actorFuture) {
        ClusterServicesImpl clusterServices = brokerStartupContext.getClusterServices();
        RemoteJobStreamErrorHandlerService errorHandlerService = new RemoteJobStreamErrorHandlerService(new YieldingJobStreamErrorHandler());
        ActorSchedulingService actorSchedulingService = brokerStartupContext.getActorSchedulingService();
        RemoteStreamService remoteStreamService = new TransportFactory(actorSchedulingService).createRemoteStreamServer(clusterServices.getCommunicationService(), JobActivationPropertiesImpl::new, errorHandlerService, new JobStreamMetrics());

        // The stream server is only started after the error handler actor was submitted successfully.
        actorSchedulingService.submitActor(errorHandlerService).onComplete((ignored, submitError) -> {
            if (submitError != null) {
                actorFuture.completeExceptionally(submitError);
                return;
            }

            // Distinct parameter names per callback: a nested lambda must not redeclare a name
            // that is still in scope from the enclosing lambda.
            remoteStreamService.start(actorSchedulingService, concurrencyControl).onComplete((remoteStreamer, startError) -> {
                if (startError != null) {
                    actorFuture.completeExceptionally(startError);
                    return;
                }

                JobStreamService jobStreamService = new JobStreamService(remoteStreamService, new RemoteJobStreamer(remoteStreamer, clusterServices.getEventService()), errorHandlerService);
                clusterServices.getMembershipService().addListener(remoteStreamService);
                brokerStartupContext.addPartitionListener(errorHandlerService);
                brokerStartupContext.setJobStreamService(jobStreamService);
                brokerStartupContext.getSpringBrokerBridge().registerJobStreamServiceSupplier(() -> jobStreamService);
                actorFuture.complete(brokerStartupContext);
            });
        });
    }

    /**
     * Unregisters the listeners added during startup, closes the {@link JobStreamService} and
     * clears it from the context.
     *
     * <p>If the context holds no job stream service there is nothing to tear down and the future
     * is completed immediately.
     *
     * @param brokerStartupContext context holding the service to shut down
     * @param concurrencyControl passed on to {@code JobStreamService#closeAsync}
     * @param actorFuture completed with the context once shutdown finished, or exceptionally if
     *     closing the service failed
     */
    @Override // io.camunda.zeebe.broker.bootstrap.AbstractBrokerStartupStep
    void shutdownInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> actorFuture) {
        JobStreamService jobStreamService = brokerStartupContext.getJobStreamService();
        if (jobStreamService == null) {
            // Always complete the future; otherwise whoever waits on this step would wait forever.
            actorFuture.complete(brokerStartupContext);
            return;
        }

        // Detach the listeners before closing the service.
        brokerStartupContext.getClusterServices().getMembershipService().removeListener(jobStreamService.remoteStreamService());
        brokerStartupContext.removePartitionListener(jobStreamService.errorHandlerService());

        jobStreamService.closeAsync(concurrencyControl).onComplete((ignored, closeError) -> {
            if (closeError != null) {
                actorFuture.completeExceptionally(closeError);
                return;
            }
            // The membership listener was already removed above, so only the context is reset here.
            brokerStartupContext.setJobStreamService(null);
            brokerStartupContext.getSpringBrokerBridge().registerJobStreamServiceSupplier(null);
            actorFuture.complete(brokerStartupContext);
        });
    }

    /**
     * Returns the name of this step.
     *
     * @return the constant {@code "JobStreamService"}
     */
    public String getName() {
        return "JobStreamService";
    }
}
