/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.event.processor;

import io.zeebe.broker.event.TopicSubscriptionServiceNames;
import io.zeebe.broker.event.processor.SubscriptionCfg;
import io.zeebe.broker.event.processor.TopicSubscriptionManagementProcessor;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.ConfigurationManager;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.buffer.BufferUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

/**
 * Service that keeps one {@link TopicSubscriptionManagementProcessor} per topic and partition.
 *
 * <p>For every log stream added to {@link #getLogStreamsGroupReference()} a management processor
 * is created and installed as a stream processor service; once the installation future completes
 * the processor is recorded in {@link #managersByLog}. When a log stream is removed, the
 * corresponding entry is dropped again. Closed client connections are forwarded to all currently
 * registered management processors.
 *
 * <p>Work triggered by stream add/remove and connection-close callbacks is submitted to
 * {@link #asyncContext}; {@link #doWork()} delegates to that context.
 */
public class TopicSubscriptionService
implements Service<TopicSubscriptionService>,
Actor,
TransportListener {
    /** Processor id passed to the stream processor service of every management processor. */
    private static final int SUBSCRIPTION_MANAGEMENT_PROCESSOR_ID = 50;

    protected final Injector<ActorScheduler> actorSchedulerInjector = new Injector<>();
    protected final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    /** Value of the {@code "subscriptions"} configuration entry; never {@code null}. */
    protected final SubscriptionCfg config;
    protected ActorScheduler actorScheduler;
    protected ServiceStartContext serviceContext;
    /** Management processors keyed by topic name, then by partition id. */
    protected Map<DirectBuffer, Int2ObjectHashMap<TopicSubscriptionManagementProcessor>> managersByLog = new HashMap<>();
    protected ServerOutput serverOutput;
    protected ActorReference actorRef;
    protected DeferredCommandContext asyncContext;
    // The explicit type witness is required: inside a call chain there is no target type from
    // which the builder's type argument could be inferred, and the method references below only
    // fit callbacks typed on LogStream.
    protected final ServiceGroupReference<LogStream> logStreamsGroupReference = ServiceGroupReference.<LogStream>create()
        .onAdd(this::onStreamAdded)
        .onRemove(this::onStreamRemoved)
        .build();

    /**
     * Reads the {@code "subscriptions"} entry from the given configuration.
     *
     * @param configurationManager source of the subscription configuration
     * @throws NullPointerException if the configuration manager returns no entry
     */
    public TopicSubscriptionService(ConfigurationManager configurationManager) {
        this.config = Objects.requireNonNull(
            configurationManager.readEntry("subscriptions", SubscriptionCfg.class),
            "configuration entry 'subscriptions' must not be null");
    }

    /**
     * @return this service instance
     */
    public TopicSubscriptionService get() {
        return this;
    }

    /**
     * @return the injector through which the actor scheduler is provided
     */
    public Injector<ActorScheduler> getActorSchedulerInjector() {
        return this.actorSchedulerInjector;
    }

    /**
     * @return the injector through which the client API transport is provided
     */
    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    /**
     * @return the group reference whose add/remove callbacks are
     *         {@link #onStreamAdded} and {@link #onStreamRemoved}
     */
    public ServiceGroupReference<LogStream> getLogStreamsGroupReference() {
        return this.logStreamsGroupReference;
    }

    /**
     * Resolves the injected transport and scheduler, registers this service as channel listener
     * on the transport (handing the registration future to {@code startContext.async}) and
     * schedules this service as an actor.
     *
     * @param startContext start context; also kept for installing stream processor services later
     */
    public void start(ServiceStartContext startContext) {
        ServerTransport transport = this.clientApiTransportInjector.getValue();
        this.serverOutput = transport.getOutput();
        this.actorScheduler = this.actorSchedulerInjector.getValue();
        this.asyncContext = new DeferredCommandContext();
        this.serviceContext = startContext;
        startContext.async(transport.registerChannelListener(this));
        this.actorRef = this.actorScheduler.schedule(this);
    }

    /**
     * Closes the actor reference obtained in {@link #start(ServiceStartContext)}.
     *
     * @param stopContext unused
     */
    public void stop(ServiceStopContext stopContext) {
        this.actorRef.close();
    }

    /**
     * Submits a task that creates a management processor for the given log stream, installs it
     * as a stream processor service and, once the installation future completes normally,
     * records the processor under the stream's topic name and partition id.
     *
     * @param logStreamServiceName service name of the added log stream
     * @param logStream the added log stream
     */
    public void onStreamAdded(ServiceName<LogStream> logStreamServiceName, LogStream logStream) {
        this.asyncContext.runAsync(() -> {
            TopicSubscriptionManagementProcessor ackProcessor = new TopicSubscriptionManagementProcessor(
                logStreamServiceName,
                new CommandResponseWriter(this.serverOutput),
                new ErrorResponseWriter(this.serverOutput),
                () -> new SubscribedEventWriter(this.serverOutput),
                this.serviceContext);

            // NOTE(review): the callback below runs on whichever thread completes the install
            // future, which may differ from the thread draining asyncContext - confirm that
            // mutating managersByLog from there is safe.
            this.createStreamProcessorService(
                    logStreamServiceName,
                    TopicSubscriptionServiceNames.subscriptionManagementServiceName(logStream.getLogName()),
                    SUBSCRIPTION_MANAGEMENT_PROCESSOR_ID,
                    ackProcessor,
                    TopicSubscriptionManagementProcessor.filter())
                .thenAccept(v ->
                    this.managersByLog
                        .computeIfAbsent(logStream.getTopicName(), k -> new Int2ObjectHashMap<>())
                        .put(logStream.getPartitionId(), ackProcessor));
        });
    }

    /**
     * Installs a {@link StreamProcessorService} that uses the given log stream as both source and
     * target stream and depends on the snapshot storage and actor scheduler services.
     *
     * @param logStreamName service name of the log stream to read from and write to
     * @param processorName service name under which the processor service is installed
     * @param processorId id handed to the stream processor service
     * @param streamProcessor processor to run
     * @param eventFilter filter applied by the stream processor service
     * @return the future returned by installing the service
     */
    protected CompletableFuture<Void> createStreamProcessorService(ServiceName<LogStream> logStreamName, ServiceName<StreamProcessorController> processorName, int processorId, StreamProcessor streamProcessor, MetadataFilter eventFilter) {
        StreamProcessorService streamProcessorService = new StreamProcessorService(processorName.getName(), processorId, streamProcessor)
            .eventFilter(eventFilter);

        return this.serviceContext.createService(processorName, streamProcessorService)
            .dependency(logStreamName, streamProcessorService.getSourceStreamInjector())
            .dependency(logStreamName, streamProcessorService.getTargetStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, streamProcessorService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, streamProcessorService.getActorSchedulerInjector())
            .install();
    }

    /**
     * Submits a task that forgets the management processor of the given log stream and drops the
     * per-topic map once it has no partitions left.
     *
     * @param logStreamServiceName service name of the removed log stream (unused)
     * @param logStream the removed log stream
     */
    public void onStreamRemoved(ServiceName<LogStream> logStreamServiceName, LogStream logStream) {
        this.asyncContext.runAsync(() -> {
            DirectBuffer topicName = logStream.getTopicName();
            int partitionId = logStream.getPartitionId();
            Int2ObjectHashMap<TopicSubscriptionManagementProcessor> managersByPartition = this.managersByLog.get(topicName);
            if (managersByPartition != null) {
                managersByPartition.remove(partitionId);
                if (managersByPartition.isEmpty()) {
                    this.managersByLog.remove(topicName);
                }
            }
        });
    }

    /**
     * Submits a task that notifies every registered management processor that the given client
     * channel was closed.
     *
     * @param channelId id of the closed channel
     */
    public void onClientChannelCloseAsync(int channelId) {
        this.asyncContext.runAsync(() -> this.managersByLog.forEach((topicName, partitions) -> partitions.forEach((partitionId, manager) -> manager.onClientChannelCloseAsync(channelId))));
    }

    /**
     * Delegates to {@link DeferredCommandContext#doWork()} of {@link #asyncContext}.
     *
     * @return the value returned by the async context
     */
    public int doWork() throws Exception {
        return this.asyncContext.doWork();
    }

    /**
     * @param now ignored
     * @return always {@code 1}
     */
    public int getPriority(long now) {
        return 1;
    }

    /**
     * @return the fixed actor name {@code "subscription-service"}
     */
    public String name() {
        return "subscription-service";
    }

    /**
     * Asks the management processor of the given topic partition to close the push processor of
     * a subscriber.
     *
     * <p>NOTE(review): the lookup reads {@link #managersByLog} directly on the calling thread,
     * whereas removals are submitted through {@link #asyncContext} - confirm which thread
     * callers use.
     *
     * @param topicName name of the topic
     * @param partitionId id of the partition
     * @param subscriberKey key of the subscriber whose push processor should be closed
     * @return the future returned by the management processor, or a future completed
     *         exceptionally with a {@link RuntimeException} if no management processor is
     *         registered for the topic and partition
     */
    public CompletableFuture<Void> closeSubscriptionAsync(DirectBuffer topicName, int partitionId, long subscriberKey) {
        TopicSubscriptionManagementProcessor managementProcessor = this.getManager(topicName, partitionId);
        if (managementProcessor != null) {
            return managementProcessor.closePushProcessorAsync(subscriberKey);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException(String.format("No subscription management processor registered for topic '%s' and partition '%d'", BufferUtil.bufferAsString(topicName), partitionId)));
        return future;
    }

    /**
     * Looks up the management processor registered for a topic partition.
     *
     * @return the processor, or {@code null} if none is registered
     */
    private TopicSubscriptionManagementProcessor getManager(DirectBuffer topicName, int partitionId) {
        Int2ObjectHashMap<TopicSubscriptionManagementProcessor> managersByPartition = this.managersByLog.get(topicName);
        if (managersByPartition != null) {
            return managersByPartition.get(partitionId);
        }
        return null;
    }

    /**
     * No action is taken when a connection is established.
     *
     * @param remoteAddress address of the newly connected remote (unused)
     */
    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    /**
     * Forwards the stream id of the closed connection as channel id to
     * {@link #onClientChannelCloseAsync(int)}.
     *
     * @param remoteAddress address of the remote whose connection was closed
     */
    public void onConnectionClosed(RemoteAddress remoteAddress) {
        this.onClientChannelCloseAsync(remoteAddress.getStreamId());
    }
}

