package io.zeebe.broker.job.processor;

import io.zeebe.broker.job.CreditsRequest;
import io.zeebe.broker.job.CreditsRequestBuffer;
import io.zeebe.broker.job.JobSubscriptionManager;
import io.zeebe.broker.job.data.JobRecord;
import io.zeebe.broker.job.processor.JobSubscriptions;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.RecordMetadata;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.channel.ChannelSubscription;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/job/processor/ActivateJobStreamProcessor.class */
/**
 * Record processor that assigns jobs of one job type to registered job subscriptions.
 *
 * <p>The stream processor built by {@link #createStreamProcessor(TypedStreamEnvironment)} routes
 * {@code JOB} records with the intents {@code CREATED}, {@code TIMED_OUT}, {@code FAILED} and
 * {@code RETRIES_UPDATED} to this instance. For a record whose type equals the subscribed job type
 * and whose retries are greater than zero, a subscription with remaining credits is picked, the
 * record value receives a deadline and the worker of that subscription, and a follow-up
 * {@code ACTIVATE} command is written. Afterwards one credit is taken from the subscription.
 *
 * <p>The controller of the stream processor context is suspended when the processor opens, when
 * the last subscription is removed and when the total credits drop to zero or below. It is resumed
 * whenever a subscription is added or credits are increased.
 */
public class ActivateJobStreamProcessor implements TypedRecordProcessor<JobRecord>, StreamProcessorLifecycleAware {
    /** Buffer that carries credit requests from {@link #increaseSubscriptionCreditsAsync} to the actor. */
    protected final CreditsRequestBuffer creditsBuffer = new CreditsRequestBuffer(JobSubscriptionManager.NUM_CONCURRENT_REQUESTS);

    /** All currently registered subscriptions. */
    private final JobSubscriptions subscriptions = new JobSubscriptions(8);

    /** Iterator over {@link #subscriptions}; kept across calls so the scan continues where it stopped. */
    private final JobSubscriptions.SubscriptionIterator jobDistributionIterator = this.subscriptions.iterator2();

    /** Job type that a record must carry to be considered by this processor. */
    private final DirectBuffer subscribedJobType;

    private ActorControl actor;
    private StreamProcessorContext context;

    /** Subscription chosen for the record currently being processed, or {@code null} if none. */
    private JobSubscription selectedSubscriber;

    /** Subscription on {@link #creditsBuffer}; {@code null} until opened and again after closing. */
    private ChannelSubscription creditsSubscription;

    /**
     * Creates a processor for the given job type.
     *
     * @param directBuffer the job type records are compared against
     */
    public ActivateJobStreamProcessor(DirectBuffer directBuffer) {
        subscribedJobType = directBuffer;
    }

    /**
     * @return the job type passed to the constructor
     */
    public DirectBuffer getSubscriptedJobType() {
        return subscribedJobType;
    }

