/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.NoopSnapshotSupport;
import io.zeebe.broker.task.CreditsRequest;
import io.zeebe.broker.task.CreditsRequestBuffer;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.broker.task.processor.TaskSubscription;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventFilter;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.time.ClockUtil;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;

/**
 * Stream processor that hands out task events of one task type to registered
 * {@link TaskSubscription}s.
 *
 * <p>For every task event of the subscribed type that is in a lockable state
 * ({@code CREATED}, {@code LOCK_EXPIRED}, {@code FAILED}, {@code RETRIES_UPDATED})
 * and still has retries left, the processor picks the next subscription that has
 * credits, rewrites the event to state {@code LOCK} and writes it to the target
 * stream. Each written lock consumes one credit of the chosen subscription.
 *
 * <p>The processor reports itself as suspended while no credits are available, so
 * that events are not consumed when nobody could take them.
 *
 * <p>Subscriptions are added and removed through the command queue obtained from
 * the {@link StreamProcessorContext} in {@link #onOpen(StreamProcessorContext)};
 * those methods must therefore not be called before {@code onOpen}.
 */
public class LockTaskStreamProcessor implements StreamProcessor, EventProcessor {
    protected final BrokerEventMetadata targetEventMetadata = new BrokerEventMetadata();
    protected final NoopSnapshotSupport noopSnapshotSupport = new NoopSnapshotSupport();
    /** Command queue of the stream processor; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected DeferredCommandContext cmdQueue;
    /** Buffers credit requests until {@link #isSuspended()} drains them into {@link #increaseSubscriptionCredits(CreditsRequest)}. */
    protected CreditsRequestBuffer creditsBuffer = new CreditsRequestBuffer(1024, this::increaseSubscriptionCredits);
    protected final Long2ObjectHashMap<TaskSubscription> subscriptionsById = new Long2ObjectHashMap<>();
    /** Round-robin cursor over {@link #subscriptionsById}; re-created whenever the map changes. */
    protected Iterator<TaskSubscription> subscriptionIterator;
    protected final DirectBuffer subscriptedTaskType;
    protected DirectBuffer logStreamTopicName;
    protected int logStreamPartitionId;
    protected LogStream targetStream;
    /** Sum of the credits of all registered subscriptions. */
    protected int availableSubscriptionCredits = 0;
    protected final TaskEvent taskEvent = new TaskEvent();
    protected long eventKey = 0L;
    /** Whether {@link #processEvent()} assigned the current event to a subscription. */
    protected boolean hasLockedTask;
    /** Subscription chosen for the current event; only meaningful while {@link #hasLockedTask} is true. */
    protected TaskSubscription lockSubscription;
    protected boolean isSuspended = true;

    /**
     * Creates a processor for the given task type. It starts suspended and without
     * subscriptions.
     *
     * @param taskType the task type this processor locks tasks for
     */
    public LockTaskStreamProcessor(DirectBuffer taskType) {
        this.subscriptedTaskType = taskType;
        this.subscriptionIterator = this.subscriptionsById.values().iterator();
    }

    /**
     * @return a no-op snapshot support; this processor exposes no state resource of its own
     */
    public SnapshotSupport getStateResource() {
        return this.noopSnapshotSupport;
    }

    /**
     * Drains pending credit requests and then reports whether the processor is
     * suspended. Draining first lets a queued credit increase resume the processor
     * in the same call.
     *
     * @return {@code true} if the processor currently should not consume events
     */
    public boolean isSuspended() {
        this.creditsBuffer.handleRequests();
        return this.isSuspended;
    }

    /** @return the task type this processor was created for */
    public DirectBuffer getSubscriptedTaskType() {
        return this.subscriptedTaskType;
    }

    /** @return the topic name of the source stream, or {@code null} before {@link #onOpen(StreamProcessorContext)} */
    public DirectBuffer getLogStreamTopicName() {
        return this.logStreamTopicName;
    }

    /** @return the partition id of the source stream, or {@code 0} before {@link #onOpen(StreamProcessorContext)} */
    public int getLogStreamPartitionId() {
        return this.logStreamPartitionId;
    }

    /**
     * Captures the command queue, the source stream's topic name and partition id,
     * and the target stream from the given context.
     *
     * @param context the context this processor is opened with
     */
    public void onOpen(StreamProcessorContext context) {
        this.cmdQueue = context.getStreamProcessorCmdQueue();
        final LogStream sourceStream = context.getSourceStream();
        this.logStreamTopicName = sourceStream.getTopicName();
        this.logStreamPartitionId = sourceStream.getPartitionId();
        this.targetStream = context.getTargetStream();
    }

