package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedEvent;
import io.zeebe.broker.logstreams.processor.TypedEventProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.task.CreditsRequest;
import io.zeebe.broker.task.CreditsRequestBuffer;
import io.zeebe.broker.task.TaskSubscriptionManager;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.broker.task.processor.TaskSubscriptions;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/task/processor/LockTaskStreamProcessor.class */
public class LockTaskStreamProcessor implements TypedEventProcessor<TaskEvent>, StreamProcessorLifecycleAware {
    protected final CreditsRequestBuffer creditsBuffer = new CreditsRequestBuffer(TaskSubscriptionManager.NUM_CONCURRENT_REQUESTS, this::increaseSubscriptionCredits);
    private final TaskSubscriptions subscriptions = new TaskSubscriptions(8);
    private final TaskSubscriptions.SubscriptionIterator taskDistributionIterator = this.subscriptions.iterator2();
    private final TaskSubscriptions.SubscriptionIterator managementIterator = this.subscriptions.iterator2();
    private final DirectBuffer subscribedTaskType;
    private int partitionId;
    private ActorControl actor;
    private StreamProcessorContext context;
    private TaskSubscription selectedSubscriber;

    /* JADX WARN: Type inference failed for: r1v5, types: [io.zeebe.broker.task.processor.TaskSubscriptions$SubscriptionIterator] */
    /* JADX WARN: Type inference failed for: r1v8, types: [io.zeebe.broker.task.processor.TaskSubscriptions$SubscriptionIterator] */
    public LockTaskStreamProcessor(DirectBuffer directBuffer) {
        this.subscribedTaskType = directBuffer;
    }

    public DirectBuffer getSubscriptedTaskType() {
        return this.subscribedTaskType;
    }

    public int getLogStreamPartitionId() {
        return this.partitionId;
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.context = typedStreamProcessor.getStreamProcessorContext();
        this.actor = this.context.getActorControl();
        this.context.suspendController();
    }

    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        this.partitionId = typedStreamEnvironment.getStream().getPartitionId();
        return typedStreamEnvironment.newStreamProcessor().onEvent(EventType.TASK_EVENT, TaskState.CREATED, this).onEvent(EventType.TASK_EVENT, TaskState.LOCK_EXPIRED, this).onEvent(EventType.TASK_EVENT, TaskState.FAILED, this).onEvent(EventType.TASK_EVENT, TaskState.RETRIES_UPDATED, this).build();
    }

    public ActorFuture<Void> addSubscription(TaskSubscription taskSubscription) {
        try {
            EnsureUtil.ensureNotNull("subscription", taskSubscription);
            EnsureUtil.ensureNotNullOrEmpty("lock task type", taskSubscription.getLockTaskType());
            EnsureUtil.ensureNotNullOrEmpty("lock owner", taskSubscription.getLockOwner());
            EnsureUtil.ensureGreaterThan("length of lock owner", taskSubscription.getLockOwner().capacity(), 0L);
            EnsureUtil.ensureLessThanOrEqual("length of lock owner", taskSubscription.getLockOwner().capacity(), 64L);
            EnsureUtil.ensureGreaterThan("lock duration", taskSubscription.getLockDuration(), 0L);
            EnsureUtil.ensureGreaterThan("subscription credits", taskSubscription.getCredits(), 0L);
            if (BufferUtil.equals(taskSubscription.getLockTaskType(), this.subscribedTaskType)) {
                return this.actor.call(() -> {
                    this.subscriptions.addSubscription(taskSubscription);
                    this.context.resumeController();
                });
            }
            throw new RuntimeException(String.format("Subscription task type is not equal to '%s'.", BufferUtil.bufferAsString(this.subscribedTaskType)));
        } catch (Exception e) {
            return CompletableActorFuture.completedExceptionally(e);
        }
    }

    public ActorFuture<Boolean> removeSubscription(long j) {
        return this.actor.call(() -> {
            this.subscriptions.removeSubscription(j);
            boolean isEmpty = this.subscriptions.isEmpty();
            if (isEmpty) {
                this.context.suspendController();
            }
            return Boolean.valueOf(!isEmpty);
        });
    }

    public ActorFuture<Boolean> onClientChannelCloseAsync(int i) {
        return this.actor.call(() -> {
            this.managementIterator.reset();
            while (this.managementIterator.hasNext()) {
                if (this.managementIterator.next().getStreamId() == i) {
                    this.managementIterator.remove();
                }
            }
            boolean isEmpty = this.subscriptions.isEmpty();
            if (isEmpty) {
                this.context.suspendController();
            }
            return Boolean.valueOf(!isEmpty);
        });
    }

    public boolean increaseSubscriptionCreditsAsync(CreditsRequest creditsRequest) {
        this.actor.call(() -> {
            this.creditsBuffer.handleRequests();
        });
        return this.creditsBuffer.offerRequest(creditsRequest);
    }

    protected void increaseSubscriptionCredits(CreditsRequest creditsRequest) {
        this.subscriptions.addCredits(creditsRequest.getSubscriberKey(), creditsRequest.getCredits());
        this.context.resumeController();
    }

    protected TaskSubscription getNextAvailableSubscription() {
        TaskSubscription taskSubscription = null;
        if (this.subscriptions.getTotalCredits() > 0) {
            int size = this.subscriptions.size();
            for (int i = 0; i < size && taskSubscription == null; i++) {
                if (!this.taskDistributionIterator.hasNext()) {
                    this.taskDistributionIterator.reset();
                }
                TaskSubscription next = this.taskDistributionIterator.next();
                if (next.getCredits() > 0) {
                    taskSubscription = next;
                }
            }
        }
        return taskSubscription;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedEventProcessor
    public void processEvent(TypedEvent<TaskEvent> typedEvent) {
        this.selectedSubscriber = null;
        TaskEvent value = typedEvent.getValue();
        if (!BufferUtil.equals(value.getType(), this.subscribedTaskType) || value.getRetries() <= 0) {
            return;
        }
        this.selectedSubscriber = getNextAvailableSubscription();
        if (this.selectedSubscriber != null) {
            value.setState(TaskState.LOCK).setLockTime(ActorClock.currentTimeMillis() + this.selectedSubscriber.getLockDuration()).setLockOwner(this.selectedSubscriber.getLockOwner());
        }
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedEventProcessor
    public long writeEvent(TypedEvent<TaskEvent> typedEvent, TypedStreamWriter typedStreamWriter) {
        long j = 0;
        if (this.selectedSubscriber != null) {
            j = typedStreamWriter.writeFollowupEvent(typedEvent.getKey(), typedEvent.getValue(), this::assignToSelectedSubscriber);
        }
        return j;
    }

    private void assignToSelectedSubscriber(BrokerEventMetadata brokerEventMetadata) {
        brokerEventMetadata.subscriberKey(this.selectedSubscriber.getSubscriberKey());
        brokerEventMetadata.requestStreamId(this.selectedSubscriber.getStreamId());
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedEventProcessor
    public void updateState(TypedEvent<TaskEvent> typedEvent) {
        if (this.selectedSubscriber != null) {
            this.subscriptions.addCredits(this.selectedSubscriber.getSubscriberKey(), -1);
            if (this.subscriptions.getTotalCredits() <= 0) {
                this.context.suspendController();
            }
        }
    }
}