    /**
     * Stores the processor context and its actor control, starts consuming the credits buffer and
     * suspends the controller.
     *
     * @param typedStreamProcessor the stream processor that was opened
     */
    @Override
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        final StreamProcessorContext processorContext = typedStreamProcessor.getStreamProcessorContext();
        context = processorContext;
        actor = processorContext.getActorControl();
        creditsSubscription = actor.consume(creditsBuffer, this::consumeCreditsRequest);
        processorContext.suspendController();
    }

    /** Cancels the subscription on the credits buffer, if there is one, and forgets it. */
    @Override
    public void onClose() {
        final ChannelSubscription openSubscription = creditsSubscription;
        if (openSubscription == null) {
            return;
        }
        openSubscription.cancel();
        creditsSubscription = null;
    }

    /**
     * Builds a stream processor that forwards the relevant job events to this instance.
     *
     * @param typedStreamEnvironment the environment providing the stream processor builder
     * @return the built stream processor
     */
    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        final TypedRecordProcessor<?> self = this;
        return typedStreamEnvironment
                .newStreamProcessor()
                .onEvent(ValueType.JOB, (Intent) JobIntent.CREATED, self)
                .onEvent(ValueType.JOB, (Intent) JobIntent.TIMED_OUT, self)
                .onEvent(ValueType.JOB, (Intent) JobIntent.FAILED, self)
                .onEvent(ValueType.JOB, (Intent) JobIntent.RETRIES_UPDATED, self)
                .build();
    }

    /**
     * Validates the subscription and, if valid, registers it through the actor and resumes the
     * controller.
     *
     * @param jobSubscription the subscription to register
     * @return a future for the registration; completed exceptionally if validation or the
     *     submission to the actor throws
     */
    public ActorFuture<Void> addSubscription(JobSubscription jobSubscription) {
        try {
            EnsureUtil.ensureNotNull("subscription", jobSubscription);
            EnsureUtil.ensureNotNullOrEmpty("job type", jobSubscription.getJobType());

            final DirectBuffer worker = jobSubscription.getWorker();
            EnsureUtil.ensureNotNullOrEmpty("worker", worker);

            final int workerLength = worker.capacity();
            EnsureUtil.ensureGreaterThan("length of worker", workerLength, 0L);
            EnsureUtil.ensureLessThanOrEqual("length of worker", workerLength, 64L);

            EnsureUtil.ensureGreaterThan("timeout", jobSubscription.getTimeout(), 0L);
            EnsureUtil.ensureGreaterThan("subscription credits", jobSubscription.getCredits(), 0L);

            return actor.call(() -> {
                subscriptions.addSubscription(jobSubscription);
                context.resumeController();
            });
        } catch (Exception failure) {
            return CompletableActorFuture.completedExceptionally(failure);
        }
    }

    /**
     * Removes a subscription through the actor and suspends the controller once no subscription is
     * left.
     *
     * @param j the key handed to {@code JobSubscriptions.removeSubscription}
     * @return a future for the removal
     */
    public ActorFuture<Void> removeSubscription(long j) {
        return actor.call(() -> {
            subscriptions.removeSubscription(j);
            final boolean nothingLeft = subscriptions.isEmpty();
            if (nothingLeft) {
                context.suspendController();
            }
        });
    }

    /** Reads from the credits buffer (with a limit argument of 1) and applies what was read. */
    private void consumeCreditsRequest() {
        final CreditsRequest request = new CreditsRequest();
        creditsBuffer.read((unused, buffer, offset, length) -> {
            request.wrap(buffer, offset, length);
            increaseSubscriptionCredits(request);
        }, 1);
    }

    /**
     * Writes the request into the credits buffer so it can be applied later by
     * {@link #increaseSubscriptionCredits(CreditsRequest)}.
     *
     * @param creditsRequest the request to enqueue
     * @return the result of {@code CreditsRequest.writeTo}
     */
    public boolean increaseSubscriptionCreditsAsync(CreditsRequest creditsRequest) {
        final boolean written = creditsRequest.writeTo(creditsBuffer);
        return written;
    }

    /**
     * Adds the requested credits to the addressed subscription and resumes the controller.
     *
     * @param creditsRequest the request naming the subscriber key and the credits to add
     */
    protected void increaseSubscriptionCredits(CreditsRequest creditsRequest) {
        subscriptions.addCredits(creditsRequest.getSubscriberKey(), creditsRequest.getCredits());
        context.resumeController();
    }

    /**
     * Looks for the next subscription that still has credits.
     *
     * <p>At most as many subscriptions are inspected as are currently registered. The shared
     * iterator is reset whenever it is exhausted, so the scan wraps around.
     *
     * @return a subscription with credits, or {@code null} if the total credits are not positive
     *     or no inspected subscription has credits
     */
    protected JobSubscription getNextAvailableSubscription() {
        if (subscriptions.getTotalCredits() <= 0) {
            return null;
        }

        final int candidates = subscriptions.size();
        for (int inspected = 0; inspected < candidates; inspected++) {
            if (!jobDistributionIterator.hasNext()) {
                jobDistributionIterator.reset();
            }
            final JobSubscription candidate = jobDistributionIterator.next();
            if (candidate.getCredits() > 0) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Selects a subscriber for the record, if it qualifies, and stamps deadline and worker onto
     * the record value.
     *
     * @param typedRecord the job record being processed
     */
    @Override
    public void processRecord(TypedRecord<JobRecord> typedRecord) {
        // Always clear the previous selection first; the later phases rely on this field.
        selectedSubscriber = null;

        final JobRecord job = typedRecord.getValue();
        final boolean typeMatches = BufferUtil.equals(job.getType(), subscribedJobType);
        if (typeMatches && job.getRetries() > 0) {
            final JobSubscription subscriber = getNextAvailableSubscription();
            selectedSubscriber = subscriber;
            if (subscriber != null) {
                final long deadline = ActorClock.currentTimeMillis() + subscriber.getTimeout();
                job.setDeadline(deadline).setWorker(subscriber.getWorker());
            }
        }
    }

    /**
     * Writes the follow-up {@code ACTIVATE} command when a subscriber was selected.
     *
     * @param typedRecord the job record being processed
     * @param typedStreamWriter the writer used for the follow-up command
     * @return the writer's result, or {@code 0} when no subscriber was selected
     */
    @Override
    public long writeRecord(TypedRecord<JobRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
        if (selectedSubscriber == null) {
            return 0L;
        }
        return typedStreamWriter.writeFollowUpCommand(
                typedRecord.getKey(),
                JobIntent.ACTIVATE,
                typedRecord.getValue(),
                this::assignToSelectedSubscriber);
    }

    /** Copies subscriber key and stream id of the selected subscriber into the record metadata. */
    private void assignToSelectedSubscriber(RecordMetadata recordMetadata) {
        final JobSubscription subscriber = selectedSubscriber;
        recordMetadata.subscriberKey(subscriber.getSubscriberKey());
        recordMetadata.requestStreamId(subscriber.getStreamId());
    }

    /**
     * Takes one credit from the selected subscriber and suspends the controller when the total
     * credits are no longer positive. Does nothing when no subscriber was selected.
     *
     * @param typedRecord the job record being processed
     */
    @Override
    public void updateState(TypedRecord<JobRecord> typedRecord) {
        final JobSubscription subscriber = selectedSubscriber;
        if (subscriber == null) {
            return;
        }

        subscriptions.addCredits(subscriber.getSubscriberKey(), -1);
        if (subscriptions.getTotalCredits() <= 0) {
            context.suspendController();
        }
    }
}
