package io.zeebe.broker.job;

import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.job.old.JobSubscriptionManager;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.logstreams.state.StateStorage;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ServerTransport;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorScheduler;

/* loaded from: input_file:io/zeebe/broker/job/JobQueueManagerService.class */
/**
 * Service that installs a job stream processor for every partition added to its
 * partition group reference.
 *
 * <p>Dependencies (client API transport, job subscription manager and stream processor
 * service factory) are supplied through the injectors exposed by the accessor methods.
 */
public class JobQueueManagerService implements Service<JobQueueManagerService> {
    /** Name used for both the state storage and the stream processor. */
    private static final String JOB_STREAM_PROCESSOR_NAME = "job";

    /** Numeric id passed to the state storage factory and to the stream processor builder. */
    private static final int JOB_STREAM_PROCESSOR_ID = 10;

    private final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    private final Injector<JobSubscriptionManager> jobSubscriptionManagerInjector = new Injector<>();
    private final Injector<StreamProcessorServiceFactory> streamProcessorServiceFactoryInjector = new Injector<>();

    /** Invokes {@link #addPartition} whenever a partition joins the referenced group. */
    private final ServiceGroupReference<Partition> partitionsReference =
            ServiceGroupReference.create()
                    .onAdd(this::addPartition)
                    .build();

    // Both fields are assigned in start(); they are unset before the service has been started.
    private ActorScheduler actorScheduler;
    private StreamProcessorServiceFactory streamProcessorServiceFactory;

    /**
     * Captures the scheduler from the start context and resolves the injected
     * stream processor service factory.
     *
     * @param serviceStartContext context handed in when the service is started
     */
    public void start(ServiceStartContext serviceStartContext) {
        actorScheduler = serviceStartContext.getScheduler();
        streamProcessorServiceFactory = streamProcessorServiceFactoryInjector.getValue();
    }

    /**
     * No-op: this service performs no work on stop.
     *
     * @param serviceStopContext unused
     */
    public void stop(ServiceStopContext serviceStopContext) {
    }

    /**
     * Returns this service instance.
     *
     * <p>The method name was assigned by the decompiler (renamed from {@code get} when it was
     * merged with its bridge method); it is kept unchanged here.
     *
     * @return {@code this}
     */
    public JobQueueManagerService m46get() {
        return this;
    }

    /**
     * Submits an actor to the scheduler which, once started, installs the job stream
     * processor for the given partition.
     *
     * @param serviceName service name of the partition
     * @param partition the partition that was added
     */
    public void addPartition(final ServiceName<Partition> serviceName, final Partition partition) {
        final Actor installer = new Actor() {
            protected void onActorStarted() {
                installJobStreamProcessor(serviceName, partition);
            }
        };
        actorScheduler.submitActor(installer);
    }

    /**
     * Builds the job stream processor service for a partition.
     *
     * @param serviceName service name of the partition, passed to the service factory
     * @param partition partition providing the log stream and the state storage factory
     */
    public void installJobStreamProcessor(ServiceName<Partition> serviceName, Partition partition) {
        final ServerTransport clientApiTransport = clientApiTransportInjector.getValue();
        final StateStorage stateStorage =
                partition.getStateStorageFactory().create(JOB_STREAM_PROCESSOR_ID, JOB_STREAM_PROCESSOR_NAME);
        final JobStreamProcessor processor = new JobStreamProcessor(jobSubscriptionManagerInjector.getValue());
        final TypedStreamEnvironment environment =
                new TypedStreamEnvironment(partition.getLogStream(), clientApiTransport.getOutput());

        streamProcessorServiceFactory
                .createService(partition, serviceName)
                .processor(processor.createStreamProcessor(environment))
                .processorId(JOB_STREAM_PROCESSOR_ID)
                .processorName(JOB_STREAM_PROCESSOR_NAME)
                .snapshotController(processor.createSnapshotController(stateStorage))
                .build();
    }

    /** @return injector for the client API server transport */
    public Injector<ServerTransport> getClientApiTransportInjector() {
        return clientApiTransportInjector;
    }

    /** @return injector for the job subscription manager */
    public Injector<JobSubscriptionManager> getJobSubscriptionManagerInjector() {
        return jobSubscriptionManagerInjector;
    }

    /** @return injector for the stream processor service factory */
    public Injector<StreamProcessorServiceFactory> getStreamProcessorServiceFactoryInjector() {
        return streamProcessorServiceFactoryInjector;
    }

    /** @return group reference through which partitions are announced to this service */
    public ServiceGroupReference<Partition> getPartitionsGroupReference() {
        return partitionsReference;
    }
}