    /**
     * Validates the subscription and schedules its registration on the command queue.
     *
     * <p>Validation happens synchronously: the subscription, its task type and lock
     * owner must be non-null, the lock owner must be 1 to 64 bytes long, and lock
     * duration and credits must be positive. The exact exception type thrown for
     * these checks is determined by {@code EnsureUtil}.
     *
     * <p>If a subscription with the same subscriber key is already registered, it is
     * replaced and its credits are removed from the available total.
     *
     * @param subscription the subscription to register
     * @return a future that completes once the subscription is registered
     * @throws RuntimeException if the subscription's task type differs from this processor's task type
     */
    public CompletableFuture<Void> addSubscription(TaskSubscription subscription) {
        EnsureUtil.ensureNotNull("subscription", subscription);
        EnsureUtil.ensureNotNull("lock task type", subscription.getLockTaskType());
        EnsureUtil.ensureNotNull("lock owner", subscription.getLockOwner());
        EnsureUtil.ensureGreaterThan("length of lock owner", subscription.getLockOwner().capacity(), 0L);
        EnsureUtil.ensureLessThanOrEqual("length of lock owner", subscription.getLockOwner().capacity(), 64L);
        EnsureUtil.ensureGreaterThan("lock duration", subscription.getLockDuration(), 0L);
        EnsureUtil.ensureGreaterThan("subscription credits", subscription.getCredits(), 0L);
        if (!BufferUtil.equals(subscription.getLockTaskType(), this.subscriptedTaskType)) {
            final String errorMessage = String.format("Subscription task type is not equal to '%s'.", BufferUtil.bufferAsString(this.subscriptedTaskType));
            throw new RuntimeException(errorMessage);
        }
        return this.cmdQueue.runAsync(future -> {
            final TaskSubscription replaced = this.subscriptionsById.put(subscription.getSubscriberKey(), subscription);
            if (replaced != null) {
                // The replaced subscription can no longer be selected, so its credits must not stay in the total.
                this.availableSubscriptionCredits -= replaced.getCredits();
            }
            this.availableSubscriptionCredits += subscription.getCredits();
            this.resetSubscriptionIterator();
            // Credits were validated to be positive above, so there is something to hand out again.
            this.isSuspended = false;
            future.complete(null);
        });
    }

    /**
     * Schedules the removal of the subscription with the given id on the command queue.
     *
     * @param subscriptionId the subscriber key of the subscription to remove
     * @return a future that completes with {@code true} if subscriptions remain afterwards
     */
    public CompletableFuture<Boolean> removeSubscription(long subscriptionId) {
        return this.cmdQueue.runAsync(future -> {
            final TaskSubscription removed = this.subscriptionsById.remove(subscriptionId);
            final boolean hasSubscriptions = this.onRemove(removed);
            this.resetSubscriptionIterator();
            future.complete(hasSubscriptions);
        });
    }

    /**
     * Updates the credit accounting after a subscription was taken out of the map.
     *
     * <p>The processor is suspended when no subscription is left or when the
     * remaining subscriptions hold no credits; otherwise it would keep consuming
     * events that it cannot assign to anyone.
     *
     * @param subscription the removed subscription, or {@code null} if none was removed
     * @return {@code true} if at least one subscription is still registered
     */
    protected boolean onRemove(TaskSubscription subscription) {
        if (subscription != null) {
            this.availableSubscriptionCredits -= subscription.getCredits();
        }
        final boolean hasSubscriptions = !this.subscriptionsById.isEmpty();
        if (!hasSubscriptions || this.availableSubscriptionCredits <= 0) {
            this.isSuspended = true;
        }
        return hasSubscriptions;
    }

    /**
     * Schedules the removal of all subscriptions that belong to the given channel.
     *
     * @param channelId the stream id of the closed client channel
     * @return a future that completes with {@code false} if the last subscription was
     *         removed by this call; it completes with {@code true} otherwise, including
     *         when no subscription matched the channel
     */
    public CompletableFuture<Boolean> onClientChannelCloseAsync(int channelId) {
        return this.cmdQueue.runAsync(future -> {
            final Iterator<TaskSubscription> subscriptionIt = this.subscriptionsById.values().iterator();
            boolean hasSubscriptions = true;
            while (subscriptionIt.hasNext()) {
                final TaskSubscription subscription = subscriptionIt.next();
                if (subscription.getStreamId() != channelId) {
                    continue;
                }
                subscriptionIt.remove();
                hasSubscriptions = this.onRemove(subscription);
            }
            // Reset only after the loop so the iterator driving the loop is never touched mid-iteration.
            this.resetSubscriptionIterator();
            future.complete(hasSubscriptions);
        });
    }

    /**
     * Queues a request to increase the credits of a subscription. The request is
     * applied later, when the credits buffer is drained in {@link #isSuspended()}.
     *
     * @param request the credits request to queue
     * @return the result of offering the request to the credits buffer
     */
    public boolean increaseSubscriptionCreditsAsync(CreditsRequest request) {
        return this.creditsBuffer.offerRequest(request);
    }

    /**
     * Adds the requested credits to the addressed subscription and resumes the
     * processor. Requests for unknown subscriber keys are ignored.
     *
     * @param request the credits request to apply
     */
    protected void increaseSubscriptionCredits(CreditsRequest request) {
        final long subscriberKey = request.getSubscriberKey();
        final int credits = request.getCredits();
        final TaskSubscription subscription = this.subscriptionsById.get(subscriberKey);
        if (subscription != null) {
            this.availableSubscriptionCredits += credits;
            subscription.setCredits(subscription.getCredits() + credits);
            this.isSuspended = false;
        }
    }

