package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.DrainingHashesTracker;
import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.ImpactedConsumersResult;
import org.apache.pulsar.broker.service.PendingAcksMap;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyDispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.class */
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers implements StickyKeyDispatcher {
    private final boolean allowOutOfOrderDelivery;
    private final StickyKeyConsumerSelector selector;
    private final boolean drainingHashesRequired;
    private boolean skipNextReplayToTriggerLookAhead;
    private final KeySharedMode keySharedMode;
    private final DrainingHashesTracker drainingHashesTracker;
    private final RescheduleReadHandler rescheduleReadHandler;
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode = new int[KeySharedMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.AUTO_SPLIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.STICKY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers$ReplayPositionFilter.class */
    private class ReplayPositionFilter implements Predicate<Position> {
        private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap();

        private ReplayPositionFilter() {
        }

        @Override // java.util.function.Predicate
        public boolean test(Position position) {
            if (PersistentStickyKeyDispatcherMultipleConsumers.this.isAllowOutOfOrderDelivery()) {
                return true;
            }
            Long hash = PersistentStickyKeyDispatcherMultipleConsumers.this.redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
            if (hash == null) {
                if (!PersistentStickyKeyDispatcherMultipleConsumers.log.isDebugEnabled()) {
                    return true;
                }
                PersistentStickyKeyDispatcherMultipleConsumers.log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.", PersistentStickyKeyDispatcherMultipleConsumers.this.name, position);
                return true;
            }
            Consumer select = PersistentStickyKeyDispatcherMultipleConsumers.this.selector.select(hash.intValue());
            if (select == null) {
                return false;
            }
            MutableInt computeIfAbsent = this.availablePermitsMap.computeIfAbsent(select, consumer -> {
                return new MutableInt(PersistentStickyKeyDispatcherMultipleConsumers.this.getAvailablePermits(select));
            });
            if (computeIfAbsent.intValue() <= 0) {
                return false;
            }
            if (PersistentStickyKeyDispatcherMultipleConsumers.this.drainingHashesRequired && PersistentStickyKeyDispatcherMultipleConsumers.this.drainingHashesTracker.shouldBlockStickyKeyHash(select, hash.intValue())) {
                return false;
            }
            computeIfAbsent.decrement();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, ServiceConfiguration serviceConfiguration, KeySharedMeta keySharedMeta) {
        super(persistentTopic, managedCursor, subscription, keySharedMeta.isAllowOutOfOrderDelivery());
        this.skipNextReplayToTriggerLookAhead = false;
        this.allowOutOfOrderDelivery = keySharedMeta.isAllowOutOfOrderDelivery();
        this.keySharedMode = keySharedMeta.getKeySharedMode();
        this.drainingHashesRequired = this.keySharedMode == KeySharedMode.AUTO_SPLIT && !this.allowOutOfOrderDelivery;
        this.drainingHashesTracker = this.drainingHashesRequired ? new DrainingHashesTracker(getName(), this::stickyKeyHashUnblocked) : null;
        Objects.requireNonNull(serviceConfiguration);
        this.rescheduleReadHandler = new RescheduleReadHandler(serviceConfiguration::getKeySharedUnblockingIntervalMs, persistentTopic.getBrokerService().executor(), this::cancelPendingRead, () -> {
            reScheduleReadInMs(0L);
        }, () -> {
            return this.havePendingRead;
        }, this::getReadMoreEntriesCallCount, () -> {
            return !this.redeliveryMessages.isEmpty();
        });
        switch (AnonymousClass2.$SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[this.keySharedMode.ordinal()]) {
            case 1:
                if (serviceConfiguration.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(serviceConfiguration.getSubscriptionKeySharedConsistentHashingReplicaPoints(), this.drainingHashesRequired);
                    return;
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector(this.drainingHashesRequired);
                    return;
                }
            case 2:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                return;
            default:
                throw new IllegalArgumentException("Invalid key-shared mode: " + this.keySharedMode);
        }
    }

    private void stickyKeyHashUnblocked(int i) {
        if (log.isDebugEnabled()) {
            if (i > -1) {
                log.debug("[{}] Sticky key hash {} is unblocked", getName(), Integer.valueOf(i));
            } else {
                log.debug("[{}] Some sticky key hashes are unblocked", getName());
            }
        }
        reScheduleReadWithKeySharedUnblockingInterval();
    }

    private void reScheduleReadWithKeySharedUnblockingInterval() {
        this.rescheduleReadHandler.rescheduleRead();
    }

    @Override // org.apache.pulsar.broker.service.StickyKeyDispatcher
    @VisibleForTesting
    public StickyKeyConsumerSelector getSelector() {
        return this.selector;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
        if (IS_CLOSED_UPDATER.get(this) != 1) {
            return super.addConsumer(consumer).thenCompose(r5 -> {
                return this.selector.addConsumer(consumer);
            }).thenAccept((java.util.function.Consumer<? super U>) optional -> {
                if (this.drainingHashesRequired) {
                    consumer.setPendingAcksAddHandler(this::handleAddingPendingAck);
                    consumer.setPendingAcksRemoveHandler(new PendingAcksMap.PendingAcksRemoveHandler() { // from class: org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.1
                        @Override // org.apache.pulsar.broker.service.PendingAcksMap.PendingAcksRemoveHandler
                        public void handleRemoving(Consumer consumer2, long j, long j2, int i, boolean z) {
                            PersistentStickyKeyDispatcherMultipleConsumers.this.drainingHashesTracker.reduceRefCount(consumer2, i, z);
                        }

                        @Override // org.apache.pulsar.broker.service.PendingAcksMap.PendingAcksRemoveHandler
                        public void startBatch() {
                            PersistentStickyKeyDispatcherMultipleConsumers.this.drainingHashesTracker.startBatch();
                        }

                        @Override // org.apache.pulsar.broker.service.PendingAcksMap.PendingAcksRemoveHandler
                        public void endBatch() {
                            PersistentStickyKeyDispatcherMultipleConsumers.this.drainingHashesTracker.endBatch();
                        }
                    });
                    DrainingHashesTracker drainingHashesTracker = this.drainingHashesTracker;
                    Objects.requireNonNull(drainingHashesTracker);
                    consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats);
                    registerDrainingHashes(consumer, (ImpactedConsumersResult) optional.orElseThrow());
                }
            }).exceptionally(th -> {
                internalRemoveConsumer(consumer);
                throw FutureUtil.wrapToCompletionException(th);
            });
        }
        log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
        consumer.disconnect();
        return CompletableFuture.completedFuture(null);
    }

    private synchronized void registerDrainingHashes(Consumer consumer, ImpactedConsumersResult impactedConsumersResult) {
        impactedConsumersResult.processRemovedHashRanges((consumer2, removedHashRanges) -> {
            if (consumer2 != consumer) {
                consumer2.getPendingAcks().forEach((j, j2, i, i2) -> {
                    if (i2 == 0) {
                        log.warn("[{}] Sticky key hash was missing for {}:{}", new Object[]{getName(), Long.valueOf(j), Long.valueOf(j2)});
                    } else if (removedHashRanges.containsStickyKey(i2)) {
                        this.drainingHashesTracker.addEntry(consumer2, i2);
                    }
                });
            }
        });
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        Optional<ImpactedConsumersResult> removeConsumer = this.selector.removeConsumer(consumer);
        super.removeConsumer(consumer);
        if (this.drainingHashesRequired) {
            registerDrainingHashes(consumer, removeConsumer.orElseThrow());
            this.drainingHashesTracker.consumerRemoved(consumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public synchronized void clearComponentsAfterRemovedAllConsumers() {
        super.clearComponentsAfterRemovedAllConsumers();
        if (this.drainingHashesRequired) {
            this.drainingHashesTracker.clear();
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected synchronized boolean trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> list) {
        this.lastNumberOfEntriesProcessed = 0;
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        long j4 = 0;
        if (list.size() == 0) {
            return true;
        }
        if (this.consumerSet.isEmpty()) {
            list.forEach((v0) -> {
                v0.release();
            });
            this.cursor.rewind();
            return false;
        }
        if (!this.allowOutOfOrderDelivery) {
            Optional<Position> firstPositionInReplay = getFirstPositionInReplay();
            if (firstPositionInReplay.isPresent()) {
                Position position = firstPositionInReplay.get();
                if (this.minReplayedPosition != null && position.compareTo(this.minReplayedPosition) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this read and retry with readMoreEntries.", new Object[]{this.name, position, this.minReplayedPosition, readType});
                    }
                    if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                        list.forEach(this::addEntryToReplay);
                    } else if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                        list.forEach((v0) -> {
                            v0.release();
                        });
                    }
                    this.skipNextBackoff = true;
                    return true;
                }
            }
        }
        MutableBoolean mutableBoolean = new MutableBoolean();
        Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching = filterAndGroupEntriesForDispatching(list, readType, mutableBoolean);
        AtomicInteger atomicInteger = new AtomicInteger(filterAndGroupEntriesForDispatching.size());
        for (Map.Entry<Consumer, List<Entry>> entry : filterAndGroupEntriesForDispatching.entrySet()) {
            Consumer key = entry.getKey();
            List<Entry> value = entry.getValue();
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}", new Object[]{this.name, key.consumerName(), Integer.valueOf(value.size()), readType});
            }
            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                for (Entry entry2 : value) {
                    this.redeliveryMessages.remove(entry2.getLedgerId(), entry2.getEntryId());
                }
            }
            SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
            EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(value.size());
            EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get(value.size());
            j3 += filterEntriesForConsumer(value, entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor, readType == PersistentDispatcherMultipleConsumers.ReadType.Replay, key);
            j4 += value.size();
            key.sendMessages(value, entryBatchSizes, entryBatchIndexesAcks, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> {
                if (future.isDone() && atomicInteger.decrementAndGet() == 0) {
                    readMoreEntriesAsync();
                }
            });
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(threadLocal.getTotalMessages() - entryBatchIndexesAcks.getTotalAckedIndexCount()));
            j += threadLocal.getTotalMessages();
            j2 += threadLocal.getTotalBytes();
        }
        this.lastNumberOfEntriesProcessed = (int) j4;
        acquirePermitsForDeliveredMessages(this.topic, this.cursor, j3, j, j2);
        if (!mutableBoolean.booleanValue()) {
            return j3 == 0;
        }
        this.skipNextReplayToTriggerLookAhead = true;
        this.skipNextBackoff = this.cursor.hasMoreEntries();
        return true;
    }

    private boolean handleAddingPendingAck(Consumer consumer, long j, long j2, int i) {
        if (i == 0) {
            log.warn("[{}] Sticky key hash is missing for {}:{}", new Object[]{getName(), Long.valueOf(j), Long.valueOf(j2)});
            IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Sticky key hash is missing for " + j + ":" + illegalArgumentException);
            throw illegalArgumentException;
        }
        DrainingHashesTracker.DrainingHashEntry entry = this.drainingHashesTracker.getEntry(i);
        if (entry != null && entry.getConsumer() != consumer) {
            log.warn("[{}] Another consumer id {} is already draining hash {}. Skipping adding {}:{} to pending acks for consumer {}. Adding the message to replay.", new Object[]{getName(), entry.getConsumer(), Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2), consumer});
            addMessageToReplay(j, j2, i);
            return false;
        }
        if (!log.isDebugEnabled()) {
            return true;
        }
        log.debug("[{}] Adding {}:{} to pending acks for consumer id:{} name:{} with sticky key hash {}", new Object[]{getName(), Long.valueOf(j), Long.valueOf(j2), Long.valueOf(consumer.consumerId()), consumer.consumerName(), Integer.valueOf(i)});
        return true;
    }

    private boolean isReplayQueueSizeBelowLimit() {
        return this.redeliveryMessages.size() < getEffectiveLookAheadLimit();
    }

    private int getEffectiveLookAheadLimit() {
        return getEffectiveLookAheadLimit(this.serviceConfig, this.consumerList.size());
    }

    static int getEffectiveLookAheadLimit(ServiceConfiguration serviceConfiguration, int i) {
        int i2;
        int keySharedLookAheadMsgInReplayThresholdPerConsumer = serviceConfiguration.getKeySharedLookAheadMsgInReplayThresholdPerConsumer();
        int keySharedLookAheadMsgInReplayThresholdPerSubscription = serviceConfiguration.getKeySharedLookAheadMsgInReplayThresholdPerSubscription();
        if (keySharedLookAheadMsgInReplayThresholdPerConsumer <= 0) {
            i2 = keySharedLookAheadMsgInReplayThresholdPerSubscription;
        } else {
            i2 = keySharedLookAheadMsgInReplayThresholdPerConsumer * i;
            if (keySharedLookAheadMsgInReplayThresholdPerSubscription > 0 && keySharedLookAheadMsgInReplayThresholdPerSubscription < i2) {
                i2 = keySharedLookAheadMsgInReplayThresholdPerSubscription;
            }
        }
        if (i2 <= 0) {
            int maxUnackedMessagesPerSubscription = serviceConfiguration.getMaxUnackedMessagesPerSubscription();
            if (maxUnackedMessagesPerSubscription <= 0) {
                maxUnackedMessagesPerSubscription = Integer.MAX_VALUE;
            }
            int maxUnackedMessagesPerConsumer = i * serviceConfiguration.getMaxUnackedMessagesPerConsumer();
            if (maxUnackedMessagesPerConsumer <= 0) {
                maxUnackedMessagesPerConsumer = Integer.MAX_VALUE;
            }
            i2 = Math.min(maxUnackedMessagesPerSubscription, maxUnackedMessagesPerConsumer);
        }
        return i2;
    }

    private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entry> list, PersistentDispatcherMultipleConsumers.ReadType readType, MutableBoolean mutableBoolean) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        boolean isReplayQueueSizeBelowLimit = isReplayQueueSizeBelowLimit();
        HashSet hashSet = (isReplayQueueSizeBelowLimit && readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) ? new HashSet() : null;
        HashSet hashSet2 = isReplayQueueSizeBelowLimit ? new HashSet() : null;
        for (Entry entry : list) {
            EntryAndMetadata create = entry instanceof EntryAndMetadata ? (EntryAndMetadata) entry : EntryAndMetadata.create(entry, Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), getSubscriptionName(), -1L));
            int stickyKeyHash = getStickyKeyHash(create);
            Consumer select = this.selector.select(stickyKeyHash);
            MutableBoolean mutableBoolean2 = null;
            boolean z = false;
            if (select != null) {
                if (isReplayQueueSizeBelowLimit) {
                    hashSet2.add(select);
                }
                mutableBoolean2 = (isReplayQueueSizeBelowLimit && readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) ? new MutableBoolean(false) : null;
                MutableInt mutableInt = (MutableInt) hashMap2.computeIfAbsent(select, consumer -> {
                    return new MutableInt(getAvailablePermits(select));
                });
                if (mutableInt.intValue() > 0 && canDispatchEntry(select, create, readType, stickyKeyHash, mutableBoolean2)) {
                    mutableInt.decrement();
                    z = true;
                }
            }
            if (z) {
                ((List) hashMap.computeIfAbsent(select, consumer2 -> {
                    return new ArrayList();
                })).add(create);
            } else {
                if (mutableBoolean2 != null && mutableBoolean2.isTrue()) {
                    hashSet.add(select);
                }
                addMessageToReplay(create.getLedgerId(), create.getEntryId(), stickyKeyHash);
                create.release();
            }
        }
        if (isReplayQueueSizeBelowLimit && hashMap.isEmpty()) {
            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal) {
                Iterator it = hashSet.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Consumer consumer3 = (Consumer) it.next();
                    if (!hashMap.containsKey(consumer3) && ((MutableInt) hashMap2.get(consumer3)).intValue() > 0) {
                        mutableBoolean.setTrue();
                        break;
                    }
                }
            }
            if (!mutableBoolean.booleanValue()) {
                Iterator<Consumer> it2 = getConsumers().iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    Consumer next = it2.next();
                    if (!hashSet2.contains(next) && getAvailablePermits(next) > 0) {
                        mutableBoolean.setTrue();
                        break;
                    }
                }
            }
        }
        return hashMap;
    }

    private boolean canDispatchEntry(Consumer consumer, Entry entry, PersistentDispatcherMultipleConsumers.ReadType readType, int i, MutableBoolean mutableBoolean) {
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal && this.redeliveryMessages.containsStickyKeyHash(i)) {
            if (mutableBoolean == null) {
                return false;
            }
            mutableBoolean.setTrue();
            return false;
        }
        if (!this.drainingHashesRequired || !this.drainingHashesTracker.shouldBlockStickyKeyHash(consumer, i)) {
            return true;
        }
        if (mutableBoolean == null) {
            return false;
        }
        mutableBoolean.setTrue();
        return false;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected Predicate<Position> createFilterForReplay() {
        return new ReplayPositionFilter();
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected int getStickyKeyHash(Entry entry) {
        if (!(entry instanceof EntryAndMetadata)) {
            return this.selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
        }
        StickyKeyConsumerSelector stickyKeyConsumerSelector = this.selector;
        Objects.requireNonNull(stickyKeyConsumerSelector);
        return ((EntryAndMetadata) entry).getOrUpdateCachedStickyKeyHash(stickyKeyConsumerSelector::makeStickyKeyHash);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void markDeletePositionMoveForward() {
        reScheduleReadWithKeySharedUnblockingInterval();
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected synchronized boolean canReplayMessages() {
        if (!this.skipNextReplayToTriggerLookAhead) {
            return true;
        }
        this.skipNextReplayToTriggerLookAhead = false;
        return false;
    }

    private int getAvailablePermits(Consumer consumer) {
        if (!consumer.cnx().isActive()) {
            return 0;
        }
        int max = Math.max(consumer.getAvailablePermits(), 0);
        if (max <= 0 || consumer.getMaxUnackedMessages() <= 0) {
            return max;
        }
        int max2 = Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
        if (max2 == 0) {
            return 0;
        }
        int max3 = Math.max(consumer.getAvgMessagesPerEntry(), 1);
        return Math.min(max, ((max2 + max3) - 1) / max3);
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected boolean doesntHavePendingRead() {
        return (this.havePendingRead || this.havePendingReplayRead) ? false : true;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected boolean isNormalReadAllowed() {
        if (!isReplayQueueSizeBelowLimit()) {
            return false;
        }
        Iterator<Consumer> it = this.consumerList.iterator();
        while (it.hasNext()) {
            Consumer next = it.next();
            if (next != null && !next.isBlocked() && getAvailablePermits(next) > 0) {
                return true;
            }
        }
        return false;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected int getMaxEntriesReadLimit() {
        return Math.max(getEffectiveLookAheadLimit() - this.redeliveryMessages.size(), 1);
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected void handleNormalReadNotAllowed() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Skipping read for the topic since normal read isn't allowed. Rescheduling a read with a backoff.", this.topic.getName(), getSubscriptionName());
        }
        reScheduleReadWithBackoff();
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CommandSubscribe.SubType getType() {
        return CommandSubscribe.SubType.Key_Shared;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        return this.cursor.asyncReplayEntries(set, this, PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }

    @Override // org.apache.pulsar.broker.service.StickyKeyDispatcher
    public KeySharedMode getKeySharedMode() {
        return this.keySharedMode;
    }

    @Override // org.apache.pulsar.broker.service.StickyKeyDispatcher
    public boolean isAllowOutOfOrderDelivery() {
        return this.allowOutOfOrderDelivery;
    }

    @Override // org.apache.pulsar.broker.service.StickyKeyDispatcher
    public boolean hasSameKeySharedPolicy(KeySharedMeta keySharedMeta) {
        return keySharedMeta.getKeySharedMode() == this.keySharedMode && keySharedMeta.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery;
    }

    @Override // org.apache.pulsar.broker.service.StickyKeyDispatcher
    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
        return this.selector.getConsumerKeyHashRanges();
    }

    public DrainingHashesTracker getDrainingHashesTracker() {
        return this.drainingHashesTracker;
    }
}
