/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.task;

import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.task.CreditsRequest;
import io.zeebe.broker.task.CreditsRequestBuffer;
import io.zeebe.broker.task.TaskQueueServiceNames;
import io.zeebe.broker.task.processor.LockTaskStreamProcessor;
import io.zeebe.broker.task.processor.TaskSubscription;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.allocation.BufferAllocator;
import io.zeebe.util.allocation.HeapBufferAllocator;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.collection.CompactList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;

/**
 * Keeps track of task subscriptions and of the {@link LockTaskStreamProcessor}
 * instances that serve them, grouped by topic name and partition id.
 *
 * <p>Subscription and stream changes are submitted to {@link #asyncContext},
 * whose pending work is executed by {@link #doWork()}. Credit requests are
 * offered to {@link #creditRequestBuffer} and drained by {@link #doWork()};
 * requests a stream processor does not accept are parked in
 * {@link #backPressuredCreditsRequests} and retried on subsequent invocations.
 */
public class TaskSubscriptionManager
implements Actor,
TransportListener {
    protected static final String NAME = "taskqueue.subscription.manager";

    /** Capacity passed to both the deferred command context and the credits request buffer. */
    public static final int NUM_CONCURRENT_REQUESTS = 1024;

    protected final ServiceStartContext serviceContext;

    /** Creates the stream processor for a given task type. */
    protected final Function<DirectBuffer, LockTaskStreamProcessor> streamProcessorSupplier;

    /** Registered log streams, keyed by topic name and then by partition id. */
    protected final Map<DirectBuffer, Int2ObjectHashMap<LogStreamBucket>> logStreamBuckets = new HashMap<>();

    /** The stream processor responsible for each subscription id handed out by this manager. */
    protected final Long2ObjectHashMap<LockTaskStreamProcessor> streamProcessorBySubscriptionId = new Long2ObjectHashMap<>();

    protected final DeferredCommandContext asyncContext = new DeferredCommandContext(NUM_CONCURRENT_REQUESTS);
    protected final CreditsRequestBuffer creditRequestBuffer;

    /** Credit requests that a stream processor did not accept yet. */
    protected final CompactList backPressuredCreditsRequests;

    /** Reusable view that is wrapped around elements of {@link #backPressuredCreditsRequests}. */
    protected final CreditsRequest creditsRequest = new CreditsRequest();

    protected long nextSubscriptionId = 0L;

    /**
     * Creates a manager that builds a new {@link LockTaskStreamProcessor} per task type.
     *
     * @param serviceContext context used to install and remove stream processor services
     */
    public TaskSubscriptionManager(ServiceStartContext serviceContext) {
        this(serviceContext, LockTaskStreamProcessor::new);
    }

    /**
     * Creates a manager with a custom stream processor factory.
     *
     * @param serviceContext context used to install and remove stream processor services
     * @param streamProcessorBuilder factory invoked with a copy of the task type buffer
     */
    public TaskSubscriptionManager(ServiceStartContext serviceContext, Function<DirectBuffer, LockTaskStreamProcessor> streamProcessorBuilder) {
        this.serviceContext = serviceContext;
        this.streamProcessorSupplier = streamProcessorBuilder;
        this.creditRequestBuffer = new CreditsRequestBuffer(NUM_CONCURRENT_REQUESTS, request -> {
            // A request the processor cannot take right now is parked and retried later.
            if (!this.dispatchSubscriptionCredits(request)) {
                this.backpressureRequest(request);
            }
        });
        // NOTE(review): 12 is presumably the encoded length of one CreditsRequest element - confirm.
        this.backPressuredCreditsRequests = new CompactList(12, this.creditRequestBuffer.getCapacityUpperBound(), new HeapBufferAllocator());
    }

    /**
     * @return the fixed name of this manager
     */
    public String name() {
        return NAME;
    }

    /**
     * Runs pending deferred commands, retries back-pressured credit requests and,
     * only if none of those remain parked, drains newly offered credit requests.
     *
     * @return the sum of the work counts reported by the three steps
     */
    public int doWork() throws Exception {
        int workCount = this.asyncContext.doWork();
        workCount += this.dispatchBackpressuredSubscriptionCredits();
        // New requests are only consumed once every parked request has been dispatched.
        if (this.backPressuredCreditsRequests.size() == 0) {
            workCount += this.creditRequestBuffer.handleRequests();
        }
        return workCount;
    }

    /**
     * Registers a subscription. A subscriber key is assigned to the subscription and
     * the subscription is handed to the stream processor for its task type; the
     * processor's service is created first when the log stream has none for that type.
     *
     * @param subscription the subscription to add; it and its lock task type must not be null
     * @return a future completed once the stream processor accepted the subscription, or
     *         completed exceptionally if creating the processor or adding the subscription failed
     */
    public CompletableFuture<Void> addSubscription(TaskSubscription subscription) {
        return this.asyncContext.runAsync(future -> {
            EnsureUtil.ensureNotNull("subscription", subscription);
            DirectBuffer taskType = subscription.getLockTaskType();
            EnsureUtil.ensureNotNull("lock task type", taskType);
            DirectBuffer topicName = subscription.getTopicName();
            int partitionId = subscription.getPartitionId();
            LogStreamBucket logStreamBucket = this.getLogStreamBucket(topicName, partitionId);
            if (logStreamBucket == null) {
                String errorMessage = String.format("Topic with name '%s' and partition id '%d' not found.", BufferUtil.bufferAsString(topicName), partitionId);
                throw new RuntimeException(errorMessage);
            }
            long subscriptionId = this.nextSubscriptionId++;
            subscription.setSubscriberKey(subscriptionId);
            LockTaskStreamProcessor existingProcessor = logStreamBucket.getStreamProcessorByTaskType(taskType);
            if (existingProcessor != null) {
                this.streamProcessorBySubscriptionId.put(subscriptionId, existingProcessor);
                existingProcessor.addSubscription(subscription)
                    .handle((ignored, failure) -> completeVoid(future, failure));
            } else {
                this.createStreamProcessorService(logStreamBucket, taskType)
                    .thenCompose(processor -> {
                        this.streamProcessorBySubscriptionId.put(subscriptionId, processor);
                        logStreamBucket.addStreamProcessor(processor);
                        return processor.addSubscription(subscription);
                    })
                    .handle((ignored, failure) -> completeVoid(future, failure));
            }
        });
    }

    /**
     * Builds a stream processor for the task type and installs it as a service that
     * depends on the bucket's log stream.
     *
     * @param logStreamBucket the log stream the processor reads from and writes to
     * @param taskType the task type; the buffer is cloned before it is handed to the processor
     * @return a future completed with the processor once the service is installed, or
     *         completed exceptionally if the installation failed
     */
    protected CompletableFuture<LockTaskStreamProcessor> createStreamProcessorService(LogStreamBucket logStreamBucket, DirectBuffer taskType) {
        CompletableFuture<LockTaskStreamProcessor> future = new CompletableFuture<>();
        ServiceName<LogStream> logStreamServiceName = logStreamBucket.getLogServiceName();
        String logName = logStreamBucket.getLogStream().getLogName();
        ServiceName<StreamProcessorController> streamProcessorServiceName = TaskQueueServiceNames.taskQueueLockStreamProcessorServiceName(logName, BufferUtil.bufferAsString(taskType));
        String streamProcessorName = streamProcessorServiceName.getName();
        DirectBuffer newTaskTypeBuffer = BufferUtil.cloneBuffer(taskType);
        LockTaskStreamProcessor streamProcessor = this.streamProcessorSupplier.apply(newTaskTypeBuffer);
        // NOTE(review): 20 is presumably the id of the lock-task stream processor - confirm.
        StreamProcessorService streamProcessorService = new StreamProcessorService(streamProcessorName, 20, streamProcessor)
            .eventFilter(LockTaskStreamProcessor.eventFilter())
            .reprocessingEventFilter(LockTaskStreamProcessor.reprocessingEventFilter(newTaskTypeBuffer));
        // The same log stream service is injected as both source and target stream.
        this.serviceContext.createService(streamProcessorServiceName, streamProcessorService)
            .dependency(logStreamServiceName, streamProcessorService.getSourceStreamInjector())
            .dependency(logStreamServiceName, streamProcessorService.getTargetStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, streamProcessorService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, streamProcessorService.getActorSchedulerInjector())
            .install()
            .handle((ignored, failure) -> failure == null ? future.complete(streamProcessor) : future.completeExceptionally(failure));
        return future;
    }

    /**
     * Removes a subscription and, if its stream processor reports that no
     * subscriptions remain, removes the processor's service as well.
     *
     * @param subscriptionId the subscriber key assigned by {@link #addSubscription(TaskSubscription)}
     * @return a future completed when the removal finished; it completes normally
     *         without further action if the id is unknown
     */
    public CompletableFuture<Void> removeSubscription(long subscriptionId) {
        return this.asyncContext.runAsync(future -> {
            LockTaskStreamProcessor streamProcessor = this.streamProcessorBySubscriptionId.remove(subscriptionId);
            if (streamProcessor == null) {
                future.complete(null);
                return;
            }
            streamProcessor.removeSubscription(subscriptionId)
                .thenCompose(hasSubscriptions -> this.removeServiceIfUnused(streamProcessor, hasSubscriptions))
                .handle((ignored, failure) -> completeVoid(future, failure));
        });
    }

    /**
     * Detaches the processor from its log stream bucket and removes its service.
     *
     * @param streamProcessor the processor whose service should be removed
     * @return the stage returned by the service container for the removal
     */
    protected CompletionStage<Void> removeStreamProcessorService(LockTaskStreamProcessor streamProcessor) {
        // NOTE(review): the bucket lookup yields null once the stream was removed via
        // removeStream(...); that case would raise a NullPointerException here - confirm
        // whether it can be reached.
        LogStreamBucket logStreamBucket = this.getLogStreamBucket(streamProcessor.getLogStreamTopicName(), streamProcessor.getLogStreamPartitionId());
        logStreamBucket.removeStreamProcessor(streamProcessor);
        String logName = logStreamBucket.getLogStream().getLogName();
        String taskType = BufferUtil.bufferAsString(streamProcessor.getSubscriptedTaskType());
        ServiceName<StreamProcessorController> streamProcessorServiceName = TaskQueueServiceNames.taskQueueLockStreamProcessorServiceName(logName, taskType);
        return this.serviceContext.removeService(streamProcessorServiceName);
    }

    /**
     * Offers a credit request to the request buffer; it is processed by {@link #doWork()}.
     *
     * @param request the credits request
     * @return the result of offering the request to the buffer
     */
    public boolean increaseSubscriptionCreditsAsync(CreditsRequest request) {
        return this.creditRequestBuffer.offerRequest(request);
    }

    /**
     * Forwards a credit request to the stream processor of its subscription.
     *
     * @param request the credits request
     * @return the processor's answer, or {@code true} when no processor is registered
     *         for the subscriber key (the request is then dropped)
     */
    protected boolean dispatchSubscriptionCredits(CreditsRequest request) {
        LockTaskStreamProcessor streamProcessor = this.streamProcessorBySubscriptionId.get(request.getSubscriberKey());
        if (streamProcessor == null) {
            return true;
        }
        return streamProcessor.increaseSubscriptionCreditsAsync(request);
    }

    /**
     * Parks a request that could not be dispatched so it can be retried later.
     *
     * @param request the request to append to the back-pressure list
     */
    protected void backpressureRequest(CreditsRequest request) {
        request.appendTo(this.backPressuredCreditsRequests);
    }

    /**
     * Retries parked credit requests, starting with the last list element, and stops
     * at the first request that is still rejected.
     *
     * @return the number of requests that were dispatched and removed from the list
     */
    protected int dispatchBackpressuredSubscriptionCredits() {
        int dispatchedCount = 0;
        int index = this.backPressuredCreditsRequests.size() - 1;
        while (index >= 0) {
            this.creditsRequest.wrapListElement(this.backPressuredCreditsRequests, index);
            if (!this.dispatchSubscriptionCredits(this.creditsRequest)) {
                break;
            }
            this.backPressuredCreditsRequests.remove(index);
            ++dispatchedCount;
            --index;
        }
        return dispatchedCount;
    }

    /**
     * Registers a log stream under its topic name and partition id.
     *
     * @param logStream the log stream to register
     * @param logStreamServiceName the service name of the log stream
     */
    public void addStream(LogStream logStream, ServiceName<LogStream> logStreamServiceName) {
        this.asyncContext.runAsync(future -> this.logStreamBuckets.computeIfAbsent(logStream.getTopicName(), k -> new Int2ObjectHashMap<>()).put(logStream.getPartitionId(), new LogStreamBucket(logStream, logStreamServiceName)));
    }

    /**
     * Unregisters a log stream and forgets all subscriptions served on it.
     *
     * @param logStream the log stream to unregister
     */
    public void removeStream(LogStream logStream) {
        this.asyncContext.runAsync(future -> {
            DirectBuffer topicName = logStream.getTopicName();
            int partitionId = logStream.getPartitionId();
            Int2ObjectHashMap<LogStreamBucket> partitions = this.logStreamBuckets.get(topicName);
            if (partitions != null) {
                partitions.remove(partitionId);
                // Drop the topic entry together with its last partition.
                if (partitions.isEmpty()) {
                    this.logStreamBuckets.remove(topicName);
                }
            }
            this.removeSubscriptionsForLogStream(topicName, partitionId);
        });
    }

    /**
     * Removes every subscription-id mapping whose stream processor belongs to the
     * given topic and partition.
     *
     * @param topicName the topic name of the removed log stream
     * @param partitionId the partition id of the removed log stream
     */
    protected void removeSubscriptionsForLogStream(DirectBuffer topicName, int partitionId) {
        // Removal goes through the iterator (via removeIf); removing entries from the
        // collection directly while iterating over it is not safe.
        this.streamProcessorBySubscriptionId.values().removeIf(streamProcessor ->
            topicName.equals(streamProcessor.getLogStreamTopicName())
                && partitionId == streamProcessor.getLogStreamPartitionId());
    }

    /**
     * Notifies the registered stream processors that a client channel was closed and
     * removes the service of any processor reporting that no subscriptions remain.
     *
     * @param channelId the id of the closed channel
     */
    public void onClientChannelCloseAsync(int channelId) {
        this.asyncContext.runAsync(() -> {
            // NOTE(review): a processor serving several subscriptions appears once per
            // subscription id in this collection and is therefore notified repeatedly;
            // the mappings of subscriptions closed this way are also not removed here.
            for (LockTaskStreamProcessor processor : this.streamProcessorBySubscriptionId.values()) {
                processor.onClientChannelCloseAsync(channelId)
                    .thenCompose(hasSubscriptions -> this.removeServiceIfUnused(processor, hasSubscriptions));
            }
        });
    }

    /**
     * @return the capacity upper bound reported by the credits request buffer
     */
    public int getCreditRequestCapacityUpperBound() {
        return this.creditRequestBuffer.getCapacityUpperBound();
    }

    /**
     * Looks up the bucket of a registered log stream.
     *
     * @param topicName the topic name
     * @param partitionId the partition id
     * @return the bucket, or {@code null} if no such log stream is registered
     */
    protected LogStreamBucket getLogStreamBucket(DirectBuffer topicName, int partitionId) {
        Int2ObjectHashMap<LogStreamBucket> partitions = this.logStreamBuckets.get(topicName);
        return partitions == null ? null : partitions.get(partitionId);
    }

    /** No action is taken when a connection is established. */
    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    /**
     * Treats a closed connection as a closed client channel, using the stream id of
     * the remote address as channel id.
     */
    public void onConnectionClosed(RemoteAddress remoteAddress) {
        this.onClientChannelCloseAsync(remoteAddress.getStreamId());
    }

    /** Removes the processor's service unless the processor still has subscriptions. */
    private CompletionStage<Void> removeServiceIfUnused(LockTaskStreamProcessor streamProcessor, boolean hasSubscriptions) {
        if (hasSubscriptions) {
            return CompletableFuture.completedFuture(null);
        }
        return this.removeStreamProcessorService(streamProcessor);
    }

    /** Completes the future with {@code null}, or exceptionally when a failure is given. */
    private static <T> boolean completeVoid(CompletableFuture<T> future, Throwable failure) {
        return failure == null ? future.complete(null) : future.completeExceptionally(failure);
    }

    /**
     * A registered log stream together with its service name and the stream
     * processors currently attached to it.
     */
    static class LogStreamBucket {
        protected final LogStream logStream;
        protected final ServiceName<LogStream> logStreamServiceName;
        protected List<LockTaskStreamProcessor> streamProcessors = new ArrayList<>();

        LogStreamBucket(LogStream logStream, ServiceName<LogStream> logStreamServiceName) {
            this.logStream = logStream;
            this.logStreamServiceName = logStreamServiceName;
        }

        public LogStream getLogStream() {
            return this.logStream;
        }

        public ServiceName<LogStream> getLogServiceName() {
            return this.logStreamServiceName;
        }

        /**
         * Finds the first attached processor subscribed to the given task type.
         *
         * @param taskType the task type to look for
         * @return the matching processor, or {@code null} if there is none
         */
        public LockTaskStreamProcessor getStreamProcessorByTaskType(DirectBuffer taskType) {
            for (LockTaskStreamProcessor candidate : this.streamProcessors) {
                if (BufferUtil.equals(taskType, candidate.getSubscriptedTaskType())) {
                    return candidate;
                }
            }
            return null;
        }

        public void addStreamProcessor(LockTaskStreamProcessor streamProcessor) {
            this.streamProcessors.add(streamProcessor);
        }

        public void removeStreamProcessor(LockTaskStreamProcessor streamProcessor) {
            this.streamProcessors.remove(streamProcessor);
        }
    }
}

