package io.zeebe.broker.job;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.job.processor.ActivateJobStreamProcessor;
import io.zeebe.broker.job.processor.JobSubscription;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.logstreams.impl.service.LogStreamServiceNames;
import io.zeebe.logstreams.impl.service.StreamProcessorService;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.allocation.HeapBufferAllocator;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.collection.CompactList;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.channel.ChannelSubscription;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.HashMap;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;

/* loaded from: input_file:io/zeebe/broker/job/JobSubscriptionManager.class */
public class JobSubscriptionManager extends Actor implements TransportListener {
    // Actor name returned by getName().
    protected static final String NAME = "jobqueue.subscription.manager";
    // Passed to the CreditsRequestBuffer constructor below.
    public static final int NUM_CONCURRENT_REQUESTS = 1024;
    // Used by PartitionBucket to build the job-activating stream processor services.
    protected final StreamProcessorServiceFactory factory;
    // Used by PartitionBucket to remove stream processor services again.
    protected final ServiceContainer serviceContext;
    // Its output is handed to the TypedStreamEnvironment of every PartitionBucket.
    private final ServerTransport transport;
    // Result of actor.consume(...) in onActorStarted(); cancelled and nulled in onActorClosing().
    private ChannelSubscription creditsSubscription;
    // Partition id -> bucket; filled by addPartition(), emptied by removePartition().
    protected final Int2ObjectHashMap<PartitionBucket> logStreamBuckets = new Int2ObjectHashMap<>();
    // Registry of known subscriptions (queried by key, by channel, and by partition + job type).
    private final Subscriptions subscriptions = new Subscriptions();
    // Subscriber key -> stream processor that receives credit requests for that subscription.
    private final Long2ObjectHashMap<ActivateJobStreamProcessor> streamProcessorBySubscriptionId = new Long2ObjectHashMap<>();
    // Single reusable flyweight; re-wrapped for every request that is read or retried.
    protected final CreditsRequest creditsRequest = new CreditsRequest();
    // NOTE(review): not referenced by any visible method; presumably used by the addSubscription
    // body that JADX failed to decompile - confirm against the original source.
    protected long nextSubscriptionId = 0;
    // Must be declared before backPressuredCreditsRequests, whose initializer reads its capacity upper bound.
    protected final CreditsRequestBuffer creditRequestBuffer = new CreditsRequestBuffer(NUM_CONCURRENT_REQUESTS);
    // Requests that a stream processor refused; retried by dispatchNextBackpressuredSubscriptionCredit().
    // NOTE(review): the literal 12 is presumably the per-element size of a credits request - confirm.
    protected final CompactList backPressuredCreditsRequests = new CompactList(12, this.creditRequestBuffer.getCapacityUpperBound(), new HeapBufferAllocator());

    /* loaded from: input_file:io/zeebe/broker/job/JobSubscriptionManager$PartitionBucket.class */
    /**
     * Book-keeping for the job-activating stream processors of a single partition.
     *
     * <p>Processors are keyed by a {@link DirectBuffer} (callers in this file pass the job type of a
     * subscription). For every key the bucket tracks the running processor, a pending open future
     * and a pending close future. All state is mutated from callbacks scheduled on the enclosing
     * manager's actor.
     */
    class PartitionBucket {
        private final Partition partition;
        private final ServiceName<Partition> partitionServiceName;
        private final TypedStreamEnvironment env;
        // Processors whose service build completed successfully.
        private final Map<DirectBuffer, ActivateJobStreamProcessor> streamProcessors = new HashMap<>();
        // Service builds that have been requested but have not completed yet.
        private final Map<DirectBuffer, ActorFuture<StreamProcessorService>> openFutures = new HashMap<>();
        // Service removals that have been requested but have not completed yet.
        private final Map<DirectBuffer, ActorFuture<Void>> closeFutures = new HashMap<>();

        /**
         * @param partition the partition whose log stream the processors run on
         * @param serviceName service name of that partition, passed on to the processor factory
         */
        PartitionBucket(Partition partition, ServiceName<Partition> serviceName) {
            this.partition = partition;
            this.partitionServiceName = serviceName;
            this.env = new TypedStreamEnvironment(partition.getLogStream(), JobSubscriptionManager.this.transport.getOutput());
        }

