package io.zeebe.broker.event.processor;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedRecordWriter;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionService.class */
public class TopicSubscriptionService extends Actor implements Service<TopicSubscriptionService>, TransportListener {
    private static final Logger LOG = Loggers.SERVICES_LOGGER;
    protected static final MetadataFilter TOPIC_SUBSCRIPTION_EVENT_FILTER = recordMetadata -> {
        ValueType valueType = recordMetadata.getValueType();
        return (valueType == ValueType.SUBSCRIPTION || valueType == ValueType.SUBSCRIBER || valueType == ValueType.ID || valueType == ValueType.NOOP) ? false : true;
    };
    protected final ServiceContainer serviceContainer;
    protected ServerOutput serverOutput;
    protected StreamProcessorServiceFactory streamProcessorServiceFactory;
    protected final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    protected final Injector<StreamProcessorServiceFactory> streamProcessorServiceFactoryInjector = new Injector<>();
    protected final Int2ObjectHashMap<TopicSubscriptionManagementProcessor> managersByPartition = new Int2ObjectHashMap<>();
    protected final ServiceGroupReference<Partition> partitionsGroupReference = ServiceGroupReference.create().onAdd(this::onPartitionAdded).onRemove(this::onPartitionRemoved).build();
    protected final ServiceGroupReference<Partition> systemPartitionGroupReference = ServiceGroupReference.create().onAdd(this::onPartitionAdded).onRemove(this::onPartitionRemoved).build();

    public TopicSubscriptionService(ServiceContainer serviceContainer) {
        this.serviceContainer = serviceContainer;
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public TopicSubscriptionService m54get() {
        return this;
    }

    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    public Injector<StreamProcessorServiceFactory> getStreamProcessorServiceFactoryInjector() {
        return this.streamProcessorServiceFactoryInjector;
    }

    public ServiceGroupReference<Partition> getPartitionsGroupReference() {
        return this.partitionsGroupReference;
    }

    public ServiceGroupReference<Partition> getSystemPartitionGroupReference() {
        return this.systemPartitionGroupReference;
    }

    public void start(ServiceStartContext serviceStartContext) {
        this.streamProcessorServiceFactory = (StreamProcessorServiceFactory) this.streamProcessorServiceFactoryInjector.getValue();
        ServerTransport serverTransport = (ServerTransport) this.clientApiTransportInjector.getValue();
        this.serverOutput = serverTransport.getOutput();
        serviceStartContext.async(serverTransport.registerChannelListener(this));
        serviceStartContext.getScheduler().submitActor(this);
    }

    public void stop(ServiceStopContext serviceStopContext) {
        this.actor.close();
    }

    public void onPartitionAdded(ServiceName<Partition> serviceName, Partition partition) {
        this.actor.call(() -> {
            TopicSubscriptionManagementProcessor topicSubscriptionManagementProcessor = new TopicSubscriptionManagementProcessor(partition, serviceName, TOPIC_SUBSCRIPTION_EVENT_FILTER, new CommandResponseWriter(this.serverOutput), new ErrorResponseWriter(this.serverOutput), () -> {
                return new SubscribedRecordWriter(this.serverOutput);
            }, this.streamProcessorServiceFactory, this.serviceContainer);
            this.actor.runOnCompletion(this.streamProcessorServiceFactory.createService(partition, serviceName).processor(topicSubscriptionManagementProcessor).processorId(50).processorName("topic-management").eventFilter(TopicSubscriptionManagementProcessor.filter()).build(), (streamProcessorService, th) -> {
                if (th == null) {
                    this.managersByPartition.put(partition.getInfo().getPartitionId(), topicSubscriptionManagementProcessor);
                } else {
                    LOG.error("Failed to create topic subscription stream processor service for log stream service '{}'", serviceName);
                }
            });
        });
    }

    public void onPartitionRemoved(ServiceName<Partition> serviceName, Partition partition) {
        this.actor.call(() -> {
            return (TopicSubscriptionManagementProcessor) this.managersByPartition.remove(partition.getInfo().getPartitionId());
        });
    }

    public void onClientChannelCloseAsync(int i) {
        this.actor.call(() -> {
            this.managersByPartition.forEach((num, topicSubscriptionManagementProcessor) -> {
                topicSubscriptionManagementProcessor.onClientChannelCloseAsync(i);
            });
        });
    }

    public String getName() {
        return "subscription-service";
    }

    public ActorFuture<Void> closeSubscriptionAsync(int i, long j) {
        TopicSubscriptionManagementProcessor manager = getManager(i);
        return manager != null ? manager.closePushProcessorAsync(j) : CompletableActorFuture.completedExceptionally(new RuntimeException(String.format("No subscription management processor registered for partition '%d'", Integer.valueOf(i))));
    }

    private TopicSubscriptionManagementProcessor getManager(int i) {
        return (TopicSubscriptionManagementProcessor) this.managersByPartition.get(i);
    }

    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    public void onConnectionClosed(RemoteAddress remoteAddress) {
        onClientChannelCloseAsync(remoteAddress.getStreamId());
    }
}
