package io.zeebe.broker.job.old;

import io.zeebe.broker.job.JobStateController;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedCommandWriter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.channel.ChannelSubscription;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.Object2ObjectHashMap;

/* loaded from: input_file:io/zeebe/broker/job/old/JobSubscriptionProcessor.class */
public class JobSubscriptionProcessor implements TypedRecordProcessor<JobRecord>, StreamProcessorLifecycleAware {
    private final Map<DirectBuffer, JobSubscriptions> typeSubscriptions = new Object2ObjectHashMap();
    private final Map<Long, JobSubscriptions> keySubscriptions = new Long2ObjectHashMap();
    private final CreditsRequestBuffer creditsBuffer = new CreditsRequestBuffer(JobSubscriptionManager.NUM_CONCURRENT_REQUESTS);
    private final JobStateController state;
    private ActorControl actor;
    private ChannelSubscription creditsSubscription;
    private TypedCommandWriter writer;

    public JobSubscriptionProcessor(JobStateController jobStateController) {
        this.state = jobStateController;
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.actor = typedStreamProcessor.getStreamProcessorContext().getActorControl();
        this.creditsSubscription = this.actor.consume(this.creditsBuffer, this::consumeCreditsRequest);
        this.writer = typedStreamProcessor.getEnvironment().buildCommandWriter();
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onClose() {
        if (this.creditsSubscription != null) {
            this.creditsSubscription.cancel();
            this.creditsSubscription = null;
        }
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<JobRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        JobSubscriptions jobSubscriptions;
        JobSubscription nextAvailableSubscription;
        JobRecord value = typedRecord.getValue();
        DirectBuffer type = value.getType();
        if (value.getRetries() >= 1 && this.typeSubscriptions.containsKey(type) && (nextAvailableSubscription = getNextAvailableSubscription((jobSubscriptions = this.typeSubscriptions.get(type)))) != null) {
            activateJob(typedRecord.getKey(), value, typedStreamWriter, jobSubscriptions, nextAvailableSubscription);
        }
    }

    public ActorFuture<Void> addSubscription(JobSubscription jobSubscription) {
        try {
            EnsureUtil.ensureNotNull("subscription", jobSubscription);
            EnsureUtil.ensureNotNullOrEmpty("job type", jobSubscription.getJobType());
            EnsureUtil.ensureNotNullOrEmpty("worker", jobSubscription.getWorker());
            EnsureUtil.ensureGreaterThan("length of worker", jobSubscription.getWorker().capacity(), 0L);
            EnsureUtil.ensureLessThanOrEqual("length of worker", jobSubscription.getWorker().capacity(), 64L);
            EnsureUtil.ensureGreaterThan("timeout", jobSubscription.getTimeout(), 0L);
            EnsureUtil.ensureGreaterThan("subscription credits", jobSubscription.getCredits(), 0L);
            return this.actor.call(() -> {
                JobSubscriptions jobSubscriptions = this.typeSubscriptions.get(jobSubscription.getJobType());
                if (jobSubscriptions == null) {
                    jobSubscriptions = new JobSubscriptions(jobSubscription.getJobType(), 8);
                    this.typeSubscriptions.put(jobSubscription.getJobType(), jobSubscriptions);
                }
                if (!this.keySubscriptions.containsKey(Long.valueOf(jobSubscription.getSubscriberKey()))) {
                    this.keySubscriptions.put(Long.valueOf(jobSubscription.getSubscriberKey()), jobSubscriptions);
                }
                jobSubscriptions.addSubscription(jobSubscription);
                activateOutstandingJobs(jobSubscription.getJobType());
            });
        } catch (Exception e) {
            return CompletableActorFuture.completedExceptionally(e);
        }
    }

    public ActorFuture<Void> removeSubscription(long j) {
        return this.actor.call(() -> {
            if (this.keySubscriptions.containsKey(Long.valueOf(j))) {
                JobSubscriptions jobSubscriptions = this.keySubscriptions.get(Long.valueOf(j));
                jobSubscriptions.removeSubscription(j);
                this.keySubscriptions.remove(Long.valueOf(j));
                if (jobSubscriptions.isEmpty()) {
                    this.typeSubscriptions.remove(jobSubscriptions.getType());
                }
            }
        });
    }

    private void activateJob(long j, JobRecord jobRecord, TypedCommandWriter typedCommandWriter, JobSubscriptions jobSubscriptions, JobSubscription jobSubscription) {
        jobRecord.setDeadline(ActorClock.currentTimeMillis() + jobSubscription.getTimeout()).setWorker(jobSubscription.getWorker());
        typedCommandWriter.writeFollowUpCommand(j, JobIntent.ACTIVATE, jobRecord, recordMetadata -> {
            recordMetadata.subscriberKey(jobSubscription.getSubscriberKey());
            recordMetadata.requestStreamId(jobSubscription.getStreamId());
        });
        jobSubscriptions.addCredits(jobSubscription.getSubscriberKey(), -1);
    }

    public boolean increaseSubscriptionCreditsAsync(CreditsRequest creditsRequest) {
        return creditsRequest.writeTo(this.creditsBuffer);
    }

    private void increaseSubscriptionCredits(CreditsRequest creditsRequest) {
        long subscriberKey = creditsRequest.getSubscriberKey();
        int credits = creditsRequest.getCredits();
        if (this.keySubscriptions.containsKey(Long.valueOf(subscriberKey))) {
            JobSubscriptions jobSubscriptions = this.keySubscriptions.get(Long.valueOf(subscriberKey));
            jobSubscriptions.addCredits(subscriberKey, credits);
            activateOutstandingJobs(jobSubscriptions.getType());
        }
    }

    private void activateOutstandingJobs(DirectBuffer directBuffer) {
        JobSubscriptions jobSubscriptions = this.typeSubscriptions.get(directBuffer);
        if (jobSubscriptions != null) {
            this.state.forEachActivatableJobs(directBuffer, (j, jobRecord, iteratorControl) -> {
                JobSubscription nextAvailableSubscription = getNextAvailableSubscription(jobSubscriptions);
                if (nextAvailableSubscription == null) {
                    iteratorControl.stop();
                } else {
                    activateJob(j, jobRecord, this.writer, jobSubscriptions, nextAvailableSubscription);
                    this.writer.flush();
                }
            });
        }
    }

    private void consumeCreditsRequest() {
        CreditsRequest creditsRequest = new CreditsRequest();
        this.creditsBuffer.read((i, mutableDirectBuffer, i2, i3) -> {
            creditsRequest.wrap(mutableDirectBuffer, i2, i3);
            increaseSubscriptionCredits(creditsRequest);
        }, 1);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [io.zeebe.broker.job.old.JobSubscriptions$SubscriptionIterator] */
    private JobSubscription getNextAvailableSubscription(JobSubscriptions jobSubscriptions) {
        ?? iterator2 = jobSubscriptions.iterator2();
        JobSubscription jobSubscription = null;
        if (jobSubscriptions.getTotalCredits() > 0) {
            int size = jobSubscriptions.size();
            for (int i = 0; i < size && jobSubscription == null; i++) {
                if (!iterator2.hasNext()) {
                    iterator2.reset();
                }
                JobSubscription next = iterator2.next();
                if (next.getCredits() > 0) {
                    jobSubscription = next;
                }
            }
        }
        return jobSubscription;
    }
}