        /**
         * Looks up the running processor for the given key.
         *
         * @return the processor, or {@code null} when none is registered or a close is pending
         */
        public ActivateJobStreamProcessor getActiveStreamProcessor(DirectBuffer directBuffer) {
            if (this.closeFutures.containsKey(directBuffer)) {
                return null;
            }
            return this.streamProcessors.get(directBuffer);
        }

        /**
         * Starts a processor for the given key.
         *
         * <p>If an open is already pending its future is returned. If a close is pending, the new
         * processor is created only after that close has completed; a failed close fails the
         * returned future.
         *
         * @return future completed with the processor service once it has been built
         */
        public ActorFuture<StreamProcessorService> startStreamProcessor(DirectBuffer directBuffer) {
            if (this.openFutures.containsKey(directBuffer)) {
                return this.openFutures.get(directBuffer);
            }
            if (!this.closeFutures.containsKey(directBuffer)) {
                return createStreamProcessor(directBuffer);
            }
            final ActorFuture<Void> pendingClose = this.closeFutures.get(directBuffer);
            final CompletableActorFuture<StreamProcessorService> reopened = new CompletableActorFuture<>();
            JobSubscriptionManager.this.actor.runOnCompletion(pendingClose, (ignored, closeError) -> {
                if (closeError != null) {
                    reopened.completeExceptionally(closeError);
                    return;
                }
                // Distinct parameter names: a nested lambda must not redeclare the outer lambda's parameter.
                JobSubscriptionManager.this.actor.runOnCompletion(createStreamProcessor(directBuffer), (service, openError) -> {
                    if (openError == null) {
                        reopened.complete(service);
                    } else {
                        reopened.completeExceptionally(openError);
                    }
                });
            });
            return reopened;
        }

        /**
         * Builds the processor service and tracks the build in {@code openFutures}; on success the
         * processor is registered in {@code streamProcessors}, on failure the error is only logged.
         */
        private ActorFuture<StreamProcessorService> createStreamProcessor(DirectBuffer directBuffer) {
            final ActivateJobStreamProcessor processor = new ActivateJobStreamProcessor(directBuffer);
            final ActorFuture<StreamProcessorService> build = JobSubscriptionManager.this.factory
                    .createService(this.partition, this.partitionServiceName)
                    .processor(processor.createStreamProcessor(this.env))
                    .processorId(20)
                    .processorName(JobSubscriptionManager.streamProcessorName(directBuffer))
                    .build();
            this.openFutures.put(directBuffer, build);
            JobSubscriptionManager.this.actor.runOnCompletion(build, (service, error) -> {
                this.openFutures.remove(directBuffer);
                if (error == null) {
                    this.streamProcessors.put(directBuffer, processor);
                } else {
                    Loggers.SYSTEM_LOGGER.debug("Problem on starting job activating stream processor.", error);
                }
            });
            return build;
        }

        /**
         * Stops the processor for the given key. Does nothing when a close is already pending or no
         * processor is known; when an open is still pending, the processor is destroyed once that
         * open has completed successfully.
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void stopStreamProcessor(DirectBuffer directBuffer) {
            if (this.closeFutures.containsKey(directBuffer)) {
                return;
            }
            if (this.openFutures.containsKey(directBuffer)) {
                JobSubscriptionManager.this.actor.runOnCompletion(this.openFutures.get(directBuffer), (service, error) -> {
                    if (error == null) {
                        destroyStreamProcessor(directBuffer);
                    }
                });
            } else if (this.streamProcessors.containsKey(directBuffer)) {
                destroyStreamProcessor(directBuffer);
            }
        }

        /**
         * Removes the processor service and tracks the removal in {@code closeFutures}; on success
         * the processor is unregistered, on failure the error is only logged.
         */
        private ActorFuture<Void> destroyStreamProcessor(DirectBuffer directBuffer) {
            final ActorFuture<Void> removal = JobSubscriptionManager.this.serviceContext.removeService(
                    LogStreamServiceNames.streamProcessorService(
                            this.partition.getLogStream().getLogName(),
                            JobSubscriptionManager.streamProcessorName(directBuffer)));
            this.closeFutures.put(directBuffer, removal);
            JobSubscriptionManager.this.actor.runOnCompletion(removal, (ignored, error) -> {
                this.closeFutures.remove(directBuffer);
                if (error == null) {
                    this.streamProcessors.remove(directBuffer);
                } else {
                    Loggers.SYSTEM_LOGGER.debug("Problem on closing job activating stream processor.", error);
                }
            });
            return removal;
        }