    /**
     * Selects the next subscription with credits, continuing from where the previous
     * selection stopped and wrapping around at the end of the map. At most one full
     * pass over the registered subscriptions is made.
     *
     * @return a subscription with at least one credit, or {@code null} if there is none
     */
    protected TaskSubscription getNextAvailableSubscription() {
        TaskSubscription nextSubscription = null;
        if (this.availableSubscriptionCredits > 0) {
            final int subscriptionSize = this.subscriptionsById.size();
            for (int seenSubscriptions = 0; seenSubscriptions < subscriptionSize && nextSubscription == null; ++seenSubscriptions) {
                if (!this.subscriptionIterator.hasNext()) {
                    this.resetSubscriptionIterator();
                }
                final TaskSubscription subscription = this.subscriptionIterator.next();
                if (subscription.getCredits() > 0) {
                    nextSubscription = subscription;
                }
            }
        }
        return nextSubscription;
    }

    /**
     * Restarts the round-robin cursor from a fresh iterator over the subscriptions.
     * Called after every change to the map, because an iterator obtained before a
     * modification of its collection cannot be relied upon afterwards.
     */
    private void resetSubscriptionIterator() {
        this.subscriptionIterator = this.subscriptionsById.values().iterator();
    }

    /**
     * @return a metadata filter that accepts only events of type {@code TASK_EVENT}
     */
    public static MetadataFilter eventFilter() {
        return m -> m.getEventType() == EventType.TASK_EVENT;
    }

    /**
     * Creates a filter that accepts only events whose task type equals the given type.
     * The returned filter reuses a single {@link TaskEvent} instance for all events
     * it inspects.
     *
     * @param taskType the task type to accept
     * @return the event filter
     */
    public static final EventFilter reprocessingEventFilter(DirectBuffer taskType) {
        final TaskEvent taskEvent = new TaskEvent();
        return event -> {
            taskEvent.reset();
            event.readValue(taskEvent);
            return BufferUtil.equals(taskEvent.getType(), taskType);
        };
    }

    /**
     * Reads the event into the reusable {@link #taskEvent} and decides whether it
     * should be processed.
     *
     * @param event the logged event
     * @return this processor if the event has the subscribed task type and is in a
     *         lockable state, otherwise {@code null}
     */
    public EventProcessor onEvent(LoggedEvent event) {
        this.eventKey = event.getKey();
        this.taskEvent.reset();
        event.readValue(this.taskEvent);
        EventProcessor eventProcessor = null;
        if (BufferUtil.equals(this.taskEvent.getType(), this.subscriptedTaskType)) {
            switch (this.taskEvent.getState()) {
                case CREATED:
                case LOCK_EXPIRED:
                case FAILED:
                case RETRIES_UPDATED: {
                    eventProcessor = this;
                    break;
                }
                default: {
                    break;
                }
            }
        }
        return eventProcessor;
    }

    /**
     * Tries to assign the current task to a subscription. If the task has retries
     * left and a subscription with credits exists, the event is turned into a
     * {@code LOCK} event whose lock time is the current time plus the subscription's
     * lock duration.
     */
    public void processEvent() {
        this.hasLockedTask = false;
        if (this.taskEvent.getRetries() > 0) {
            this.lockSubscription = this.getNextAvailableSubscription();
            if (this.lockSubscription != null) {
                final long lockTimeout = ClockUtil.getCurrentTimeInMillis() + this.lockSubscription.getLockDuration();
                this.taskEvent.setState(TaskState.LOCK).setLockTime(lockTimeout).setLockOwner(this.lockSubscription.getLockOwner());
                this.hasLockedTask = true;
            }
        }
    }

    /**
     * Writes the lock event, addressed to the chosen subscription, if the current
     * task was assigned.
     *
     * @param writer the writer for the target stream
     * @return the result of {@code tryWrite()}, or {@code 0} if there was nothing to write
     */
    public long writeEvent(LogStreamWriter writer) {
        long position = 0L;
        if (this.hasLockedTask) {
            this.targetEventMetadata.reset();
            this.targetEventMetadata
                .requestStreamId(this.lockSubscription.getStreamId())
                .subscriberKey(this.lockSubscription.getSubscriberKey())
                .protocolVersion(1)
                .raftTermId(this.targetStream.getTerm())
                .eventType(EventType.TASK_EVENT);
            position = writer.key(this.eventKey).metadataWriter(this.targetEventMetadata).valueWriter(this.taskEvent).tryWrite();
        }
        return position;
    }

    /**
     * Consumes one credit of the chosen subscription if the current task was
     * assigned, and suspends the processor once no credits are left.
     */
    public void updateState() {
        if (this.hasLockedTask) {
            final int credits = this.lockSubscription.getCredits();
            this.lockSubscription.setCredits(credits - 1);
            --this.availableSubscriptionCredits;
            if (this.availableSubscriptionCredits <= 0) {
                this.isSuspended = true;
            }
        }
    }
}

