package io.zeebe.broker.event.processor;

import io.zeebe.broker.event.TopicSubscriptionServiceNames;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.ConfigurationManager;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.buffer.BufferUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

/* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionService.class */
public class TopicSubscriptionService implements Service<TopicSubscriptionService>, Actor, TransportListener {
    protected final SubscriptionCfg config;
    protected ActorScheduler actorScheduler;
    protected ServiceStartContext serviceContext;
    protected ServerOutput serverOutput;
    protected ActorReference actorRef;
    protected DeferredCommandContext asyncContext;
    protected final Injector<ActorScheduler> actorSchedulerInjector = new Injector<>();
    protected final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    protected Map<DirectBuffer, Int2ObjectHashMap<TopicSubscriptionManagementProcessor>> managersByLog = new HashMap();
    protected final ServiceGroupReference<LogStream> logStreamsGroupReference = ServiceGroupReference.create().onAdd(this::onStreamAdded).onRemove(this::onStreamRemoved).build();

    public TopicSubscriptionService(ConfigurationManager configurationManager) {
        this.config = (SubscriptionCfg) configurationManager.readEntry("subscriptions", SubscriptionCfg.class);
        Objects.requireNonNull(this.config);
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public TopicSubscriptionService m33get() {
        return this;
    }

    public Injector<ActorScheduler> getActorSchedulerInjector() {
        return this.actorSchedulerInjector;
    }

    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    public ServiceGroupReference<LogStream> getLogStreamsGroupReference() {
        return this.logStreamsGroupReference;
    }

    public void start(ServiceStartContext serviceStartContext) {
        ServerTransport serverTransport = (ServerTransport) this.clientApiTransportInjector.getValue();
        this.serverOutput = serverTransport.getOutput();
        this.actorScheduler = (ActorScheduler) this.actorSchedulerInjector.getValue();
        this.asyncContext = new DeferredCommandContext();
        this.serviceContext = serviceStartContext;
        serviceStartContext.async(serverTransport.registerChannelListener(this));
        this.actorRef = this.actorScheduler.schedule(this);
    }

    public void stop(ServiceStopContext serviceStopContext) {
        this.actorRef.close();
    }

    public void onStreamAdded(ServiceName<LogStream> serviceName, LogStream logStream) {
        this.asyncContext.runAsync(() -> {
            TopicSubscriptionManagementProcessor topicSubscriptionManagementProcessor = new TopicSubscriptionManagementProcessor(serviceName, new CommandResponseWriter(this.serverOutput), new ErrorResponseWriter(this.serverOutput), () -> {
                return new SubscribedEventWriter(this.serverOutput);
            }, this.serviceContext);
            createStreamProcessorService(serviceName, TopicSubscriptionServiceNames.subscriptionManagementServiceName(logStream.getLogName()), 50, topicSubscriptionManagementProcessor, TopicSubscriptionManagementProcessor.filter()).thenAccept(r7 -> {
            });
        });
    }

    protected CompletableFuture<Void> createStreamProcessorService(ServiceName<LogStream> serviceName, ServiceName<StreamProcessorController> serviceName2, int i, StreamProcessor streamProcessor, MetadataFilter metadataFilter) {
        StreamProcessorService eventFilter = new StreamProcessorService(serviceName2.getName(), i, streamProcessor).eventFilter(metadataFilter);
        return this.serviceContext.createService(serviceName2, eventFilter).dependency(serviceName, eventFilter.getSourceStreamInjector()).dependency(serviceName, eventFilter.getTargetStreamInjector()).dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, eventFilter.getSnapshotStorageInjector()).dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, eventFilter.getActorSchedulerInjector()).install();
    }

    public void onStreamRemoved(ServiceName<LogStream> serviceName, LogStream logStream) {
        this.asyncContext.runAsync(() -> {
            DirectBuffer topicName = logStream.getTopicName();
            int partitionId = logStream.getPartitionId();
            Int2ObjectHashMap<TopicSubscriptionManagementProcessor> int2ObjectHashMap = this.managersByLog.get(topicName);
            if (int2ObjectHashMap != null) {
                int2ObjectHashMap.remove(partitionId);
                if (int2ObjectHashMap.isEmpty()) {
                    this.managersByLog.remove(topicName);
                }
            }
        });
    }

    public void onClientChannelCloseAsync(int i) {
        this.asyncContext.runAsync(() -> {
            this.managersByLog.forEach((directBuffer, int2ObjectHashMap) -> {
                int2ObjectHashMap.forEach((num, topicSubscriptionManagementProcessor) -> {
                    topicSubscriptionManagementProcessor.onClientChannelCloseAsync(i);
                });
            });
        });
    }

    public int doWork() throws Exception {
        return this.asyncContext.doWork();
    }

    public int getPriority(long j) {
        return 1;
    }

    public String name() {
        return "subscription-service";
    }

    public CompletableFuture<Void> closeSubscriptionAsync(DirectBuffer directBuffer, int i, long j) {
        TopicSubscriptionManagementProcessor manager = getManager(directBuffer, i);
        if (manager != null) {
            return manager.closePushProcessorAsync(j);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new RuntimeException(String.format("No subscription management processor registered for topic '%s' and partition '%d'", BufferUtil.bufferAsString(directBuffer), Integer.valueOf(i))));
        return completableFuture;
    }

    private TopicSubscriptionManagementProcessor getManager(DirectBuffer directBuffer, int i) {
        Int2ObjectHashMap<TopicSubscriptionManagementProcessor> int2ObjectHashMap = this.managersByLog.get(directBuffer);
        if (int2ObjectHashMap != null) {
            return (TopicSubscriptionManagementProcessor) int2ObjectHashMap.get(i);
        }
        return null;
    }

    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    public void onConnectionClosed(RemoteAddress remoteAddress) {
        onClientChannelCloseAsync(remoteAddress.getStreamId());
    }
}