        /** @return the log stream of this bucket's partition */
        public LogStream getLogStream() {
            return this.partition.getLogStream();
        }

        /** @return the partition this bucket belongs to */
        public Partition getPartition() {
            return this.partition;
        }

        /** @return the log name of this bucket's partition log stream */
        public String getLogStreamName() {
            return this.partition.getLogStream().getLogName();
        }

        /** @return the service name the partition was registered under */
        public ServiceName<Partition> getPartitionServiceName() {
            return this.partitionServiceName;
        }
    }

    /**
     * Creates the manager and stores its collaborators; nothing is started here.
     *
     * @param serviceContainer container later used to remove stream processor services
     * @param streamProcessorServiceFactory factory later used to build stream processor services
     * @param serverTransport transport whose output is given to the stream environments
     */
    public JobSubscriptionManager(ServiceContainer serviceContainer, StreamProcessorServiceFactory streamProcessorServiceFactory, ServerTransport serverTransport) {
        this.serviceContext = serviceContainer;
        this.factory = streamProcessorServiceFactory;
        this.transport = serverTransport;
    }

    /** @return the fixed actor name {@link #NAME} */
    public String getName() {
        return JobSubscriptionManager.NAME;
    }

    /**
     * Registers {@link #consumeCreditsRequest()} as the consumer of the credits request buffer and
     * keeps the resulting subscription so it can be cancelled on close.
     */
    protected void onActorStarted() {
        final ChannelSubscription consumer =
                actor.consume(creditRequestBuffer, this::consumeCreditsRequest);
        creditsSubscription = consumer;
    }

    /** Cancels the credits consumer subscription, if one was registered, and forgets it. */
    protected void onActorClosing() {
        final ChannelSubscription active = creditsSubscription;
        if (active == null) {
            return;
        }
        active.cancel();
        creditsSubscription = null;
    }

