package io.zeebe.broker.subscription.message;

import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.processor.MessageStreamProcessor;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.ServerTransport;

/* loaded from: input_file:io/zeebe/broker/subscription/message/MessageService.class */
/**
 * Service that starts a message stream processor for every partition added to
 * {@link #getPartitionsGroupReference()}.
 *
 * <p>Collaborators are supplied through the injectors exposed by the getters of this class. The
 * injected values are read when a partition is added, not when this object is constructed.
 */
public class MessageService implements Service<MessageService> {
    /** Id passed both to the state storage factory and to the stream processor builder. */
    private static final int MESSAGE_PROCESSOR_ID = 90;

    /** Name passed both to the state storage factory and to the stream processor builder. */
    private static final String MESSAGE_PROCESSOR_NAME = "message";

    private final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    private final Injector<StreamProcessorServiceFactory> streamProcessorServiceFactoryInjector = new Injector<>();
    // Not read by this class; only exposed through its getter.
    private final Injector<ClientTransport> managementApiClientInjector = new Injector<>();
    private final Injector<ClientTransport> subscriptionApiClientInjector = new Injector<>();
    private final Injector<TopologyManager> topologyManagerInjector = new Injector<>();

    // The explicit <Partition> type witness is required: without it the builder's type argument
    // cannot be inferred from the call chain and the callback would not type-check against
    // startStreamProcessors(ServiceName<Partition>, Partition).
    private final ServiceGroupReference<Partition> partitionsGroupReference =
            ServiceGroupReference.<Partition>create().onAdd(this::startStreamProcessors).build();

    private final ClusterCfg clusterCfg;

    /**
     * @param clusterCfg cluster configuration handed to the {@link SubscriptionCommandSender}
     *     created for each partition
     */
    public MessageService(ClusterCfg clusterCfg) {
        this.clusterCfg = clusterCfg;
    }

    /**
     * Builds and registers the message stream processor for the given partition.
     *
     * @param serviceName service name of the added partition, forwarded to the stream processor
     *     service factory
     * @param partition the added partition; provides the log stream and the state storage factory
     */
    private void startStreamProcessors(ServiceName<Partition> serviceName, Partition partition) {
        final ServerTransport clientApiTransport = this.clientApiTransportInjector.getValue();
        final StreamProcessorServiceFactory processorFactory = this.streamProcessorServiceFactoryInjector.getValue();
        final TopologyManager topologyManager = this.topologyManagerInjector.getValue();
        final ClientTransport subscriptionApiClient = this.subscriptionApiClientInjector.getValue();

        final SubscriptionCommandSender commandSender =
                new SubscriptionCommandSender(this.clusterCfg, subscriptionApiClient);
        final TypedStreamEnvironment environment =
                new TypedStreamEnvironment(partition.getLogStream(), clientApiTransport.getOutput());
        final MessageStreamProcessor messageProcessor =
                new MessageStreamProcessor(commandSender, topologyManager);

        processorFactory
                .createService(partition, serviceName)
                .processor(messageProcessor.createStreamProcessors(environment))
                .snapshotController(
                        messageProcessor.createStateSnapshotController(
                                partition.getStateStorageFactory().create(MESSAGE_PROCESSOR_ID, MESSAGE_PROCESSOR_NAME)))
                .processorId(MESSAGE_PROCESSOR_ID)
                .processorName(MESSAGE_PROCESSOR_NAME)
                .build();
    }

    /**
     * Implements {@code Service#get()}: the service value is this object itself.
     *
     * @return this instance
     */
    public MessageService get() {
        return this;
    }

    /**
     * Decompiler-generated alias of {@link #get()} (the decompiler renamed {@code get} when merging
     * it with its bridge method). Kept only so that code referring to the renamed method keeps
     * compiling.
     *
     * @return this instance
     * @deprecated use {@link #get()} instead
     */
    @Deprecated
    public MessageService m96get() {
        return get();
    }

    /** @return injector for the server transport whose output is used by the stream environment */
    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    /** @return group reference whose add-callback starts the stream processor for a partition */
    public ServiceGroupReference<Partition> getPartitionsGroupReference() {
        return this.partitionsGroupReference;
    }

    /** @return injector for the factory used to create the stream processor service */
    public Injector<StreamProcessorServiceFactory> getStreamProcessorServiceFactoryInjector() {
        return this.streamProcessorServiceFactoryInjector;
    }

    /** @return injector for the management API client transport (not read by this class) */
    public Injector<ClientTransport> getManagementApiClientInjector() {
        return this.managementApiClientInjector;
    }

    /** @return injector for the client transport handed to the {@link SubscriptionCommandSender} */
    public Injector<ClientTransport> getSubscriptionApiClientInjector() {
        return this.subscriptionApiClientInjector;
    }

    /** @return injector for the topology manager handed to the {@link MessageStreamProcessor} */
    public Injector<TopologyManager> getTopologyManagerInjector() {
        return this.topologyManagerInjector;
    }
}
