package org.apache.pulsar.broker.service.persistent;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.MoreObjects;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.glassfish.hk2.utilities.BuilderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentSubscription.class */
public class PersistentSubscription implements Subscription {
    // Topic that owns this subscription; used to reach the broker service and its configuration.
    protected final PersistentTopic topic;
    // Managed-ledger cursor backing this subscription (acks, backlog queries and resets go through it).
    protected final ManagedCursor cursor;
    // Current dispatcher. Not assigned by the constructor; addConsumer() creates or replaces it on demand.
    protected volatile Dispatcher dispatcher;
    // Name of the owning topic, captured from topic.getName() at construction time.
    protected final String topicName;
    // Subscription name as supplied to the constructor.
    protected final String subName;
    // Pre-rendered description (topic + subscription name) returned by toString().
    protected final String fullName;
    // NOTE(review): FALSE/TRUE are not referenced in the visible part of this file; the fence flag is
    // compared against the literals 0/1 instead (likely constant inlining by the decompiler).
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // Expiry monitor built in the constructor for this topic/subscription/cursor.
    private PersistentMessageExpiryMonitor expiryMonitor;
    // NOTE(review): usage is outside the visible part of the file.
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    // Cursor property key whose presence marks a cursor as belonging to a replicated subscription.
    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
    // Non-null exactly when the subscription is replicated (see isReplicated()/setReplicated()).
    private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
    // Mark-delete position recorded the last time a transaction marker was found right after it.
    private volatile Position lastMarkDeleteForTransactionMarker;
    // Pending-ack handler; a real implementation only when the transaction coordinator is enabled.
    private final PendingAckHandle pendingAckHandle;
    // NOTE(review): no initializer here; presumably assigned in a static initializer outside this view.
    private static final Logger log;
    // Atomic accessor for the volatile int field "isFenced" declared below.
    private static final AtomicIntegerFieldUpdater<PersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced");
    // Base cursor properties for replicated subscriptions.
    // NOTE(review): created empty (raw TreeMap); presumably populated in a static initializer not shown here.
    private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap();
    // Base cursor properties for non-replicated subscriptions: always empty and immutable.
    private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
    // Fence flag: 0 = not fenced, 1 = fenced. Accessed through IS_FENCED_UPDATER.
    private volatile int isFenced = 0;
    // NOTE(review): not written in the visible part of the file.
    private long lastExpireTimestamp = 0;
    // Wall-clock time (ms) of the most recent consumerFlow() call.
    private long lastConsumedFlowTimestamp = 0;
    // Wall-clock time (ms) recorded by updateLastMarkDeleteAdvancedTimestamp(); never moves backwards.
    private long lastMarkDeleteAdvancedTimestamp = 0;
    // True while an asynchronous read looking for a transaction marker is outstanding.
    private volatile boolean isDeleteTransactionMarkerInProcess = false;
    /**
     * Callback handed to {@code cursor.asyncMarkDelete} for cumulative acknowledgements.
     * The context object is cast to a {@link PositionImpl}; acknowledgeMessage() passes the
     * mark-delete position it observed before issuing the ack.
     */
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback() {
        @Override
        public void markDeleteComplete(Object ctx) {
            final PositionImpl previousMarkDelete = (PositionImpl) ctx;
            final PositionImpl currentMarkDelete = (PositionImpl) cursor.getMarkDeletedPosition();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Mark deleted messages to position {} from position {}",
                        topicName, subName, currentMarkDelete, previousMarkDelete);
            }
            // Let the dispatcher know if the mark-delete position moved past the earlier one.
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDelete);
        }

        @Override
        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
            // Failures are only reported at debug level.
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("[{}][{}] Failed to mark delete for position {}: {}",
                    topicName, subName, ctx, exception);
        }
    };
    /**
     * Callback handed to {@code cursor.asyncDelete} for individual acknowledgements.
     * The context object is cast to a {@link PositionImpl} on success.
     */
    private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {
        @Override
        public void deleteComplete(Object ctx) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Deleted message at {}", topicName, subName, ctx);
            }
            final PositionImpl previousMarkDelete = (PositionImpl) ctx;
            notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDelete);
        }

        @Override
        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
            log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
        }
    };

    /**
     * Selects the base cursor-property map for a subscription.
     *
     * @param z {@code true} for a replicated subscription
     * @return the shared replicated-properties map when {@code z} is true, otherwise the shared empty map
     */
    public static Map<String, Long> getBaseCursorProperties(boolean z) {
        if (z) {
            return REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        }
        return NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
    }

    /**
     * Tells whether a cursor carries the replicated-subscription marker property.
     *
     * @param managedCursor cursor whose properties are inspected
     * @return {@code true} if the cursor's properties contain the replicated-subscription key
     */
    public static boolean isCursorFromReplicatedSubscription(ManagedCursor managedCursor) {
        return managedCursor
                .getProperties()
                .containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
    }

    public PersistentSubscription(PersistentTopic persistentTopic, String str, ManagedCursor managedCursor, boolean z) {
        this.topic = persistentTopic;
        this.cursor = managedCursor;
        this.topicName = persistentTopic.getName();
        this.subName = str;
        this.fullName = MoreObjects.toStringHelper(this).add("topic", this.topicName).add(BuilderHelper.NAME_KEY, this.subName).toString();
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, str, managedCursor, this);
        setReplicated(z);
        if (persistentTopic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
            this.pendingAckHandle = new PendingAckHandleImpl(this);
        } else {
            this.pendingAckHandle = new PendingAckHandleDisabled();
        }
        IS_FENCED_UPDATER.set(this, 0);
    }

    /**
     * Records the current wall-clock time as the last moment the mark-delete position advanced.
     * The stored value never decreases, even if the system clock steps backwards.
     */
    public void updateLastMarkDeleteAdvancedTimestamp() {
        final long previous = lastMarkDeleteAdvancedTimestamp;
        final long now = System.currentTimeMillis();
        lastMarkDeleteAdvancedTimestamp = Math.max(previous, now);
    }

    /**
     * @return the broker interceptor obtained from the owning topic's broker service
     */
    @Override
    public BrokerInterceptor interceptor() {
        return topic.getBrokerService().getInterceptor();
    }

    /**
     * @return the subscription name given at construction time
     */
    @Override
    public String getName() {
        return subName;
    }

    /**
     * @return the topic this subscription belongs to
     */
    @Override
    public Topic getTopic() {
        return topic;
    }

    /**
     * @return {@code true} when a replicated-subscription snapshot cache is currently installed
     */
    @Override
    public boolean isReplicated() {
        return replicatedSubscriptionSnapshotCache != null;
    }

    /**
     * Switches the subscription between replicated and non-replicated mode.
     *
     * @param z {@code true} installs a fresh snapshot cache sized from the broker configuration;
     *          {@code false} drops any existing cache
     */
    public void setReplicated(boolean z) {
        if (!z) {
            replicatedSubscriptionSnapshotCache = null;
            return;
        }
        replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(
                subName,
                topic.getBrokerService().pulsar().getConfiguration()
                        .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
    }

    /**
     * Attaches a consumer to this subscription, creating or replacing the dispatcher when no
     * consumer is currently connected and the requested subscription type differs.
     *
     * @param consumer consumer to attach
     * @throws BrokerServiceException if the subscription is fenced, the requested type conflicts
     *         with the connected consumers' type, or the subscription type is unsupported
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        cursor.updateLastActive();
        if (IS_FENCED_UPDATER.get(this) == 1) {
            log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
            throw new BrokerServiceException.SubscriptionFencedException("Subscription is fenced");
        }

        if (dispatcher != null && dispatcher.isConsumerConnected()) {
            // Consumers are already attached: the new one must use the same subscription type.
            if (consumer.subType() != dispatcher.getType()) {
                throw new BrokerServiceException.SubscriptionBusyException("Subscription is of different type");
            }
        } else {
            // No active consumers: (re)create the dispatcher if its type does not match the request.
            Dispatcher previousDispatcher = null;
            switch (consumer.subType()) {
                case Exclusive:
                    if (dispatcher == null || dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Exclusive) {
                        previousDispatcher = dispatcher;
                        dispatcher = new PersistentDispatcherSingleActiveConsumer(
                                cursor, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, topic, this);
                    }
                    break;
                case Shared:
                    if (dispatcher == null || dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Shared) {
                        previousDispatcher = dispatcher;
                        dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
                    }
                    break;
                case Failover:
                    // Any negative partition index is normalised to -1.
                    int partitionIndex = Math.max(TopicName.getPartitionIndex(topicName), -1);
                    if (dispatcher == null || dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Failover) {
                        previousDispatcher = dispatcher;
                        dispatcher = new PersistentDispatcherSingleActiveConsumer(
                                cursor, PulsarApi.CommandSubscribe.SubType.Failover, partitionIndex, topic, this);
                    }
                    break;
                case Key_Shared:
                    PulsarApi.KeySharedMeta keySharedMeta = consumer.getKeySharedMeta() != null
                            ? consumer.getKeySharedMeta()
                            : PulsarApi.KeySharedMeta.getDefaultInstance();
                    PulsarApi.KeySharedMode requestedMode = keySharedMeta.getKeySharedMode();
                    // Also rebuild when the existing Key_Shared dispatcher uses a different key-shared mode.
                    if (dispatcher == null
                            || dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Key_Shared
                            || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode() != requestedMode) {
                        previousDispatcher = dispatcher;
                        dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
                                topic, cursor, this, topic.getBrokerService().getPulsar().getConfiguration(), keySharedMeta);
                    }
                    break;
                default:
                    throw new BrokerServiceException.ServerMetadataException("Unsupported subscription type");
            }

            // Close the dispatcher that was replaced, if any; the outcome is only logged.
            if (previousDispatcher != null) {
                previousDispatcher.close().thenRun(() -> {
                    log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
                }).exceptionally(error -> {
                    log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, error);
                    return null;
                });
            }
        }

        dispatcher.addConsumer(consumer);
    }

    /**
     * Detaches a consumer from this subscription.
     *
     * <p>When the last consumer leaves, the cursor is deactivated. For a non-durable cursor the
     * subscription is additionally closed, its dispatcher is closed, and the subscription is removed
     * from the topic on the broker executor.
     *
     * @param consumer consumer to detach
     * @param z        when {@code true}, the non-durable cursor is left in the managed ledger instead
     *                 of being deleted after the subscription is removed from the topic
     * @throws BrokerServiceException propagated from the dispatcher's {@code removeConsumer}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized void removeConsumer(Consumer consumer, boolean z) throws BrokerServiceException {
        this.cursor.updateLastActive();
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }
        // The dispatcher may never have been created (no consumer was ever added); guard the
        // emptiness check so this path cannot throw a NullPointerException.
        if (this.dispatcher != null && this.dispatcher.getConsumers().isEmpty()) {
            deactivateCursor();
            if (!this.cursor.isDurable()) {
                // Non-durable subscription with no consumers left: tear everything down.
                close().thenRun(() -> {
                    synchronized (this) {
                        if (this.dispatcher != null) {
                            this.dispatcher.close().thenRun(() -> {
                                log.info("[{}][{}] Successfully closed dispatcher for reader", this.topicName, this.subName);
                            }).exceptionally(th -> {
                                log.error("[{}][{}] Failed to close dispatcher for reader", new Object[]{this.topicName, this.subName, th});
                                return null;
                            });
                        }
                    }
                }).exceptionally(th -> {
                    log.error("[{}][{}] Failed to close subscription for reader", new Object[]{this.topicName, this.subName, th});
                    return null;
                });
                // Removal from the topic (and the optional cursor deletion) runs on the broker executor.
                this.topic.getBrokerService().pulsar().getExecutor().submit(() -> {
                    this.topic.removeSubscription(this.subName);
                    if (z) {
                        return;
                    }
                    try {
                        this.topic.getManagedLedger().deleteCursor(this.cursor.getName());
                    } catch (InterruptedException | ManagedLedgerException e) {
                        log.warn("[{}] [{}] Failed to remove non durable cursor", new Object[]{this.topic.getName(), this.subName, e});
                    }
                });
            }
        }
        // Reached only if the dispatcher did not reject the removal with an exception.
        PersistentTopic.USAGE_COUNT_UPDATER.decrementAndGet(this.topic);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", new Object[]{this.topic.getName(), this.subName, consumer.consumerName(), Long.valueOf(PersistentTopic.USAGE_COUNT_UPDATER.get(this.topic))});
        }
    }

    /**
     * Marks the backing cursor as inactive.
     */
    public void deactivateCursor() {
        cursor.setInactive();
    }

    /**
     * Records the time of the flow request and forwards it to the dispatcher.
     *
     * @param consumer consumer issuing the flow request
     * @param i        value forwarded unchanged to the dispatcher's {@code consumerFlow}
     */
    @Override
    public void consumerFlow(Consumer consumer, int i) {
        lastConsumedFlowTimestamp = System.currentTimeMillis();
        dispatcher.consumerFlow(consumer, i);
    }

    /**
     * Acknowledges one or more positions on this subscription.
     *
     * <p>Non-cumulative acks are passed to {@code cursor.asyncDelete}; a cumulative ack must carry
     * exactly one position and is passed to {@code cursor.asyncMarkDelete}. Afterwards the method
     * performs follow-up work based on the cursor's current mark-delete position: replicated
     * subscription updates, transaction-marker cleanup and end-of-topic notification.
     *
     * @param list    positions being acknowledged
     * @param ackType cumulative or individual acknowledgement
     * @param map     properties merged (via {@code mergeCursorProperties}) into a cumulative ack
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void acknowledgeMessage(List<Position> list, PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        PulsarMarkers.ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition;
        // Snapshot of the mark-delete position before the ack; also passed as the callback context.
        Position markDeletedPosition = this.cursor.getMarkDeletedPosition();
        if (ackType != PulsarApi.CommandAck.AckType.Cumulative) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual acks on {}", new Object[]{this.topicName, this.subName, list});
            }
            this.cursor.asyncDelete(list, this.deleteCallback, markDeletedPosition);
            if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
                // Drop pending-ack state for positions the cursor already reports as deleted.
                list.forEach(position -> {
                    if (((ManagedCursorImpl) this.cursor).isMessageDeleted(position)) {
                        this.pendingAckHandle.clearIndividualPosition(position);
                    }
                });
            }
            if (this.dispatcher != null) {
                this.dispatcher.getRedeliveryTracker().removeBatch(list);
            }
        } else {
            // A cumulative ack with anything other than one position is rejected with a warning.
            if (list.size() != 1) {
                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids.", this.topicName, this.subName);
                return;
            }
            Position position2 = list.get(0);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", new Object[]{this.topicName, this.subName, position2});
            }
            this.cursor.asyncMarkDelete(position2, mergeCursorProperties(map), this.markDeleteCallback, markDeletedPosition);
        }
        // If the cursor already reports a different mark-delete position than the snapshot,
        // record the advance and, for replicated subscriptions, publish the matching snapshot.
        if (!this.cursor.getMarkDeletedPosition().equals(markDeletedPosition)) {
            updateLastMarkDeleteAdvancedTimestamp();
            // Read the volatile field once so the null check and the call see the same cache.
            ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache = this.replicatedSubscriptionSnapshotCache;
            if (replicatedSubscriptionSnapshotCache != null && (advancedMarkDeletePosition = replicatedSubscriptionSnapshotCache.advancedMarkDeletePosition((PositionImpl) this.cursor.getMarkDeletedPosition())) != null) {
                this.topic.getReplicatedSubscriptionController().ifPresent(replicatedSubscriptionsController -> {
                    replicatedSubscriptionsController.localSubscriptionUpdated(this.subName, advancedMarkDeletePosition);
                });
            }
        }
        if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
            Position markDeletedPosition2 = this.cursor.getMarkDeletedPosition();
            // Start a transaction-marker check only when the mark-delete position is beyond the last
            // one checked and no other check is outstanding; the flag is cleared by deleteTransactionMarker.
            if ((this.lastMarkDeleteForTransactionMarker == null || ((PositionImpl) this.lastMarkDeleteForTransactionMarker).compareTo((PositionImpl) markDeletedPosition2) < 0) && !this.isDeleteTransactionMarkerInProcess) {
                this.isDeleteTransactionMarkerInProcess = true;
                deleteTransactionMarker((PositionImpl) markDeletedPosition2, ackType, map);
            }
        }
        // On a terminated ledger with an empty backlog, tell every consumer the topic has ended.
        if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog(false) == 0 && this.dispatcher != null) {
            this.dispatcher.getConsumers().forEach((v0) -> {
                v0.reachedEndOfTopic();
            });
        }
    }

    /**
     * Reads the entry that follows {@code positionImpl} and, if it is a transaction commit/abort
     * marker, acknowledges it on behalf of the subscription.
     *
     * <p>The caller sets {@code isDeleteTransactionMarkerInProcess} before invoking this method;
     * every exit path here clears it again so later acknowledgements can trigger a new check.
     *
     * @param positionImpl current mark-delete position; {@code null} just clears the in-process flag
     * @param ackType      ack type reused when acknowledging a marker entry
     * @param map          properties reused when acknowledging a marker entry
     */
    private void deleteTransactionMarker(final PositionImpl positionImpl, final PulsarApi.CommandAck.AckType ackType, final Map<String, Long> map) {
        if (positionImpl == null) {
            this.isDeleteTransactionMarkerInProcess = false;
            return;
        }
        ManagedLedgerImpl managedLedgerImpl = (ManagedLedgerImpl) this.cursor.getManagedLedger();
        final PositionImpl nextValidPosition = managedLedgerImpl.getNextValidPosition(positionImpl);
        managedLedgerImpl.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.1
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
            public void readEntryComplete(Entry entry, Object obj) {
                // The read has finished, so clear the flag before anything that can throw. Otherwise a
                // metadata parse failure would leave it set and block all future marker checks.
                PersistentSubscription.this.isDeleteTransactionMarkerInProcess = false;
                try {
                    PulsarApi.MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                    boolean isTransactionMarker;
                    try {
                        isTransactionMarker = Markers.isTxnCommitMarker(parseMessageMetadata) || Markers.isTxnAbortMarker(parseMessageMetadata);
                    } finally {
                        // Recycle the metadata on every path, not only when the entry is a marker.
                        parseMessageMetadata.recycle();
                    }
                    if (isTransactionMarker) {
                        PersistentSubscription.this.lastMarkDeleteForTransactionMarker = positionImpl;
                        // Acknowledging the marker may re-enter this check for the following entry.
                        PersistentSubscription.this.acknowledgeMessage(Collections.singletonList(nextValidPosition), ackType, map);
                    }
                } finally {
                    entry.release();
                }
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
            public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentSubscription.this.isDeleteTransactionMarkerInProcess = false;
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("Fail to read transaction marker! Position : {}", positionImpl, managedLedgerException);
                }
            }
        }, null);
    }

    /**
     * Delegates a transactional individual acknowledgement to the pending-ack handle.
     *
     * @param txnID transaction performing the acknowledgement
     * @param list  position/integer pairs forwarded unchanged to the pending-ack handle
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnID, List<MutablePair<PositionImpl, Integer>> list) {
        final CompletableFuture<Void> ackFuture = pendingAckHandle.individualAcknowledgeMessage(txnID, list);
        return ackFuture;
    }

    /**
     * Delegates a transactional cumulative acknowledgement to the pending-ack handle.
     *
     * @param txnID transaction performing the acknowledgement
     * @param list  positions forwarded unchanged to the pending-ack handle
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnID, List<PositionImpl> list) {
        final CompletableFuture<Void> ackFuture = pendingAckHandle.cumulativeAcknowledgeMessage(txnID, list);
        return ackFuture;
    }

    /**
     * Tells the dispatcher that the mark-delete position moved forward, but only when a dispatcher
     * exists and the cursor's current mark-delete position is strictly after {@code position}.
     *
     * @param position earlier mark-delete position to compare against (must be a {@link PositionImpl})
     */
    public void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position position) {
        final PositionImpl earlier = (PositionImpl) position;
        final PositionImpl current = (PositionImpl) cursor.getMarkDeletedPosition();
        if (dispatcher != null && current.compareTo(earlier) > 0) {
            dispatcher.markDeletePositionMoveForward();
        }
    }

    /**
     * @return the description string built once in the constructor (topic and subscription name)
     */
    @Override
    public String toString() {
        return fullName;
    }

    /**
     * @return the name of the owning topic, as captured at construction time
     */
    @Override
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the current dispatcher's subscription type, or {@code null} when there is no dispatcher
     */
    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return dispatcher == null ? null : dispatcher.getType();
    }

    /**
     * Renders the subscription type as text.
     *
     * @return {@code "None"} when no type is available, the type's name for the four known types,
     *         and {@code "Null"} for any other value
     */
    @Override
    public String getTypeString() {
        final PulsarApi.CommandSubscribe.SubType subType = getType();
        if (subType == null) {
            return "None";
        }
        final String label;
        switch (subType) {
            case Exclusive:
                label = "Exclusive";
                break;
            case Shared:
                label = "Shared";
                break;
            case Failover:
                label = "Failover";
                break;
            case Key_Shared:
                label = "Key_Shared";
                break;
            default:
                label = "Null";
                break;
        }
        return label;
    }

    /**
     * Asynchronously clears the subscription backlog through the cursor.
     *
     * @return a future completed with {@code null} once the backlog is cleared (and any delayed
     *         messages held by the dispatcher are dropped), or completed exceptionally with the
     *         managed-ledger error
     */
    @Override
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Backlog size before clearing: {}",
                    topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
        }
        final AsyncCallbacks.ClearBacklogCallback callback = new AsyncCallbacks.ClearBacklogCallback() {
            @Override
            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Backlog size after clearing: {}",
                            topicName, subName, cursor.getNumberOfEntriesInBacklog(false));
                }
                if (dispatcher != null) {
                    dispatcher.clearDelayedMessages();
                }
                result.complete(null);
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
                result.completeExceptionally(exception);
            }
        };
        cursor.asyncClearBacklog(callback, null);
        return result;
    }

    /**
     * Asynchronously skips entries on the cursor, excluding individually deleted entries.
     *
     * @param i number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         managed-ledger error
     */
    @Override
    public CompletableFuture<Void> skipMessages(final int i) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Skipping {} messages, current backlog {}",
                    topicName, subName, i, cursor.getNumberOfEntriesInBacklog(false));
        }
        final AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback() {
            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Skipped {} messages, new backlog {}",
                            topicName, subName, i, cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to skip {} messages", topicName, subName, i, exception);
                result.completeExceptionally(exception);
            }
        };
        cursor.asyncSkipEntries(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return result;
    }

    /**
     * Resets the subscription based on a timestamp.
     *
     * <p>A {@code PersistentMessageFinder} looks up a position for the timestamp. When one is
     * found the cursor is reset to the position after it; when none is found the cursor is reset
     * to its first position, or the future fails if no first position is available.
     *
     * @param j timestamp handed to the message finder
     * @return a future completed when the reset finishes, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Void> resetCursor(final long j) {
        final CompletableFuture<Void> resetFuture = new CompletableFuture<>();
        final PersistentMessageFinder finder = new PersistentMessageFinder(topicName, cursor);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, j);
        }
        finder.findMessages(j, new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryComplete(Position position, Object ctx) {
                if (position != null) {
                    // Found an entry for the timestamp: resume from the one after it.
                    PersistentSubscription.this.resetCursor(position.getNext(), resetFuture);
                    return;
                }
                // Nothing matched: fall back to the cursor's first position, if there is one.
                final Position firstPosition = cursor.getFirstPosition();
                if (firstPosition == null) {
                    log.warn("[{}][{}] Unable to find position for timestamp {}. Unable to reset cursor to first position",
                            topicName, subName, j);
                    resetFuture.completeExceptionally(
                            new BrokerServiceException.SubscriptionInvalidCursorPosition("Unable to find position for specified timestamp"));
                    return;
                }
                log.info("[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger",
                        topicName, subName, j, firstPosition);
                PersistentSubscription.this.resetCursor(firstPosition, resetFuture);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
                // A concurrent find is reported as "busy"; anything else is wrapped as-is.
                final Throwable cause = (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException)
                        ? new BrokerServiceException.SubscriptionBusyException(exception.getMessage())
                        : new BrokerServiceException(exception);
                resetFuture.completeExceptionally(cause);
            }
        });
        return resetFuture;
    }

    /**
     * Resets the subscription to an explicit position.
     *
     * @param position target position for the cursor
     * @return a future completed by the two-argument {@code resetCursor} overload
     */
    @Override
    public CompletableFuture<Void> resetCursor(Position position) {
        final CompletableFuture<Void> resetFuture = new CompletableFuture<>();
        resetCursor(position, resetFuture);
        return resetFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    // Resets this subscription's cursor to `position` and reports the outcome through
    // `completableFuture`.
    //
    // Steps, as implemented below:
    //   1. Fence the subscription by CAS-ing IS_FENCED_UPDATER from 0 to 1. If the CAS fails, the
    //      supplied future is failed with SubscriptionBusyException and nothing else happens.
    //   2. Disconnect active consumers through the dispatcher (skipped when there is no dispatcher
    //      or it reports no connected consumer).
    //   3. Ask the cursor to reset asynchronously and translate its callback into the future.
    // Every path taken after a successful fence sets the fence flag back to 0 before completing
    // the supplied future.
    public void resetCursor(Position position, CompletableFuture<Void> completableFuture) {
        CompletableFuture<Void> completedFuture;
        // Only one caller may hold the fence; a failed CAS means the flag was not 0.
        if (!IS_FENCED_UPDATER.compareAndSet(this, 0, 1)) {
            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to fence subscription"));
            return;
        }
        // The dispatcher is inspected and asked to disconnect while holding this object's monitor.
        synchronized (this) {
            completedFuture = (this.dispatcher == null || !this.dispatcher.isConsumerConnected()) ? CompletableFuture.completedFuture(null) : this.dispatcher.disconnectActiveConsumers(true);
        }
        completedFuture.whenComplete((r11, th) -> {
            // Runs for both success and failure of the disconnect step.
            if (this.dispatcher != null) {
                this.dispatcher.resetCloseFuture();
            }
            if (th != null) {
                // Disconnect failed: release the fence and surface the failure as "busy".
                log.error("[{}][{}] Failed to disconnect consumer from subscription", new Object[]{this.topicName, this.subName, th});
                IS_FENCED_UPDATER.set(this, 0);
                completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                return;
            }
            log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", this.topicName, this.subName);
            try {
                this.cursor.asyncResetCursor(position, new AsyncCallbacks.ResetCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.7
                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback
                    public void resetComplete(Object obj) {
                        if (PersistentSubscription.log.isDebugEnabled()) {
                            PersistentSubscription.log.debug("[{}][{}] Successfully reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, position});
                        }
                        // Notify the dispatcher (if any) before the fence is released.
                        if (PersistentSubscription.this.dispatcher != null) {
                            PersistentSubscription.this.dispatcher.cursorIsReset();
                        }
                        PersistentSubscription.IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        completableFuture.complete(null);
                    }

                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback
                    public void resetFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        PersistentSubscription.log.error("[{}][{}] Failed to reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, position, managedLedgerException});
                        PersistentSubscription.IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        // Map managed-ledger failures onto broker-level exception types:
                        // invalid position -> SubscriptionInvalidCursorPosition,
                        // concurrent find -> SubscriptionBusyException,
                        // anything else -> generic BrokerServiceException wrapping the cause.
                        if (managedLedgerException instanceof ManagedLedgerException.InvalidCursorPositionException) {
                            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition(managedLedgerException.getMessage()));
                        } else if (managedLedgerException instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(managedLedgerException.getMessage()));
                        } else {
                            completableFuture.completeExceptionally(new BrokerServiceException(managedLedgerException));
                        }
                    }
                });
            } catch (Exception e) {
                // asyncResetCursor itself threw synchronously: release the fence and fail the future.
                log.error("[{}][{}] Error while resetting cursor", new Object[]{this.topicName, this.subName, e});
                IS_FENCED_UPDATER.set(this, 0);
                completableFuture.completeExceptionally(new BrokerServiceException(e));
            }
        });
    }

    /**
     * Asynchronously fetches the {@code i}-th entry from this subscription's cursor, passing
     * {@code IndividualDeletedEntries.Exclude} to the cursor.
     *
     * @param i ordinal handed to the cursor's {@code asyncGetNthEntry}
     * @return a future completed with the entry read, or completed exceptionally with the
     *         {@code ManagedLedgerException} reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Entry> peekNthMessage(int i) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Getting message at position {}", topicName, subName, Integer.valueOf(i));
        }
        final CompletableFuture<Entry> pendingEntry = new CompletableFuture<>();
        // Bridges the callback-style cursor API onto the returned future.
        final AsyncCallbacks.ReadEntryCallback bridge = new AsyncCallbacks.ReadEntryCallback() {
            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                pendingEntry.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException failure, Object ctx) {
                pendingEntry.completeExceptionally(failure);
            }
        };
        cursor.asyncGetNthEntry(i, ManagedCursor.IndividualDeletedEntries.Exclude, bridge, null);
        return pendingEntry;
    }

    /**
     * Returns the backlog entry count reported by the cursor.
     *
     * @param z forwarded unchanged to the cursor's {@code getNumberOfEntriesInBacklog}
     * @return the number of backlog entries as computed by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public long getNumberOfEntriesInBacklog(boolean z) {
        final long backlogEntries = cursor.getNumberOfEntriesInBacklog(z);
        return backlogEntries;
    }

    /**
     * Returns the dispatcher currently attached to this subscription, read while holding this
     * object's monitor.
     *
     * @return the current dispatcher; other methods in this class null-check it, so callers
     *         should expect {@code null}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized Dispatcher getDispatcher() {
        final Dispatcher current = dispatcher;
        return current;
    }

    /**
     * Returns the value of the cursor's {@code getNumberOfEntriesSinceFirstNotAckedMessage()}.
     *
     * @return the entry count reported by the cursor
     */
    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
        final long entryCount = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
        return entryCount;
    }

    /**
     * Returns the value of the cursor's {@code getTotalNonContiguousDeletedMessagesRange()}.
     *
     * @return the range count reported by the cursor
     */
    public int getTotalNonContiguousDeletedMessagesRange() {
        final int rangeCount = cursor.getTotalNonContiguousDeletedMessagesRange();
        return rangeCount;
    }

    /**
     * Closes this subscription if no consumer is connected.
     *
     * <p>Under this object's monitor: when the dispatcher reports a connected consumer the call
     * fails with {@code SubscriptionBusyException}; otherwise the fence flag is set to 1 and the
     * call succeeds.
     *
     * @return an already-completed future, or an already-failed one when consumers are connected
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> close() {
        synchronized (this) {
            final boolean hasConnectedConsumers = dispatcher != null && dispatcher.isConsumerConnected();
            if (!hasConnectedConsumers) {
                IS_FENCED_UPDATER.set(this, 1);
                log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
                return CompletableFuture.completedFuture(null);
            }
            return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException("Subscription has active consumers"));
        }
    }

    /**
     * Fences the subscription, closes its dispatcher (if any) and then calls {@link #close()}.
     *
     * <p>If any stage fails, the fence flag is set back to 0, the dispatcher (if any) is
     * {@code reset()}, and the returned future fails with the throwable delivered by the chain.
     *
     * @return a future completed once the dispatcher and the subscription are both closed
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized CompletableFuture<Void> disconnect() {
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        IS_FENCED_UPDATER.set(this, 1);

        // No dispatcher means there is nothing to close first.
        final CompletableFuture<Void> dispatcherClosed;
        if (dispatcher == null) {
            dispatcherClosed = CompletableFuture.completedFuture(null);
        } else {
            dispatcherClosed = dispatcher.close();
        }

        dispatcherClosed
                .thenCompose(ignored -> close())
                .thenRun(() -> {
                    log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
                    outcome.complete(null);
                })
                .exceptionally(failure -> {
                    // Undo the fence so the subscription stays usable after a failed disconnect.
                    IS_FENCED_UPDATER.set(this, 0);
                    if (dispatcher != null) {
                        dispatcher.reset();
                    }
                    log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, failure);
                    outcome.completeExceptionally(failure);
                    return null;
                });
        return outcome;
    }

    /**
     * Deletes this subscription without forcing consumers off: delegates to
     * {@code delete(false)}, which goes through {@link #close()}.
     *
     * @return the future produced by {@code delete(false)}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> delete() {
        final boolean closeConnectedConsumers = false;
        return delete(closeConnectedConsumers);
    }

    /**
     * Deletes this subscription forcefully: delegates to {@code delete(true)}, which goes
     * through {@link #disconnect()} rather than {@link #close()}.
     *
     * @return the future produced by {@code delete(true)}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> deleteForcefully() {
        final boolean closeConnectedConsumers = true;
        return delete(closeConnectedConsumers);
    }

    /**
     * Shared implementation of {@link #delete()} and {@link #deleteForcefully()}.
     *
     * <p>Sequence: close the subscription ({@link #disconnect()} when {@code z} is true,
     * {@link #close()} otherwise), unsubscribe it from the topic, then close the dispatcher if
     * one is present. A failure in any stage sets the fence flag back to 0 and fails the
     * returned future.
     *
     * @param z {@code true} to go through {@code disconnect()}, {@code false} to go through
     *          {@code close()}
     * @return a future completed once the subscription has been removed
     */
    private CompletableFuture<Void> delete(boolean z) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
        log.info("[{}][{}] Unsubscribing", this.topicName, this.subName);
        // Parameterized (was a raw type) so the lambdas chained off it keep their static types;
        // in particular the final exceptionally() handler receives a Throwable, not an Object.
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        if (z) {
            disconnect().thenRun(() -> {
                closeFuture.complete(null);
            }).exceptionally(th -> {
                log.error("[{}][{}] Error disconnecting and closing subscription", new Object[]{this.topicName, this.subName, th});
                closeFuture.completeExceptionally(th);
                return null;
            });
        } else {
            close().thenRun(() -> {
                closeFuture.complete(null);
            }).exceptionally(th2 -> {
                log.error("[{}][{}] Error closing subscription", new Object[]{this.topicName, this.subName, th2});
                closeFuture.completeExceptionally(th2);
                return null;
            });
        }
        closeFuture.thenCompose(r4 -> {
            return this.topic.unsubscribe(this.subName);
        }).thenAccept(r6 -> {
            // The dispatcher is read and closed under this object's monitor.
            synchronized (this) {
                (this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
                    log.info("[{}][{}] Successfully deleted subscription", this.topicName, this.subName);
                    deleteFuture.complete(null);
                }).exceptionally(th3 -> {
                    // Dispatcher close failed: un-fence and reset the dispatcher.
                    IS_FENCED_UPDATER.set(this, 0);
                    if (this.dispatcher != null) {
                        this.dispatcher.reset();
                    }
                    log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, th3});
                    deleteFuture.completeExceptionally(th3);
                    return null;
                });
            }
        }).exceptionally(th3 -> {
            // Close or unsubscribe failed: un-fence and propagate the failure.
            IS_FENCED_UPDATER.set(this, 0);
            log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, th3});
            deleteFuture.completeExceptionally(th3);
            return null;
        });
        return deleteFuture;
    }

    /**
     * Handles an unsubscribe request issued by {@code consumer}.
     *
     * <p>If the dispatcher allows this consumer to unsubscribe, the consumer is closed and the
     * subscription is deleted via {@link #delete()}. Otherwise the returned future fails with
     * {@code ServerMetadataException}. A {@code BrokerServiceException} raised along the way
     * (presumably by {@code consumer.close()}) is logged and routed into the returned future
     * instead of escaping to the caller.
     *
     * @param consumer the consumer asking to unsubscribe
     * @return the future from {@code delete()} on the success path, otherwise a failed future
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            // The guarded logic lives inside the try so the catch below is reachable.
            if (this.dispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return delete();
            }
            completableFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
        } catch (BrokerServiceException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.warn("Error removing consumer {}", consumer, e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    /**
     * Returns the consumers known to the dispatcher.
     *
     * @return the dispatcher's consumer list, or an empty list when no dispatcher is attached
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public List<Consumer> getConsumers() {
        // Read the field once so the null check and the call see the same dispatcher.
        final Dispatcher current = dispatcher;
        if (current == null) {
            return Collections.emptyList();
        }
        return current.getConsumers();
    }

    /**
     * Runs message expiry with parameter {@code i} unless the check can be skipped.
     *
     * <p>Expiry is skipped (returning {@code false}) when the backlog is empty, or when a consumer
     * is connected, the backlog holds fewer than 1000 entries, and the topic reports that the
     * oldest message has not expired.
     *
     * @param i value forwarded to {@code isOldestMessageExpired} and to the expiry monitor
     * @return {@code false} when skipped, otherwise the expiry monitor's result
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public boolean expireMessages(int i) {
        if (getNumberOfEntriesInBacklog(false) == 0) {
            return false;
        }
        final boolean consumerConnected = dispatcher != null && dispatcher.isConsumerConnected();
        if (consumerConnected) {
            final boolean smallBacklog = getNumberOfEntriesInBacklog(false) < 1000;
            if (smallBacklog && !topic.isOldestMessageExpired(cursor, i)) {
                return false;
            }
        }
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(i);
    }

    /**
     * Records the current time as the last expiry timestamp and asks the expiry monitor to
     * expire messages for {@code position}.
     *
     * @param position forwarded unchanged to the expiry monitor
     * @return the expiry monitor's result
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public boolean expireMessages(Position position) {
        lastExpireTimestamp = System.currentTimeMillis();
        final boolean monitorResult = expiryMonitor.expireMessages(position);
        return monitorResult;
    }

    /**
     * Returns the message expiry rate reported by the expiry monitor.
     *
     * @return the value of {@code expiryMonitor.getMessageExpiryRate()}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public double getExpiredMessageRate() {
        final double expiryRate = expiryMonitor.getMessageExpiryRate();
        return expiryRate;
    }

    /**
     * Returns the cursor's estimated size since its mark-delete position.
     *
     * @return the value of {@code cursor.getEstimatedSizeSinceMarkDeletePosition()}
     */
    public long estimateBacklogSize() {
        final long estimatedSize = cursor.getEstimatedSizeSinceMarkDeletePosition();
        return estimatedSize;
    }

    /**
     * Builds a statistics snapshot for this subscription.
     *
     * <p>Per-consumer stats are summed into the subscription totals. Dispatcher-specific fields
     * (active consumer, unacked/delayed messages, key hash ranges, recently joined consumers) are
     * filled in only when the dispatcher snapshot taken at the top of the method is of the
     * matching concrete type.
     *
     * @param bool unboxed and forwarded to {@link #getNumberOfEntriesInBacklog(boolean)}; must not
     *             be {@code null}
     * @param z    when {@code true}, {@code backlogSize} is also computed from the managed ledger
     * @return a freshly populated {@code SubscriptionStats}
     */
    public SubscriptionStats getStats(Boolean bool, boolean z) {
        SubscriptionStats subscriptionStats = new SubscriptionStats();
        subscriptionStats.lastExpireTimestamp = this.lastExpireTimestamp;
        subscriptionStats.lastConsumedFlowTimestamp = this.lastConsumedFlowTimestamp;
        subscriptionStats.lastMarkDeleteAdvancedTimestamp = this.lastMarkDeleteAdvancedTimestamp;
        // Work on one snapshot of the dispatcher for the whole method.
        Dispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            // Guard the downcast with instanceof, as the recently-joined-consumers section below
            // does, so a dispatcher of another type cannot trigger a ClassCastException.
            final Map<String, List<String>> consumerKeyHashRanges =
                    (getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared
                            && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers)
                    ? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges()
                    : null;
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStats stats = consumer.getStats();
                subscriptionStats.consumers.add(stats);
                subscriptionStats.msgRateOut += stats.msgRateOut;
                subscriptionStats.msgThroughputOut += stats.msgThroughputOut;
                subscriptionStats.bytesOutCounter += stats.bytesOutCounter;
                subscriptionStats.msgOutCounter += stats.msgOutCounter;
                subscriptionStats.msgRateRedeliver += stats.msgRateRedeliver;
                subscriptionStats.chuckedMessageRate = (int) (subscriptionStats.chuckedMessageRate + stats.chuckedMessageRate);
                subscriptionStats.unackedMessages += stats.unackedMessages;
                subscriptionStats.lastConsumedTimestamp = Math.max(subscriptionStats.lastConsumedTimestamp, stats.lastConsumedTimestamp);
                subscriptionStats.lastAckedTimestamp = Math.max(subscriptionStats.lastAckedTimestamp, stats.lastAckedTimestamp);
                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
                    stats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
                }
            });
        }
        subscriptionStats.type = getType();
        if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
            if (activeConsumer != null) {
                subscriptionStats.activeConsumerName = activeConsumer.consumerName();
            }
        }
        if (Subscription.isIndividualAckMode(subscriptionStats.type) && (dispatcher instanceof PersistentDispatcherMultipleConsumers)) {
            PersistentDispatcherMultipleConsumers multiDispatcher = (PersistentDispatcherMultipleConsumers) dispatcher;
            // Overrides the per-consumer sum accumulated above with the dispatcher-wide figure.
            subscriptionStats.unackedMessages = multiDispatcher.getTotalUnackedMessages();
            subscriptionStats.blockedSubscriptionOnUnackedMsgs = multiDispatcher.isBlockedDispatcherOnUnackedMsgs();
            subscriptionStats.msgDelayed = multiDispatcher.getNumberOfDelayedMessages();
        }
        subscriptionStats.msgBacklog = getNumberOfEntriesInBacklog(bool.booleanValue());
        if (z) {
            subscriptionStats.backlogSize = ((ManagedLedgerImpl) this.topic.getManagedLedger()).getEstimatedBacklogSize((PositionImpl) this.cursor.getMarkDeletedPosition());
        }
        subscriptionStats.msgBacklogNoDelayed = subscriptionStats.msgBacklog - subscriptionStats.msgDelayed;
        subscriptionStats.msgRateExpired = this.expiryMonitor.getMessageExpiryRate();
        subscriptionStats.totalMsgExpired = this.expiryMonitor.getTotalMessageExpired();
        subscriptionStats.isReplicated = isReplicated();
        subscriptionStats.isDurable = this.cursor.isDurable();
        if (getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared && (dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers)) {
            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getRecentlyJoinedConsumers();
            if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
                recentlyJoinedConsumers.forEach((joinedConsumer, positionImpl) -> {
                    subscriptionStats.consumersAfterMarkDeletePosition.put(joinedConsumer.consumerName(), positionImpl.toString());
                });
            }
        }
        subscriptionStats.nonContiguousDeletedMessagesRanges = this.cursor.getTotalNonContiguousDeletedMessagesRange();
        subscriptionStats.nonContiguousDeletedMessagesRangesSerializedSize = this.cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
        return subscriptionStats;
    }

    /**
     * Asks the dispatcher to redeliver the unacknowledged messages of {@code consumer}.
     *
     * <p>The dispatcher is dereferenced without a null check.
     *
     * @param consumer forwarded unchanged to the dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void redeliverUnacknowledgedMessages(Consumer consumer) {
        dispatcher.redeliverUnacknowledgedMessages(consumer);
    }

    /**
     * Asks the dispatcher to redeliver the given positions to {@code consumer}.
     *
     * <p>The dispatcher is dereferenced without a null check.
     *
     * @param consumer forwarded unchanged to the dispatcher
     * @param list     positions to redeliver, forwarded unchanged to the dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        dispatcher.redeliverUnacknowledgedMessages(consumer, list);
    }

    /**
     * Removes, in place, every position in {@code list} that is at or before the cursor's
     * mark-deleted position. Nothing is removed while the cursor reports no mark-deleted position.
     *
     * @param list positions to filter; modified by this call
     */
    private void trimByMarkDeletePosition(List<PositionImpl> list) {
        // The mark-deleted position is re-read from the cursor for every element.
        list.removeIf(candidate ->
                cursor.getMarkDeletedPosition() != null
                        && candidate.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) <= 0);
    }

    /**
     * Forwards an unacked-message delta to the dispatcher.
     *
     * <p>The dispatcher is dereferenced without a null check.
     *
     * @param i forwarded unchanged to {@code dispatcher.addUnAckedMessages}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void addUnAckedMessages(int i) {
        dispatcher.addUnAckedMessages(i);
    }

    /**
     * Returns the dispatcher's delayed-message count, read under this object's monitor.
     *
     * @return {@code dispatcher.getNumberOfDelayedMessages()}, or 0 when no dispatcher is attached
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized long getNumberOfEntriesDelayed() {
        return dispatcher != null ? dispatcher.getNumberOfDelayedMessages() : 0L;
    }

    /**
     * Delegates to {@code topic.markBatchMessagePublished()}.
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void markTopicWithBatchMessagePublished() {
        topic.markBatchMessagePublished();
    }

    /**
     * Calls {@code reachedEndOfTopic()} on every consumer of the dispatcher, but only when the
     * cursor reports an empty backlog and a dispatcher is attached; otherwise does nothing.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void topicTerminated() {
        final boolean backlogDrained = cursor.getNumberOfEntriesInBacklog(false) == 0;
        if (backlogDrained && dispatcher != null) {
            dispatcher.getConsumers().forEach(connected -> connected.reachedEndOfTopic());
        }
    }

    /**
     * Combines caller-supplied cursor properties with the subscription's fixed properties.
     *
     * <p>The fixed set is {@code REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES} when
     * {@code isReplicated()} is true and {@code NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES}
     * otherwise. On a key collision the fixed entry wins because it is applied last.
     *
     * @param map caller-supplied properties; must not be {@code null}
     * @return the shared fixed map itself (not a copy) when {@code map} is empty, otherwise a new
     *         sorted map holding both sets
     */
    protected Map<String, Long> mergeCursorProperties(Map<String, Long> map) {
        Map<String, Long> fixedProperties = isReplicated() ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        if (map.isEmpty()) {
            return fixedProperties;
        }
        // Parameterized TreeMap (was a raw type) so the result is type-checked.
        Map<String, Long> merged = new TreeMap<>();
        merged.putAll(map);
        merged.putAll(fixedProperties);
        return merged;
    }

    /**
     * Adds a snapshot to the replicated-subscription snapshot cache; a no-op when the cache
     * field is {@code null}.
     *
     * @param replicatedSubscriptionsSnapshot snapshot forwarded to {@code addNewSnapshot}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void processReplicatedSubscriptionSnapshot(PulsarMarkers.ReplicatedSubscriptionsSnapshot replicatedSubscriptionsSnapshot) {
        // Read the field once so the null check and the call see the same cache.
        final ReplicatedSubscriptionSnapshotCache cache = replicatedSubscriptionSnapshotCache;
        if (cache == null) {
            return;
        }
        cache.addNewSnapshot(replicatedSubscriptionsSnapshot);
    }

    /**
     * Ends a transaction on this subscription's pending-ack handle.
     *
     * <p>{@code COMMIT} commits with an empty properties map. {@code ABORT} aborts, passing the
     * active consumer when the dispatcher is a {@code PersistentDispatcherSingleActiveConsumer}
     * and {@code null} otherwise. Any other action yields a failed future.
     *
     * @param j  first argument of the {@code TxnID} constructor
     * @param j2 second argument of the {@code TxnID} constructor
     * @param i  numeric value of a {@code PulsarApi.TxnAction}
     * @return the pending-ack handle's future, or a future failed with
     *         {@code NotAllowedException} for an unsupported action
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> endTxn(long j, long j2, int i) {
        final TxnID txn = new TxnID(j, j2);
        if (i == PulsarApi.TxnAction.COMMIT.getNumber()) {
            return pendingAckHandle.commitTxn(txn, Collections.emptyMap());
        }
        if (i == PulsarApi.TxnAction.ABORT.getNumber()) {
            Consumer activeConsumer = null;
            if (getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
                activeConsumer = ((PersistentDispatcherSingleActiveConsumer) getDispatcher()).getActiveConsumer();
            }
            return pendingAckHandle.abortTxn(txn, activeConsumer);
        }
        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Unsupported txnAction " + i));
    }

    /**
     * Exposes the cursor backing this subscription.
     *
     * @return the {@code cursor} field
     */
    @VisibleForTesting
    public ManagedCursor getCursor() {
        final ManagedCursor backingCursor = cursor;
        return backingCursor;
    }

    /**
     * Delegates to {@code pendingAckHandle.syncBatchPositionAckSetForTransaction}.
     *
     * @param positionImpl forwarded unchanged to the pending-ack handle
     */
    public void syncBatchPositionBitSetForPendingAck(PositionImpl positionImpl) {
        pendingAckHandle.syncBatchPositionAckSetForTransaction(positionImpl);
    }

    /**
     * Delegates to {@code pendingAckHandle.checkIsCanDeleteConsumerPendingAck}.
     *
     * @param positionImpl forwarded unchanged to the pending-ack handle
     * @return the pending-ack handle's answer
     */
    public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl positionImpl) {
        final boolean deletable = pendingAckHandle.checkIsCanDeleteConsumerPendingAck(positionImpl);
        return deletable;
    }

    // Static initialization: stores REPLICATED_SUBSCRIPTION_PROPERTY with the value 1 in
    // REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES and creates the logger for this class.
    static {
        REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
        log = LoggerFactory.getLogger(PersistentSubscription.class);
    }
}