    public ActorFuture<Void> addSubscription(JobSubscription jobSubscription) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(()
        /*  JADX ERROR: Method code generation error
            jadx.core.utils.exceptions.CodegenException: Error generate insn: 0x0014: INVOKE 
              (wrap:io.zeebe.util.sched.ActorControl:0x0009: IGET (r5v0 'this' io.zeebe.broker.job.JobSubscriptionManager A[IMMUTABLE_TYPE, THIS]) A[WRAPPED] io.zeebe.broker.job.JobSubscriptionManager.actor io.zeebe.util.sched.ActorControl)
              (wrap:java.lang.Runnable:0x000f: INVOKE_CUSTOM 
              (r5v0 'this' io.zeebe.broker.job.JobSubscriptionManager A[DONT_INLINE, IMMUTABLE_TYPE, THIS])
              (r6v0 'jobSubscription' io.zeebe.broker.job.processor.JobSubscription A[DONT_INLINE])
              (r0v0 'completableActorFuture' io.zeebe.util.sched.future.CompletableActorFuture A[DONT_INLINE])
             A[MD:(io.zeebe.broker.job.JobSubscriptionManager, io.zeebe.broker.job.processor.JobSubscription, io.zeebe.util.sched.future.CompletableActorFuture):java.lang.Runnable (s), WRAPPED]
             handle type: INVOKE_DIRECT
             lambda: java.lang.Runnable.run():void
             call insn: INVOKE 
              (r1 I:io.zeebe.broker.job.JobSubscriptionManager)
              (r2 I:io.zeebe.broker.job.processor.JobSubscription)
              (r3 I:io.zeebe.util.sched.future.CompletableActorFuture)
             DIRECT call: io.zeebe.broker.job.JobSubscriptionManager.lambda$addSubscription$1(io.zeebe.broker.job.processor.JobSubscription, io.zeebe.util.sched.future.CompletableActorFuture):void A[MD:(io.zeebe.broker.job.processor.JobSubscription, io.zeebe.util.sched.future.CompletableActorFuture):void (m)])
             VIRTUAL call: io.zeebe.util.sched.ActorControl.call(java.lang.Runnable):io.zeebe.util.sched.future.ActorFuture in method: io.zeebe.broker.job.JobSubscriptionManager.addSubscription(io.zeebe.broker.job.processor.JobSubscription):io.zeebe.util.sched.future.ActorFuture<java.lang.Void>, file: input_file:io/zeebe/broker/job/JobSubscriptionManager.class
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:310)
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:273)
            	at jadx.core.codegen.RegionGen.makeSimpleBlock(RegionGen.java:94)
            	at jadx.core.dex.nodes.IBlock.generate(IBlock.java:15)
            	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
            	at jadx.core.dex.regions.Region.generate(Region.java:35)
            	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
            	at jadx.core.codegen.MethodGen.addRegionInsns(MethodGen.java:297)
            	at jadx.core.codegen.MethodGen.addInstructions(MethodGen.java:276)
            	at jadx.core.codegen.ClassGen.addMethodCode(ClassGen.java:406)
            	at jadx.core.codegen.ClassGen.addMethod(ClassGen.java:335)
            	at jadx.core.codegen.ClassGen.lambda$addInnerClsAndMethods$3(ClassGen.java:301)
            	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
            	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
            	at java.base/java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
            	at java.base/java.util.stream.Sink$ChainedReference.end(Sink.java:261)
            Caused by: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.SSAVar.setCodeVar(jadx.core.dex.instructions.args.CodeVar)" because the return value of "jadx.core.dex.instructions.args.RegisterArg.getSVar()" is null
            	at jadx.core.codegen.InsnGen.makeInlinedLambdaMethod(InsnGen.java:1041)
            	at jadx.core.codegen.InsnGen.makeInvokeLambda(InsnGen.java:936)
            	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:827)
            	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
            	at jadx.core.codegen.InsnGen.addWrappedArg(InsnGen.java:145)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:121)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:108)
            	at jadx.core.codegen.InsnGen.generateMethodArguments(InsnGen.java:1117)
            	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:884)
            	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:303)
            	... 15 more
            */
        /*
            this = this;
            io.zeebe.util.sched.future.CompletableActorFuture r0 = new io.zeebe.util.sched.future.CompletableActorFuture
            r1 = r0
            r1.<init>()
            r7 = r0
            r0 = r5
            io.zeebe.util.sched.ActorControl r0 = r0.actor
            r1 = r5
            r2 = r6
            r3 = r7
            io.zeebe.util.sched.future.ActorFuture<java.lang.Void> r1 = () -> { // java.lang.Runnable.run():void
                r1.lambda$addSubscription$1(r2, r3);
            }
            io.zeebe.util.sched.future.ActorFuture r0 = r0.call(r1)
            r0 = r7
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: io.zeebe.broker.job.JobSubscriptionManager.addSubscription(io.zeebe.broker.job.processor.JobSubscription):io.zeebe.util.sched.future.ActorFuture");
    }

    /**
     * Routes credit requests for the subscription to the given processor and asks the processor
     * to add the subscription.
     *
     * <p>When the processor reports success the future is completed with {@code null}. When it
     * fails, the subscription is dropped from both the subscription registry and the credit
     * routing table (so no stale routing entry is left behind) and the future is failed with the
     * same error.
     *
     * @param jobSubscription the subscription being added
     * @param completableActorFuture future to complete with the outcome
     * @param activateJobStreamProcessor processor that will own the subscription
     */
    private void addSubscriptionToStreamProcessor(JobSubscription jobSubscription, CompletableActorFuture<Void> completableActorFuture, ActivateJobStreamProcessor activateJobStreamProcessor) {
        this.streamProcessorBySubscriptionId.put(jobSubscription.getSubscriberKey(), activateJobStreamProcessor);
        this.actor.runOnCompletion(activateJobStreamProcessor.addSubscription(jobSubscription), (ignored, error) -> {
            if (error == null) {
                completableActorFuture.complete(null);
            } else {
                this.subscriptions.removeSubscription(jobSubscription.getSubscriberKey());
                // Undo the routing entry registered above; the subscription never became active.
                this.streamProcessorBySubscriptionId.remove(jobSubscription.getSubscriberKey());
                completableActorFuture.completeExceptionally(error);
            }
        });
    }

    /**
     * Removes the subscription with the given subscriber key; the work runs on the actor.
     *
     * <p>The returned future fails with a {@link RuntimeException} when the key is unknown. When
     * the partition bucket has no active processor for the subscription's job type, the future is
     * completed immediately. Otherwise the processor is asked to remove the subscription; once it
     * answers, the future is completed with its outcome, the subscription and its credit routing
     * entry are dropped, and the processor is stopped if no subscription for the same partition
     * and job type remains.
     *
     * <p>NOTE(review): in the "no active processor" branch the subscription stays in the registry
     * and routing table - confirm this is intended.
     *
     * @param j subscriber key of the subscription to remove
     * @return future reporting the outcome of the removal
     */
    public ActorFuture<Void> removeSubscription(long j) {
        final CompletableActorFuture<Void> result = new CompletableActorFuture<>();
        this.actor.call(() -> {
            final JobSubscription subscription = this.subscriptions.getSubscription(j);
            if (subscription == null) {
                result.completeExceptionally(new RuntimeException("Subscription does not exist"));
                return;
            }
            final PartitionBucket partitionBucket = this.logStreamBuckets.get(subscription.getPartitionId());
            final ActivateJobStreamProcessor activeStreamProcessor = partitionBucket.getActiveStreamProcessor(subscription.getJobType());
            if (activeStreamProcessor == null) {
                result.complete(null);
                return;
            }
            this.actor.runOnCompletion(activeStreamProcessor.removeSubscription(j), (ignored, error) -> {
                if (error == null) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(error);
                }
                // Book-keeping is cleaned up regardless of whether the processor reported an error.
                this.subscriptions.removeSubscription(j);
                this.streamProcessorBySubscriptionId.remove(j);
                if (this.subscriptions.getSubscriptionsForPartitionAndType(subscription.getPartitionId(), subscription.getJobType()) == 0) {
                    partitionBucket.stopStreamProcessor(subscription.getJobType());
                }
            });
        });
        return result;
    }

    /**
     * Writes the request into the credits request buffer for later consumption by the actor.
     *
     * @param creditsRequest the request to enqueue
     * @return the result reported by {@code creditsRequest.writeTo(...)}
     */
    public boolean increaseSubscriptionCreditsAsync(CreditsRequest creditsRequest) {
        final boolean written = creditsRequest.writeTo(creditRequestBuffer);
        return written;
    }

    /**
     * Forwards a credits request to the processor registered for its subscriber key.
     *
     * @param creditsRequest the request to forward
     * @return {@code true} when no processor is registered for the key (the request is dropped);
     *     otherwise whatever the processor's {@code increaseSubscriptionCreditsAsync} returns
     */
    protected boolean dispatchSubscriptionCredits(CreditsRequest creditsRequest) {
        final ActivateJobStreamProcessor target =
                streamProcessorBySubscriptionId.get(creditsRequest.getSubscriberKey());
        if (target == null) {
            return true;
        }
        return target.increaseSubscriptionCreditsAsync(creditsRequest);
    }

    /**
     * Consumer callback for the credits request buffer.
     *
     * <p>First triggers a retry of back-pressured requests. Only when the back-pressure list is
     * empty is at most one new request read from the buffer; a request the processor does not
     * accept is appended to the back-pressure list.
     */
    public void consumeCreditsRequest() {
        dispatchBackpressuredSubscriptionCredits();
        if (backPressuredCreditsRequests.size() != 0) {
            return;
        }
        creditRequestBuffer.read((unusedId, buffer, index, length) -> {
            creditsRequest.wrap(buffer, index, length);
            final boolean dispatched = dispatchSubscriptionCredits(creditsRequest);
            if (!dispatched) {
                backpressureRequest(creditsRequest);
            }
        }, 1);
    }

    /**
     * Appends the request to the list of back-pressured requests so it can be retried later.
     *
     * @param creditsRequest the request that could not be dispatched
     */
    protected void backpressureRequest(CreditsRequest creditsRequest) {
        final CompactList pending = backPressuredCreditsRequests;
        creditsRequest.appendTo(pending);
    }

    /**
     * Hands {@link #dispatchNextBackpressuredSubscriptionCredit()} to {@code actor.runUntilDone},
     * which retries the back-pressured requests.
     */
    protected void dispatchBackpressuredSubscriptionCredits() {
        actor.runUntilDone(this::dispatchNextBackpressuredSubscriptionCredit);
    }

    /**
     * Retries the last element of the back-pressure list.
     *
     * <p>Signals {@code done} when the list is empty. If the request is accepted it is removed and
     * this method is scheduled again; otherwise the actor yields and the request stays queued.
     */
    protected void dispatchNextBackpressuredSubscriptionCredit() {
        final int lastIndex = backPressuredCreditsRequests.size() - 1;
        if (lastIndex < 0) {
            actor.done();
            return;
        }
        creditsRequest.wrapListElement(backPressuredCreditsRequests, lastIndex);
        final boolean accepted = dispatchSubscriptionCredits(creditsRequest);
        if (accepted) {
            backPressuredCreditsRequests.remove(lastIndex);
            actor.run(this::dispatchNextBackpressuredSubscriptionCredit);
        } else {
            actor.yield();
        }
    }

    /**
     * Registers a bucket for the partition under its partition id; the work runs on the actor.
     *
     * @param serviceName service name of the partition
     * @param partition the partition to register
     */
    public void addPartition(ServiceName<Partition> serviceName, Partition partition) {
        actor.call(() -> {
            final int partitionId = partition.getInfo().getPartitionId();
            final PartitionBucket bucket = new PartitionBucket(partition, serviceName);
            logStreamBuckets.put(partitionId, bucket);
        });
    }

    /**
     * Drops the bucket of the partition and all subscriptions registered for it; the work runs on
     * the actor.
     *
     * @param partition the partition to forget
     */
    public void removePartition(Partition partition) {
        actor.call(() -> {
            final int removedId = partition.getInfo().getPartitionId();
            logStreamBuckets.remove(removedId);
            subscriptions.removeSubscriptionsForPartition(removedId);
        });
    }

    /**
     * Removes every subscription that was opened over the given channel; the work runs on the
     * actor.
     *
     * <p>For each affected subscription the registry entry and the credit routing entry are
     * dropped. If the partition bucket still exists, its active processor (if any) is asked to
     * remove the subscription, and the processor is stopped when no subscription for the same
     * partition and job type remains. A missing bucket or missing active processor is skipped
     * rather than failing the whole clean-up with a {@code NullPointerException}.
     *
     * @param i id of the channel that was closed
     */
    public void onClientChannelCloseAsync(int i) {
        this.actor.call(() -> {
            // NOTE(review): assumes getSubscriptionsForChannel returns a collection that is safe to
            // iterate while subscriptions are removed from the registry - confirm.
            for (JobSubscription jobSubscription : this.subscriptions.getSubscriptionsForChannel(i)) {
                final long subscriberKey = jobSubscription.getSubscriberKey();
                this.subscriptions.removeSubscription(subscriberKey);
                // Stop routing credit requests to a processor for a subscription that is gone.
                this.streamProcessorBySubscriptionId.remove(subscriberKey);

                final PartitionBucket partitionBucket = this.logStreamBuckets.get(jobSubscription.getPartitionId());
                if (partitionBucket == null) {
                    continue;
                }
                // getActiveStreamProcessor returns null while the processor is closing or not yet registered.
                final ActivateJobStreamProcessor processor = partitionBucket.getActiveStreamProcessor(jobSubscription.getJobType());
                if (processor != null) {
                    processor.removeSubscription(subscriberKey);
                }
                if (this.subscriptions.getSubscriptionsForPartitionAndType(jobSubscription.getPartitionId(), jobSubscription.getJobType()) == 0) {
                    partitionBucket.stopStreamProcessor(jobSubscription.getJobType());
                }
            }
        });
    }

    /**
     * Transport callback for a newly established connection; intentionally does nothing.
     *
     * @param remoteAddress address of the connected peer (unused)
     */
    public void onConnectionEstablished(RemoteAddress remoteAddress) {
        // Nothing to do when a connection opens.
    }

    /**
     * Transport callback for a closed connection; cleans up the subscriptions of that channel,
     * using the address's stream id as the channel id.
     *
     * @param remoteAddress address of the peer whose connection was closed
     */
    public void onConnectionClosed(RemoteAddress remoteAddress) {
        final int closedStreamId = remoteAddress.getStreamId();
        onClientChannelCloseAsync(closedStreamId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the stream processor name for the given buffer.
     *
     * @param directBuffer buffer whose string form is used as the name suffix
     * @return {@code "job-activate."} followed by {@code BufferUtil.bufferAsString(directBuffer)}
     */
    public static String streamProcessorName(DirectBuffer directBuffer) {
        final String suffix = BufferUtil.bufferAsString(directBuffer);
        return "job-activate." + suffix;
    }
}
