package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterPolicies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/Consumer.class */
public class Consumer {
    // --- Identity and wiring (assigned once in the constructors) ---
    private final Subscription subscription;
    private final CommandSubscribe.SubType subType;
    // Connection used to send commands and messages to the client.
    private final TransportCnx cnx;
    private final String appId;
    private final String topicName;
    // Derived from the topic name via TopicName.getPartitionIndex in the main constructor.
    private final int partitionIdx;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;

    // --- Throughput / ack statistics ---
    private final Rate msgOut;
    private final Rate msgRedeliver;
    private final LongAdder msgOutCounter;
    private final LongAdder msgRedeliverCounter;
    private final LongAdder bytesOutCounter;
    private final LongAdder messageAckCounter;
    private final Rate messageAckRate;
    // Wall-clock timestamps (System.currentTimeMillis) of the last send, ack and flow request.
    private volatile long lastConsumedTimestamp;
    private volatile long lastAckedTimestamp;
    private volatile long lastConsumedFlowTimestamp;
    private Rate chunkedMessageRate;

    // --- Flow control; the two int fields are updated through the AtomicIntegerFieldUpdaters below ---
    private volatile int messagePermits;
    private volatile int permitsReceivedWhileConsumerBlocked;
    // (ledgerId, entryId) -> (batchSize, stickyKeyHash). Null unless the subscription type is an
    // individual-ack mode (see the main constructor).
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStatsImpl stats;
    private final boolean isDurable;
    private final boolean isPersistentTopic;
    // Updated through UNACKED_MESSAGES_UPDATER.
    private volatile int unackedMessages;
    private volatile boolean blockedConsumerOnUnackedMsgs;
    private final Map<String, String> metadata;
    private final KeySharedMeta keySharedMeta;
    // Running average of messages per entry, refreshed on every non-empty sendMessages call.
    private final AtomicDouble avgMessagesPerEntry;
    // Weight kept for the previous average when avgMessagesPerEntry is refreshed.
    private static final double avgPercent = 0.9d;
    private boolean preciseDispatcherFlowControl;
    private PositionImpl readPositionWhenJoining;
    private final String clientAddress;
    private final MessageId startMessageId;
    private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
    private volatile long consumerEpoch;
    private long negativeUnackedMsgsTimestamp;
    private final SchemaType schemaType;
    private final Instant connectedSince;
    // Starts as null; written through OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.
    private volatile Attributes openTelemetryAttributes;

    // --- Atomic field updaters and shared constants ---
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    // Passed as the "incoming ack set" when an ack carries no ack-set words.
    private static final long[] EMPTY_ACK_SET = new long[0];
    private static final AtomicReferenceFieldUpdater<Consumer, Attributes> OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Consumer.class, Attributes.class, "openTelemetryAttributes");
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /**
     * Creates a consumer without a schema type.
     *
     * <p>Delegates to the 15-argument constructor, passing {@code null} as the schema type; see that
     * constructor for the meaning of each argument.
     */
    public Consumer(Subscription subscription, CommandSubscribe.SubType subType, String str, long j, int i, String str2, boolean z, TransportCnx transportCnx, String str3, Map<String, String> map, boolean z2, KeySharedMeta keySharedMeta, MessageId messageId, long j2) {
        this(subscription, subType, str, j, i, str2, z, transportCnx, str3, map, z2, keySharedMeta, messageId, j2,
                (SchemaType) null);
    }

    /**
     * Creates a fully initialised consumer.
     *
     * @param subscription subscription the consumer belongs to
     * @param subType subscription type; decides whether a pending-acks map is created
     * @param str topic name (also used to derive the partition index)
     * @param j consumer id
     * @param i priority level
     * @param str2 consumer name
     * @param z whether the consumer is durable
     * @param transportCnx connection to the client
     * @param str3 application id
     * @param map consumer metadata; {@code null} is replaced by an empty map
     * @param z2 whether the consumer reads compacted data
     * @param keySharedMeta key-shared metadata, stored as given
     * @param messageId start message id; when {@code z2} is set and this is {@code null},
     *        {@link MessageId#earliest} is used instead
     * @param j2 consumer epoch
     * @param schemaType schema type, stored as given (may be {@code null})
     */
    public Consumer(Subscription subscription, CommandSubscribe.SubType subType, String str, long j, int i, String str2, boolean z, TransportCnx transportCnx, String str3, Map<String, String> map, boolean z2, KeySharedMeta keySharedMeta, MessageId messageId, long j2, SchemaType schemaType) {
        // Mutable flow-control state starts from a clean slate.
        this.messagePermits = 0;
        this.permitsReceivedWhileConsumerBlocked = 0;
        this.unackedMessages = 0;
        this.blockedConsumerOnUnackedMsgs = false;
        this.avgMessagesPerEntry = new AtomicDouble(0.0d);
        this.connectedSince = Instant.now();

        // Identity and wiring.
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = str;
        this.partitionIdx = TopicName.getPartitionIndex(str);
        this.consumerId = j;
        this.priorityLevel = i;
        this.readCompacted = z2;
        this.consumerName = str2;
        this.isDurable = z;
        this.isPersistentTopic = subscription.getTopic() instanceof PersistentTopic;
        this.keySharedMeta = keySharedMeta;
        this.cnx = transportCnx;
        this.appId = str3;

        // Rates and counters.
        this.msgOut = new Rate();
        this.chunkedMessageRate = new Rate();
        this.msgRedeliver = new Rate();
        this.msgRedeliverCounter = new LongAdder();
        this.bytesOutCounter = new LongAdder();
        this.msgOutCounter = new LongAdder();
        this.messageAckCounter = new LongAdder();
        this.messageAckRate = new Rate();

        // A compacted reader with no explicit start position begins at the earliest message.
        if (z2 && messageId == null) {
            this.startMessageId = MessageId.earliest;
        } else {
            this.startMessageId = messageId;
        }
        this.preciseDispatcherFlowControl = transportCnx.isPreciseDispatcherFlowControl();
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);

        if (map == null) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = map;
        }

        // Populate the stats object before publishing it in the field.
        ConsumerStatsImpl consumerStats = new ConsumerStatsImpl();
        this.stats = consumerStats;
        consumerStats.setAddress(transportCnx.clientSourceAddressAndPort());
        consumerStats.consumerName = str2;
        consumerStats.setConnectedSince(DateFormatter.format(this.connectedSince));
        consumerStats.setClientVersion(transportCnx.getClientVersion());
        consumerStats.metadata = this.metadata;

        // Pending acks are only tracked for individual-ack subscription types.
        if (!Subscription.isIndividualAckMode(subType)) {
            this.pendingAcks = null;
        } else {
            this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
                    .autoShrink(subscription.getTopic().getBrokerService().getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap())
                    .expectedItems(256)
                    .concurrencyLevel(1)
                    .build();
        }

        this.clientAddress = transportCnx.clientSourceAddress();
        this.consumerEpoch = j2;
        this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService().getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
        this.schemaType = schemaType;
        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
    }

    /**
     * Test-only constructor: builds a consumer that has only a name and an initial number of message
     * permits. Every collaborator (subscription, connection, stats, rates, ...) is left {@code null}.
     *
     * @param str consumer name
     * @param i initial message permits
     */
    @VisibleForTesting
    Consumer(String str, int i) {
        // Flow-control state.
        this.messagePermits = 0;
        this.permitsReceivedWhileConsumerBlocked = 0;
        this.unackedMessages = 0;
        this.blockedConsumerOnUnackedMsgs = false;
        this.avgMessagesPerEntry = new AtomicDouble(0.0d);
        this.connectedSince = Instant.now();

        // The only meaningful identity value supplied by tests.
        this.consumerName = str;

        // Collaborators and identity are intentionally absent.
        this.subscription = null;
        this.subType = null;
        this.cnx = null;
        this.appId = null;
        this.topicName = null;
        this.metadata = null;
        this.keySharedMeta = null;
        this.clientAddress = null;
        this.startMessageId = null;
        this.schemaType = null;
        this.pendingAcks = null;
        this.stats = null;

        // Numeric and boolean defaults.
        this.partitionIdx = 0;
        this.consumerId = 0L;
        this.priorityLevel = 0;
        this.readCompacted = false;
        this.isDurable = false;
        this.isPersistentTopic = false;
        this.isAcknowledgmentAtBatchIndexLevelEnabled = false;

        // No statistics are collected.
        this.msgOut = null;
        this.msgRedeliver = null;
        this.msgRedeliverCounter = null;
        this.msgOutCounter = null;
        this.bytesOutCounter = null;
        this.messageAckCounter = null;
        this.messageAckRate = null;

        MESSAGE_PERMITS_UPDATER.set(this, i);
        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
    }

    /** Returns the subscription type this consumer was created with. */
    public CommandSubscribe.SubType subType() {
        return subType;
    }

    /** Returns the id of this consumer. */
    public long consumerId() {
        return consumerId;
    }

    /** Returns the name of this consumer. */
    public String consumerName() {
        return consumerName;
    }

    /**
     * Sends an active-consumer-change command to the client of this consumer.
     *
     * <p>The flag sent with the command is {@code true} only when {@code consumer} is this very
     * instance (identity comparison).
     *
     * @param consumer the consumer that is now active
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyActiveConsumerChange(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}",
                    consumerId, topicName, subscription.getName(), consumer);
        }
        boolean thisIsActive = consumer == this;
        cnx.getCommandSender().sendActiveConsumerChange(consumerId, thisIsActive);
    }

    /** Returns whether this consumer was created with the read-compacted flag set. */
    public boolean readCompacted() {
        return readCompacted;
    }

    /**
     * Sends entries to the client, passing {@code -1} as the trailing {@code long} argument of the
     * 8-argument overload.
     *
     * @return the future returned by the delegated overload
     */
    public Future<Void> sendMessages(List<? extends Entry> list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, int i, long j, long j2, RedeliveryTracker redeliveryTracker) {
        final long defaultLastArgument = -1L;
        return sendMessages(list, entryBatchSizes, entryBatchIndexesAcks, i, j, j2, redeliveryTracker,
                defaultLastArgument);
    }

    /**
     * Sends entries to the client without precomputed sticky-key hashes: delegates to the 9-argument
     * overload with a {@code null} hash list, so hashes are derived per entry there.
     *
     * @return the future returned by the delegated overload
     */
    public Future<Void> sendMessages(List<? extends Entry> list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, int i, long j, long j2, RedeliveryTracker redeliveryTracker, long j3) {
        final List<Integer> noStickyKeyHashes = null;
        return sendMessages(list, noStickyKeyHashes, entryBatchSizes, entryBatchIndexesAcks, i, j, j2,
                redeliveryTracker, j3);
    }

    /**
     * Dispatches a batch of entries to the client and updates the consumer's bookkeeping.
     *
     * <p>Before the write is issued this method records every non-null entry in {@code pendingAcks}
     * (when that map exists), refreshes the running messages-per-entry average, subtracts the sent
     * messages from the message permits and adds the not-yet-acked messages to the unacked counter.
     * The out-rate counters are only updated once the write future completes successfully.
     *
     * @param list entries to send; {@code null} elements are skipped
     * @param list2 per-entry sticky key hashes, or {@code null} to compute them from each entry
     * @param entryBatchSizes batch size of each entry, indexed like {@code list}
     * @param entryBatchIndexesAcks per-entry ack sets, or {@code null} when there are none
     * @param i total number of messages in the batch
     * @param j total number of bytes in the batch
     * @param j2 number of chunked messages, fed to the chunked-message rate
     * @param redeliveryTracker forwarded unchanged to the command sender
     * @param j3 forwarded unchanged to the command sender (presumably the consumer epoch - confirm)
     * @return the write future; already completed successfully when there is nothing to send
     */
    public Future<Void> sendMessages(List<? extends Entry> list, List<Integer> list2, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, int i, long j, long j2, RedeliveryTracker redeliveryTracker, long j3) {
        this.lastConsumedTimestamp = System.currentTimeMillis();

        // Nothing to send: release the recyclable holders and hand back a completed promise.
        if (list.isEmpty() || i == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId)});
            }
            entryBatchSizes.recyle();
            if (entryBatchIndexesAcks != null) {
                entryBatchIndexesAcks.recycle();
            }
            Promise<Void> completedPromise = this.cnx.newPromise();
            // Promise<Void> only accepts a Void value; an untyped null is the correct argument.
            completedPromise.setSuccess(null);
            return completedPromise;
        }

        int unackedMessagesToAdd = i;
        int nonNullEntries = 0;
        for (int index = 0; index < list.size(); index++) {
            Entry entry = list.get(index);
            if (entry == null) {
                continue;
            }
            nonNullEntries++;
            if (this.pendingAcks == null) {
                continue;
            }
            int batchSize = entryBatchSizes.getBatchSize(index);
            int stickyKeyHash = list2 == null ? getStickyKeyHash(entry) : list2.get(index).intValue();
            long[] ackSet = entryBatchIndexesAcks == null ? null : entryBatchIndexesAcks.getAckSet(index);
            if (ackSet != null) {
                // Batch indexes whose bit is cleared in the ack set do not count as unacked.
                unackedMessagesToAdd -= batchSize - BitSet.valueOf(ackSet).cardinality();
            }
            this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in broker.service.Consumer for consumerId: {}", new Object[]{this.topicName, this.subscription, Long.valueOf(entry.getLedgerId()), Long.valueOf(entry.getEntryId()), Integer.valueOf(batchSize), Long.valueOf(this.consumerId)});
            }
        }

        // A value below 1 means the average has not been initialised yet; otherwise blend the new
        // sample into the previous average using avgPercent as the weight of the old value.
        if (this.avgMessagesPerEntry.get() < 1.0d) {
            this.avgMessagesPerEntry.set((1.0d * i) / nonNullEntries);
        } else {
            this.avgMessagesPerEntry.set((this.avgMessagesPerEntry.get() * avgPercent) + (((1 - avgPercent) * i) / nonNullEntries));
        }

        int totalAckedIndexCount = entryBatchIndexesAcks == null ? 0 : entryBatchIndexesAcks.getTotalAckedIndexCount();
        MESSAGE_PERMITS_UPDATER.addAndGet(this, totalAckedIndexCount - i);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer for consumerId: {}; avgMessagesPerEntry is {}", new Object[]{this.topicName, this.subscription, Integer.valueOf(totalAckedIndexCount), Integer.valueOf(i), Long.valueOf(this.consumerId), Double.valueOf(this.avgMessagesPerEntry.get())});
        }
        incrementUnackedMessages(unackedMessagesToAdd);

        Future<Void> writeFuture = this.cnx.getCommandSender().mo283sendMessagesToConsumer(this.consumerId, this.topicName, this.subscription, this.partitionIdx, list, entryBatchSizes, entryBatchIndexesAcks, redeliveryTracker, j3);
        writeFuture.addListener(future -> {
            // Out-rate statistics are only recorded after a successful write.
            if (future.isSuccess()) {
                this.msgOut.recordMultipleEvents(i, j);
                this.msgOutCounter.add(i);
                this.bytesOutCounter.add(j);
                this.chunkedMessageRate.recordMultipleEvents(j2, 0L);
                return;
            }
            if (log.isDebugEnabled()) {
                String failureMessage = future.cause() == null ? "" : future.cause().getMessage();
                log.debug("[{}-{}] Sent messages to client fail by IO exception[{}], close the connection immediately. Consumer: {}", new Object[]{this.topicName, this.subscription, failureMessage, toString()});
            }
        });
        return writeFuture;
    }

    /**
     * Adds {@code i} to the unacked-message counter for individual-ack subscription types and marks
     * the consumer as blocked when the new total reaches a positive maximum.
     *
     * <p>For other subscription types this is a no-op.
     *
     * @param i number of unacked messages to add
     */
    private void incrementUnackedMessages(int i) {
        boolean limitReached = Subscription.isIndividualAckMode(subType)
                && addAndGetUnAckedMsgs(this, i) >= getMaxUnackedMessages()
                && getMaxUnackedMessages() > 0;
        if (limitReached) {
            blockedConsumerOnUnackedMsgs = true;
        }
    }

    /** Returns whether the underlying connection reports itself as writable. */
    public boolean isWritable() {
        return cnx.isWritable();
    }

    /**
     * Closes this consumer; same as {@code close(false)}.
     *
     * @throws BrokerServiceException propagated from {@link #close(boolean)}
     */
    public void close() throws BrokerServiceException {
        final boolean flag = false;
        close(flag);
    }

    /**
     * Removes this consumer from its subscription and then from its connection.
     *
     * @param z forwarded unchanged to {@code Subscription.removeConsumer}
     * @throws BrokerServiceException if removing the consumer from the subscription fails
     */
    public void close(boolean z) throws BrokerServiceException {
        // The subscription goes first; if it throws, the connection is not touched.
        subscription.removeConsumer(this, z);
        cnx.removedConsumer(this);
    }

    /** Disconnects this consumer; same as {@code disconnect(false)}. */
    public void disconnect() {
        final boolean flag = false;
        disconnect(flag);
    }

    /**
     * Disconnects this consumer without broker lookup data.
     *
     * @param z forwarded unchanged to the three-step disconnect
     */
    public void disconnect(boolean z) {
        Optional<BrokerLookupData> noLookupData = Optional.empty();
        disconnect(z, noLookupData);
    }

    /**
     * Asks the connection to close this consumer and then closes it locally.
     *
     * <p>A {@link BrokerServiceException} from the local close is logged at warn level and not
     * rethrown.
     *
     * @param z forwarded unchanged to {@link #close(boolean)}
     * @param optional lookup data handed to the connection when closing the consumer
     */
    public void disconnect(boolean z, Optional<BrokerLookupData> optional) {
        log.info("Disconnecting consumer: {}", this);
        cnx.closeConsumer(this, optional);
        try {
            close(z);
        } catch (BrokerServiceException alreadyClosed) {
            log.warn("Consumer {} was already closed: {}", this, alreadyClosed.getMessage(), alreadyClosed);
        }
    }

    /**
     * Unsubscribes this consumer and reports the outcome to the client.
     *
     * <p>On success the consumer is removed from its connection and a success response is sent for
     * request {@code j}. On failure an error response is sent whose message is taken from the cause
     * of the failure, or from the failure itself when it carries no cause.
     *
     * @param j request id used for the response
     * @param z forwarded unchanged to {@code Subscription.doUnsubscribe}
     */
    public void doUnsubscribe(long j, boolean z) {
        this.subscription.doUnsubscribe(this, z).thenAccept(ignored -> {
            log.info("Unsubscribed successfully from {}", this.subscription);
            this.cnx.removedConsumer(this);
            this.cnx.getCommandSender().sendSuccessResponse(j);
        }).exceptionally(th -> {
            log.warn("Unsubscribe failed for {}", this.subscription, th);
            // The throwable is normally a wrapper around the real failure, but guard against a
            // missing cause so the error response is still sent instead of failing with an NPE.
            Throwable rootFailure = th.getCause() != null ? th.getCause() : th;
            this.cnx.getCommandSender().sendErrorResponse(j, BrokerServiceException.getClientErrorCode(th), rootFailure.getMessage());
            return null;
        });
    }

    /**
     * Handles an acknowledgement command from the client.
     *
     * <p>Cumulative acks must carry exactly one message id and are ignored for individual-ack
     * subscription types; in both rejected cases a warning is logged and a completed future is
     * returned. Individual acks are routed to the transactional or non-transactional handler
     * depending on whether the command carries both transaction id halves. When the ack completes,
     * the number of acked messages is added to the ack rate and ack counter.
     *
     * @param commandAck the ack command received from the client
     * @return a future that completes once the ack has been processed and the statistics updated
     */
    public CompletableFuture<Void> messageAcked(CommandAck commandAck) {
        this.lastAckedTimestamp = System.currentTimeMillis();

        Map<String, Long> properties = Collections.emptyMap();
        if (commandAck.getPropertiesCount() > 0) {
            properties = commandAck.getPropertiesList().stream()
                    .collect(Collectors.toMap(property -> property.getKey(), property -> property.getValue()));
        }

        boolean hasTxnId = commandAck.hasTxnidLeastBits() && commandAck.hasTxnidMostBits();
        CompletableFuture<Long> ackedCountFuture;
        if (commandAck.getAckType() == CommandAck.AckType.Cumulative) {
            if (commandAck.getMessageIdsCount() != 1) {
                log.warn("[{}] [{}] Received multi-message ack", this.subscription, Long.valueOf(this.consumerId));
                return CompletableFuture.completedFuture(null);
            }
            if (Subscription.isIndividualAckMode(this.subType)) {
                log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", this.subscription, Long.valueOf(this.consumerId));
                return CompletableFuture.completedFuture(null);
            }
            MessageIdData messageId = commandAck.getMessageIdAt(0);
            PositionImpl position;
            if (messageId.getAckSetsCount() > 0) {
                long[] ackSets = new long[messageId.getAckSetsCount()];
                for (int i = 0; i < messageId.getAckSetsCount(); i++) {
                    ackSets[i] = messageId.getAckSetAt(i);
                }
                position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSets);
            } else {
                position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
            }
            if (hasTxnId) {
                // A cumulative ack is counted as a single acked message.
                ackedCountFuture = transactionCumulativeAcknowledge(commandAck.getTxnidMostBits(), commandAck.getTxnidLeastBits(), Collections.singletonList(position))
                        .thenApply(ignored -> 1L);
            } else {
                this.subscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Cumulative, properties);
                ackedCountFuture = CompletableFuture.completedFuture(1L);
            }
        } else if (hasTxnId) {
            ackedCountFuture = individualAckWithTransaction(commandAck);
        } else {
            ackedCountFuture = individualAckNormal(commandAck, properties);
        }

        return ackedCountFuture.thenApply(ackedCount -> {
            this.messageAckRate.recordEvent(ackedCount.longValue());
            this.messageAckCounter.add(ackedCount.longValue());
            return null;
        });
    }

    /**
     * Processes a non-transactional individual ack.
     *
     * <p>For every message id in the command this resolves the consumer that owns the pending ack,
     * computes how many messages the ack covers, lowers that owner's unacked counter and collects the
     * position. All positions are then acknowledged on the subscription in one call. When
     * transactions are enabled on an individual-ack subscription, batch positions that the
     * subscription reports as deletable are afterwards removed from the owner's pending acks.
     *
     * @param commandAck the ack command
     * @param map ack properties forwarded to the subscription
     * @return an already-completed future holding the total number of acked messages
     */
    private CompletableFuture<Long> individualAckNormal(CommandAck commandAck, Map<String, Long> map) {
        List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
        long totalAckCount = 0;
        for (int i = 0; i < commandAck.getMessageIdsCount(); i++) {
            MessageIdData messageId = commandAck.getMessageIdAt(i);
            Pair<Consumer, Long> ownerAndBatchSize = getAckOwnerConsumerAndBatchSize(messageId.getLedgerId(), messageId.getEntryId());
            Consumer ackOwner = ownerAndBatchSize.getLeft();
            long batchSize = ownerAndBatchSize.getRight().longValue();

            PositionImpl position;
            long ackedCount;
            if (messageId.getAckSetsCount() > 0) {
                long[] ackSets = new long[messageId.getAckSetsCount()];
                for (int k = 0; k < messageId.getAckSetsCount(); k++) {
                    ackSets[k] = messageId.getAckSetAt(k);
                }
                position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSets);
                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwner);
                if (isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
                    ((PersistentSubscription) this.subscription).syncBatchPositionBitSetForPendingAck(position);
                }
                addAndGetUnAckedMsgs(ackOwner, -((int) ackedCount));
            } else {
                position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwner);
                // Only lower the unacked counter when the pending ack was actually removed.
                if (checkCanRemovePendingAcksAndHandle(ackOwner, position, messageId)) {
                    addAndGetUnAckedMsgs(ackOwner, -((int) ackedCount));
                }
            }
            positionsAcked.add(Pair.of(ackOwner, position));
            checkAckValidationError(commandAck, position);
            totalAckCount += ackedCount;
        }

        List<Position> positions = positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList());
        this.subscription.acknowledgeMessage(positions, CommandAck.AckType.Individual, map);

        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        completableFuture.complete(Long.valueOf(totalAckCount));
        if (isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
            completableFuture.whenComplete((ackedTotal, failure) -> positionsAcked.forEach(pair -> {
                Consumer ackOwner = pair.getLeft();
                // Every position stored above is a PositionImpl, so the cast is safe.
                PositionImpl position = (PositionImpl) pair.getRight();
                if (position.getAckSet() != null
                        && ((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck(position)) {
                    removePendingAcks(ackOwner, position);
                }
            }));
        }
        return completableFuture;
    }

    /**
     * Processes an individual ack that belongs to a transaction.
     *
     * <p>Fails immediately when transactions are not enabled. Otherwise, for every message id the
     * owning consumer's unacked counter is lowered by the number of messages the ack covers, and the
     * position is collected together with its batch size (0 when the message id carries none). The
     * collected positions are then acknowledged within the transaction. For individual-ack
     * subscription types, once that acknowledgement completes (successfully or not), batch positions
     * the subscription reports as deletable are removed from the owner's pending acks.
     *
     * @param commandAck the ack command carrying the transaction id
     * @return a future completing with the total number of acked messages
     */
    private CompletableFuture<Long> individualAckWithTransaction(CommandAck commandAck) {
        if (!isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }

        List<Pair<Consumer, MutablePair<PositionImpl, Integer>>> positionsAcked = new ArrayList<>();
        LongAdder totalAckCount = new LongAdder();
        for (int i = 0; i < commandAck.getMessageIdsCount(); i++) {
            MessageIdData messageId = commandAck.getMessageIdAt(i);
            PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
            Consumer ackOwner = getAckOwnerConsumerAndBatchSize(messageId.getLedgerId(), messageId.getEntryId()).getLeft();

            long batchSize;
            long ackedCount;
            if (messageId.hasBatchSize()) {
                batchSize = messageId.getBatchSize();
                ackedCount = messageId.getBatchSize();
            } else {
                // Without a batch size the ack counts as a single message.
                batchSize = 0;
                ackedCount = 1;
            }
            MutablePair<PositionImpl, Integer> positionAndBatchSize = new MutablePair<>(position, (int) batchSize);
            positionsAcked.add(Pair.of(ackOwner, positionAndBatchSize));

            if (messageId.getAckSetsCount() > 0) {
                long[] ackSets = new long[messageId.getAckSetsCount()];
                for (int k = 0; k < messageId.getAckSetsCount(); k++) {
                    ackSets[k] = messageId.getAckSetAt(k);
                }
                position.setAckSet(ackSets);
                ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
            }
            addAndGetUnAckedMsgs(ackOwner, -((int) ackedCount));
            checkCanRemovePendingAcksAndHandle(ackOwner, position, messageId);
            checkAckValidationError(commandAck, position);
            totalAckCount.add(ackedCount);
        }

        List<MutablePair<PositionImpl, Integer>> positions = positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList());
        CompletableFuture<Void> ackFuture = transactionIndividualAcknowledge(commandAck.getTxnidMostBits(), commandAck.getTxnidLeastBits(), positions);
        if (Subscription.isIndividualAckMode(this.subType)) {
            ackFuture.whenComplete((ignored, failure) -> positionsAcked.forEach(pair -> {
                Consumer ackOwner = pair.getLeft();
                PositionImpl position = pair.getRight().getLeft();
                if (position.getAckSet() != null
                        && ((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck(position)) {
                    removePendingAcks(ackOwner, position);
                }
            }));
        }
        return ackFuture.thenApply(ignored -> Long.valueOf(totalAckCount.sum()));
    }

    /**
     * Computes how many messages an ack without ack-set words covers.
     *
     * <p>When batch-index-level acknowledgement is enabled, the subscription type is an
     * individual-ack mode and the cursor already holds an ack set for the position, the count is
     * derived from that ack set (using an empty incoming ack set). Otherwise the whole batch size is
     * returned.
     *
     * @param j batch size of the entry
     * @param positionImpl position being acked
     * @param consumer consumer that owns the pending ack
     * @return number of messages covered by the ack
     */
    private long getAckedCountForMsgIdNoAckSets(long j, PositionImpl positionImpl, Consumer consumer) {
        boolean partiallyAckedBatch = isAcknowledgmentAtBatchIndexLevelEnabled
                && Subscription.isIndividualAckMode(subType)
                && getCursorAckSet(positionImpl) != null;
        if (partiallyAckedBatch) {
            return getAckedCountForBatchIndexLevelEnabled(positionImpl, j, EMPTY_ACK_SET, consumer);
        }
        return j;
    }

    /**
     * Computes how many batch indexes are newly acked by an incoming ack set.
     *
     * <p>Returns 0 unless batch-index-level acknowledgement is enabled, the subscription type is an
     * individual-ack mode and {@code consumer} still has a pending ack for the position. If the
     * cursor already holds an ack set for the position, the result is the number of bits that the
     * incoming ack set clears from it; otherwise it is the batch size minus the number of bits still
     * set in the incoming ack set.
     *
     * @param positionImpl position being acked
     * @param j batch size of the entry
     * @param jArr incoming ack-set words
     * @param consumer consumer that owns the pending ack
     * @return number of newly acked messages
     */
    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl positionImpl, long j, long[] jArr, Consumer consumer) {
        if (!isAcknowledgmentAtBatchIndexLevelEnabled || !Subscription.isIndividualAckMode(subType)) {
            return 0;
        }
        if (consumer.getPendingAcks().get(positionImpl.getLedgerId(), positionImpl.getEntryId()) == null) {
            return 0;
        }

        long[] cursorAckSet = getCursorAckSet(positionImpl);
        if (cursorAckSet == null) {
            return j - BitSet.valueOf(jArr).cardinality();
        }

        BitSetRecyclable remaining = BitSetRecyclable.create().resetWords(cursorAckSet);
        int unackedBefore = remaining.cardinality();
        BitSetRecyclable incoming = BitSetRecyclable.create().resetWords(jArr);
        remaining.and(incoming);
        incoming.recycle();
        long newlyAcked = unackedBefore - remaining.cardinality();
        remaining.recycle();
        return newlyAcked;
    }

    /**
     * Computes how many messages of a batch a transactional ack covers: the batch size minus the
     * number of bits still set in the ack set.
     *
     * @param j batch size of the entry
     * @param jArr ack-set words sent with the ack
     * @return number of acked messages
     */
    private long getAckedCountForTransactionAck(long j, long[] jArr) {
        BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(jArr);
        // Read the cardinality before recycling; the instance must not be used afterwards.
        long ackedCount = j - ackSet.cardinality();
        ackSet.recycle();
        return ackedCount;
    }

    /**
     * Returns the number of still-unacked messages at a position.
     *
     * <p>When batch-index-level acknowledgement is enabled and the cursor holds an ack set for the
     * position, this is the number of bits set in that ack set; otherwise it is the batch size.
     *
     * @param positionImpl position to inspect
     * @param j batch size of the entry
     * @return number of unacked messages
     */
    private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl positionImpl, long j) {
        if (!isAcknowledgmentAtBatchIndexLevelEnabled) {
            return j;
        }
        long[] cursorAckSet = getCursorAckSet(positionImpl);
        if (cursorAckSet == null) {
            return j;
        }
        BitSetRecyclable unackedIndexes = BitSetRecyclable.create().resetWords(cursorAckSet);
        long unackedCount = unackedIndexes.cardinality();
        unackedIndexes.recycle();
        return unackedCount;
    }

    /**
     * Logs an error when the ack command reports a validation error for the given position.
     *
     * @param commandAck the ack command
     * @param positionImpl position the ack refers to
     */
    private void checkAckValidationError(CommandAck commandAck, PositionImpl positionImpl) {
        if (!commandAck.hasValidationError()) {
            return;
        }
        log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}",
                subscription, consumerId, positionImpl, commandAck.getValidationError());
    }

    /**
     * Removes the pending ack for a position when the subscription type is an individual-ack mode
     * and the message id carries no ack-set words.
     *
     * @param consumer consumer that owns the pending ack
     * @param positionImpl position being acked
     * @param messageIdData message id from the ack command
     * @return the result of {@code removePendingAcks}, or {@code false} when nothing was attempted
     */
    private boolean checkCanRemovePendingAcksAndHandle(Consumer consumer, PositionImpl positionImpl, MessageIdData messageIdData) {
        boolean wholeEntryAck = Subscription.isIndividualAckMode(subType) && messageIdData.getAckSetsCount() == 0;
        if (!wholeEntryAck) {
            return false;
        }
        return removePendingAcks(consumer, positionImpl);
    }

    /**
     * Finds the consumer whose pending acks contain the given entry, together with the batch size
     * recorded for it.
     *
     * <p>For individual-ack subscription types this consumer is checked first, then every other
     * consumer of the subscription. If no pending ack is found, or the subscription type is not an
     * individual-ack mode, this consumer is returned with a batch size of 1.
     *
     * @param j ledger id
     * @param j2 entry id
     * @return owning consumer and recorded batch size
     */
    private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long j, long j2) {
        if (!Subscription.isIndividualAckMode(subType)) {
            return Pair.of(this, 1L);
        }

        ConcurrentLongLongPairHashMap.LongPair ownPendingAck = getPendingAcks().get(j, j2);
        if (ownPendingAck != null) {
            return Pair.of(this, Long.valueOf(ownPendingAck.first));
        }

        for (Consumer other : subscription.getConsumers()) {
            if (other == this) {
                continue;
            }
            ConcurrentLongLongPairHashMap.LongPair otherPendingAck = other.getPendingAcks().get(j, j2);
            if (otherPendingAck != null) {
                return Pair.of(other, Long.valueOf(otherPendingAck.first));
            }
        }
        return Pair.of(this, 1L);
    }

    /**
     * Returns the cursor's deleted-batch-indexes array for a position, or {@code null} when the
     * subscription is not a {@link PersistentSubscription}.
     *
     * @param positionImpl position to look up
     * @return whatever the cursor returns for the position, or {@code null}
     */
    private long[] getCursorAckSet(PositionImpl positionImpl) {
        if (!(subscription instanceof PersistentSubscription)) {
            return null;
        }
        PersistentSubscription persistentSubscription = (PersistentSubscription) subscription;
        return persistentSubscription.getCursor().getDeletedBatchIndexesAsLongArray(positionImpl);
    }

    /**
     * Returns whether the subscription is persistent and the broker configuration has the
     * transaction coordinator enabled.
     */
    private boolean isTransactionEnabled() {
        if (!(subscription instanceof PersistentSubscription)) {
            return false;
        }
        PersistentTopic persistentTopic = (PersistentTopic) subscription.getTopic();
        return persistentTopic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
    }

    /**
     * Acknowledges positions individually within a transaction.
     *
     * @param j most-significant bits of the transaction id
     * @param j2 least-significant bits of the transaction id
     * @param list positions paired with their batch sizes
     * @return the subscription's acknowledge future, or a failed future when the subscription is not
     *         a {@link PersistentSubscription}
     */
    private CompletableFuture<Void> transactionIndividualAcknowledge(long j, long j2, List<MutablePair<PositionImpl, Integer>> list) {
        if (!(subscription instanceof PersistentSubscription)) {
            String unsupported = "Transaction acknowledge only support the `PersistentSubscription`.";
            log.error(unsupported);
            return FutureUtil.failedFuture(new TransactionConflictException(unsupported));
        }
        TxnID txnId = new TxnID(j, j2);
        return ((PersistentSubscription) subscription).transactionIndividualAcknowledge(txnId, list);
    }

    /**
     * Acknowledges positions cumulatively within a transaction.
     *
     * @param j most-significant bits of the transaction id
     * @param j2 least-significant bits of the transaction id
     * @param list positions to acknowledge
     * @return the subscription's acknowledge future, or a failed future when transactions are not
     *         enabled or the subscription is not a {@link PersistentSubscription}
     */
    private CompletableFuture<Void> transactionCumulativeAcknowledge(long j, long j2, List<PositionImpl> list) {
        if (!isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        if (!(subscription instanceof PersistentSubscription)) {
            String unsupported = "Transaction acknowledge only support the `PersistentSubscription`.";
            log.error(unsupported);
            return FutureUtil.failedFuture(new TransactionConflictException(unsupported));
        }
        TxnID txnId = new TxnID(j, j2);
        return ((PersistentSubscription) subscription).transactionCumulativeAcknowledge(txnId, list);
    }

    /**
     * Registers additional flow-control permits sent for this consumer.
     *
     * <p>The flow timestamp is refreshed first. If the consumer is (or just became) blocked on
     * unacknowledged messages, the permits are parked in the "received while blocked" counter;
     * otherwise they are added to the message permits and forwarded to the subscription.
     *
     * @param i number of permits to add; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void flowPermits(int i) {
        Preconditions.checkArgument(i > 0);
        lastConsumedFlowTimestamp = System.currentTimeMillis();

        // Mark the consumer as blocked once its unacked count reaches the configured limit.
        if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
            blockedConsumerOnUnackedMsgs = true;
        }

        final int previousPermits;
        if (!blockedConsumerOnUnackedMsgs) {
            previousPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, i);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher for consumer {}",
                        topicName, subscription, i, consumerId);
            }
            subscription.consumerFlow(this, i);
        } else {
            // Blocked: remember the permits so they can be released later.
            previousPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, i);
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ",
                    topicName, subscription, i, previousPermits, blockedConsumerOnUnackedMsgs);
        }
    }

    /**
     * Releases the permits that were parked while {@code consumer} was blocked.
     *
     * <p>The parked counter is atomically reset to zero, its previous value is added to the
     * consumer's message permits, and the same amount is forwarded to the subscription via
     * {@code consumerFlow} (this also happens when the amount is zero).
     *
     * @param consumer consumer whose parked permits are released; it may be a different instance
     *                 than {@code this}
     */
    void flowConsumerBlockedPermits(Consumer consumer) {
        int releasedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, releasedPermits);
        if (log.isDebugEnabled()) {
            // Report the id of the consumer that actually received the permits, not this one.
            log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer {}",
                    topicName, subscription, releasedPermits, consumer.consumerId);
        }
        subscription.consumerFlow(consumer, releasedPermits);
    }

    /**
     * @return the current message-permit count, read through {@code MESSAGE_PERMITS_UPDATER}
     */
    public int getAvailablePermits() {
        final int permits = MESSAGE_PERMITS_UPDATER.get(this);
        return permits;
    }

    /**
     * @return the tracked average number of messages per entry, rounded with
     *         {@link Math#round} and narrowed to {@code int}
     */
    public int getAvgMessagesPerEntry() {
        long rounded = Math.round(avgMessagesPerEntry.get());
        return (int) rounded;
    }

    /**
     * @return the current value of the {@code blockedConsumerOnUnackedMsgs} flag
     */
    public boolean isBlocked() {
        return blockedConsumerOnUnackedMsgs;
    }

    /**
     * Sends the "reached end of topic" command for this consumer id over its connection.
     */
    public void reachedEndOfTopic() {
        cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
    }

    /**
     * Notifies the client that the topic has migrated and then disconnects this consumer.
     *
     * @param optional target cluster URLs; when empty, nothing is sent and the consumer stays
     *                 connected
     */
    public void topicMigrated(Optional<ClusterPolicies.ClusterUrl> optional) {
        optional.ifPresent(clusterUrl -> {
            cnx.getCommandSender().sendTopicMigrated(
                    CommandTopicMigrated.ResourceType.Consumer,
                    consumerId,
                    clusterUrl.getBrokerServiceUrl(),
                    clusterUrl.getBrokerServiceUrlTls());
            // Disconnect only after the migrated-cluster URLs were sent.
            disconnect();
        });
    }

    /**
     * If the subscription is migrated and a migrated cluster URL is available, sends the
     * topic-migrated command to the client and disconnects this consumer.
     *
     * @return {@code true} when the migration command was sent and the consumer disconnected,
     *         {@code false} otherwise
     */
    public boolean checkAndApplyTopicMigration() {
        if (subscription.isSubscriptionMigrated()) {
            Optional<ClusterPolicies.ClusterUrl> target =
                    AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar(), topicName);
            if (target.isPresent()) {
                ClusterPolicies.ClusterUrl url = target.get();
                cnx.getCommandSender().sendTopicMigrated(
                        CommandTopicMigrated.ResourceType.Consumer,
                        consumerId,
                        url.getBrokerServiceUrl(),
                        url.getBrokerServiceUrlTls());
                // Disconnect only after the migrated-cluster URLs were sent.
                disconnect();
                return true;
            }
        }
        return false;
    }

    /**
     * @return {@code true} when the subscription type is an individual-ack mode and the
     *         max-unacked-messages limit is greater than zero
     */
    private boolean shouldBlockConsumerOnUnackMsgs() {
        if (!Subscription.isIndividualAckMode(subType)) {
            return false;
        }
        // A non-positive limit means there is nothing to block on.
        return getMaxUnackedMessages() > 0;
    }

    /**
     * Recalculates every rate meter of this consumer and copies the results into {@code stats}.
     */
    public void updateRates() {
        // Outgoing messages: event rate and value (throughput) rate.
        msgOut.calculateRate();
        stats.msgRateOut = msgOut.getRate();
        stats.msgThroughputOut = msgOut.getValueRate();

        // Chunked messages.
        chunkedMessageRate.calculateRate();
        stats.chunkedMessageRate = chunkedMessageRate.getRate();

        // Redeliveries.
        msgRedeliver.calculateRate();
        stats.msgRateRedeliver = msgRedeliver.getRate();

        // Acknowledgements: the value rate (not the event rate) is published.
        messageAckRate.calculateRate();
        stats.messageAckRate = messageAckRate.getValueRate();
    }

    /**
     * Applies the values of an external stats snapshot to this consumer.
     *
     * <p>Counters are incremented by the snapshot's values, while timestamps, permits, the
     * unacked count, the blocked flag and the average messages per entry are overwritten.
     *
     * @param consumerStatsImpl snapshot to apply
     */
    public void updateStats(ConsumerStatsImpl consumerStatsImpl) {
        // Cumulative counters and the outgoing rate meter are added to, not replaced.
        msgOutCounter.add(consumerStatsImpl.msgOutCounter);
        bytesOutCounter.add(consumerStatsImpl.bytesOutCounter);
        msgOut.recordMultipleEvents(consumerStatsImpl.msgOutCounter, consumerStatsImpl.bytesOutCounter);

        // Timestamps are taken over as-is.
        lastAckedTimestamp = consumerStatsImpl.lastAckedTimestamp;
        lastConsumedTimestamp = consumerStatsImpl.lastConsumedTimestamp;
        lastConsumedFlowTimestamp = consumerStatsImpl.lastConsumedFlowTimestamp;

        MESSAGE_PERMITS_UPDATER.set(this, consumerStatsImpl.availablePermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}",
                    topicName, subscription, consumerStatsImpl.availablePermits, consumerId);
        }

        unackedMessages = consumerStatsImpl.unackedMessages;
        blockedConsumerOnUnackedMsgs = consumerStatsImpl.blockedConsumerOnUnackedMsgs;
        avgMessagesPerEntry.set(consumerStatsImpl.avgMessagesPerEntry);
    }

    /**
     * Refreshes the shared {@code stats} object from this consumer's current state and returns it.
     *
     * <p>{@code readPositionWhenJoining} is only written when the consumer has one; otherwise the
     * previous value in {@code stats} is left untouched.
     *
     * @return the same {@code stats} instance on every call, updated in place
     */
    public ConsumerStatsImpl getStats() {
        // Counters.
        stats.msgOutCounter = msgOutCounter.longValue();
        stats.bytesOutCounter = bytesOutCounter.longValue();

        // Timestamps.
        stats.lastAckedTimestamp = lastAckedTimestamp;
        stats.lastConsumedTimestamp = lastConsumedTimestamp;
        stats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;

        // Flow-control state.
        stats.availablePermits = getAvailablePermits();
        stats.unackedMessages = unackedMessages;
        stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
        stats.avgMessagesPerEntry = getAvgMessagesPerEntry();

        if (readPositionWhenJoining != null) {
            stats.readPositionWhenJoining = readPositionWhenJoining.toString();
        }
        return stats;
    }

    /**
     * @return the current value of the outgoing-message counter
     */
    public long getMsgOutCounter() {
        return msgOutCounter.longValue();
    }

    /**
     * @return the current value of the outgoing-bytes counter
     */
    public long getBytesOutCounter() {
        return bytesOutCounter.longValue();
    }

    /**
     * @return the current sum of the message-acknowledgement counter
     */
    public long getMessageAckCounter() {
        return messageAckCounter.sum();
    }

    /**
     * @return the current sum of the message-redelivery counter
     */
    public long getMessageRedeliverCounter() {
        return msgRedeliverCounter.sum();
    }

    /**
     * @return the current value of the {@code unackedMessages} field
     */
    public int getUnackedMessages() {
        return unackedMessages;
    }

    /**
     * @return the {@link KeySharedMeta} held by this consumer
     */
    public KeySharedMeta getKeySharedMeta() {
        return keySharedMeta;
    }

    /**
     * Renders the consumer id and name, plus the subscription and connection address when both
     * the subscription and the connection are set.
     *
     * @return a string built with {@link MoreObjects#toStringHelper(Object)}
     */
    @Override
    public String toString() {
        final boolean attached = subscription != null && cnx != null;
        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
        if (attached) {
            helper.add("subscription", subscription);
        }
        helper.add("consumerId", consumerId);
        helper.add("consumerName", consumerName);
        if (attached) {
            helper.add("address", cnx.toString());
        }
        return helper.toString();
    }

    /**
     * Re-checks whether this consumer is still authorized to consume from its topic and
     * disconnects it when it is not.
     *
     * <p>When the broker has no authorization service, the check is skipped. An authorization
     * failure (exception or a non-{@code true} result) is logged and leads to a disconnect; the
     * returned future still completes normally in that case.
     *
     * @return a future that completes with {@code null} once the check has been handled
     */
    public CompletableFuture<Void> checkPermissionsAsync() {
        TopicName topic = TopicName.get(subscription.getTopicName());
        if (cnx.getBrokerService().getAuthorizationService() != null) {
            AuthenticationDataSubscription authData =
                    new AuthenticationDataSubscription(cnx.getAuthenticationData(), subscription.getName());
            return cnx.getBrokerService().getAuthorizationService()
                    .allowTopicOperationAsync(topic, TopicOperation.CONSUME, appId, authData)
                    .handle((allowed, error) -> {
                        if (error != null) {
                            log.warn("[{}] Get unexpected error while authorizing [{}]  {}",
                                    appId, subscription.getTopicName(), error.getMessage(), error);
                        }
                        boolean authorized = allowed != null && allowed.booleanValue();
                        if (!authorized) {
                            log.info("[{}] is not allowed to consume from topic [{}] anymore",
                                    appId, subscription.getTopicName());
                            disconnect();
                        }
                        return null;
                    });
        }
        // No authorization service configured: nothing to verify.
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Two consumers are equal when they have the same consumer id and their connections report
     * equal client addresses.
     *
     * @param obj object to compare with
     * @return {@code true} if {@code obj} is a {@code Consumer} matching on both criteria
     */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Consumer) {
            Consumer other = (Consumer) obj;
            if (consumerId != other.consumerId) {
                return false;
            }
            return Objects.equals(cnx.clientAddress(), other.cnx.clientAddress());
        }
        return false;
    }

    /**
     * Computes the hash from the same state that {@link #equals(Object)} compares: the consumer
     * id and the connection's client address.
     *
     * <p>Hashing {@code consumerName} and the connection object instead would let two consumers
     * that are equal (same id, same client address) end up with different hash codes, which
     * violates the {@link Object#hashCode()} contract and breaks hash-based collections.
     *
     * @return a hash code consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        return Objects.hash(consumerId, cnx.clientAddress());
    }

    /**
     * Removes a position from the pending acks of the consumer that owns it and, when
     * appropriate, unblocks that consumer.
     *
     * <p>The owner is unblocked (and its parked permits released) when either blocking on unacked
     * messages does not apply to this consumer, or the owner is currently blocked, subject to
     * blocking, and its unacked count is at most half of the max-unacked limit.
     *
     * @param consumer     consumer whose pending-ack map holds the position
     * @param positionImpl position being acknowledged
     * @return {@code false} if the position was not present in the pending-ack map, {@code true}
     *         otherwise
     */
    private boolean removePendingAcks(Consumer consumer, PositionImpl positionImpl) {
        boolean removed = consumer.getPendingAcks().remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
        if (!removed) {
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, positionImpl);
        }
        boolean ownerCanResume = UNACKED_MESSAGES_UPDATER.get(consumer) <= getMaxUnackedMessages() / 2
                && consumer.blockedConsumerOnUnackedMsgs
                && consumer.shouldBlockConsumerOnUnackMsgs();
        if (ownerCanResume || !shouldBlockConsumerOnUnackMsgs()) {
            consumer.blockedConsumerOnUnackedMsgs = false;
            flowConsumerBlockedPermits(consumer);
        }
        return true;
    }

    /**
     * @return the pending-ack map of this consumer; other methods in this class treat it as
     *         possibly {@code null}
     */
    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return pendingAcks;
    }

    /**
     * @return the priority level assigned to this consumer
     */
    public int getPriorityLevel() {
        return priorityLevel;
    }

    /**
     * Redelivers every message this consumer has not acknowledged yet.
     *
     * <p>The unacked counter is cleared and the consumer is unblocked. When a pending-ack map
     * exists, all of its positions are collected, removed from the map, counted into the
     * redelivery metrics and handed to the subscription; otherwise the subscription is asked to
     * redeliver using {@code j}. Finally, permits parked while the consumer was blocked are
     * released.
     *
     * @param j value forwarded to the subscription's redelivery call when there is no pending-ack
     *          map
     */
    public void redeliverUnacknowledgedMessages(long j) {
        clearUnAckedMsgs();
        blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
        }
        if (pendingAcks != null) {
            // Typed list (no raw type) so the subscription call is checked at compile time.
            List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size());
            MutableInt totalRedeliveryMessages = new MutableInt(0);
            pendingAcks.forEach((ledgerId, entryId, pairFirst, pairSecond) -> {
                totalRedeliveryMessages.add(
                        (int) getUnAckedCountForBatchIndexLevelEnabled(PositionImpl.get(ledgerId, entryId), pairFirst));
                pendingPositions.add(new PositionImpl(ledgerId, entryId));
            });
            // Remove after the traversal rather than while iterating over the map.
            for (PositionImpl position : pendingPositions) {
                pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            }
            int redelivered = totalRedeliveryMessages.intValue();
            msgRedeliver.recordMultipleEvents(redelivered, redelivered);
            msgRedeliverCounter.add(redelivered);
            subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        } else {
            subscription.redeliverUnacknowledgedMessages(this, j);
        }
        flowConsumerBlockedPermits(this);
    }

    /**
     * Redelivers the given messages, limited to those still present in this consumer's
     * pending-ack map.
     *
     * <p>Each matching position is removed from the pending acks and its unacked count is
     * subtracted from the unacked counter. The consumer is then unblocked, the positions are
     * handed to the subscription, the redelivery metrics are updated, and any permits parked
     * while the consumer was blocked are released.
     *
     * <p>NOTE(review): unlike the {@code long} overload, this method does not null-check
     * {@code pendingAcks} — presumably callers only use it when a pending-ack map exists; confirm.
     *
     * @param list ids of the messages the client asked to have redelivered
     */
    public void redeliverUnacknowledgedMessages(List<MessageIdData> list) {
        int totalRedeliveryMessages = 0;
        // Typed list (no raw type) so the subscription call is checked at compile time.
        List<PositionImpl> pendingPositions = new ArrayList<>();
        for (MessageIdData messageIdData : list) {
            PositionImpl position = PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair pendingEntry =
                    pendingAcks.get(position.getLedgerId(), position.getEntryId());
            if (pendingEntry != null) {
                int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingEntry.first);
                pendingAcks.remove(position.getLedgerId(), position.getEntryId());
                totalRedeliveryMessages += unAckedCount;
                pendingPositions.add(position);
            }
        }
        addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
        blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}",
                    topicName, subscription, consumerId, totalRedeliveryMessages, pendingPositions.size());
        }
        subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
        msgRedeliverCounter.add(totalRedeliveryMessages);

        // Hand permits parked while blocked back to the dispatcher, if there are any.
        int blockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
        if (blockedPermits > 0) {
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, blockedPermits);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}",
                        topicName, subscription, blockedPermits, consumerId);
            }
            subscription.consumerFlow(this, blockedPermits);
        }
    }

    /**
     * @return the subscription this consumer is attached to
     */
    public Subscription getSubscription() {
        return subscription;
    }

    /**
     * Adjusts the unacked-message count of {@code consumer} and of the subscription.
     *
     * <p>The adjustment only happens for a persistent topic with an individual-ack subscription
     * type; otherwise the result is {@code 0}. A negative result is logged as a warning, at most
     * once every 10 seconds.
     *
     * @param consumer consumer whose unacked counter is updated
     * @param i        delta to apply (negative to decrease)
     * @return the consumer's unacked count after the update, or {@code 0} if nothing was updated
     */
    private int addAndGetUnAckedMsgs(Consumer consumer, int i) {
        int unackedAfterUpdate = 0;
        boolean tracksUnacked = isPersistentTopic && Subscription.isIndividualAckMode(subType);
        if (tracksUnacked) {
            subscription.addUnAckedMessages(i);
            unackedAfterUpdate = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, i);
        }
        if (unackedAfterUpdate < 0) {
            long sinceLastWarning = System.currentTimeMillis() - negativeUnackedMsgsTimestamp;
            if (sinceLastWarning >= 10_000) {
                negativeUnackedMsgsTimestamp = System.currentTimeMillis();
                log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedAfterUpdate, i, consumer);
            }
        }
        return unackedAfterUpdate;
    }

    /**
     * Resets this consumer's unacked counter to zero and subtracts the previous value from the
     * subscription's unacked count.
     */
    private void clearUnAckedMsgs() {
        int previouslyUnacked = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
        subscription.addUnAckedMessages(-previouslyUnacked);
    }

    /**
     * @return the value of the {@code preciseDispatcherFlowControl} flag
     */
    public boolean isPreciseDispatcherFlowControl() {
        return preciseDispatcherFlowControl;
    }

    /**
     * Stores the position later reported by {@code getStats()} as
     * {@code readPositionWhenJoining}.
     *
     * @param positionImpl position to remember; may be {@code null}
     */
    public void setReadPositionWhenJoining(PositionImpl positionImpl) {
        readPositionWhenJoining = positionImpl;
    }

    /**
     * Resolves the max-unacked-messages-per-consumer limit from the topic's hierarchy policies.
     *
     * @return the configured limit, or {@code 0} when the consumer is not durable or has no
     *         subscription
     */
    public int getMaxUnackedMessages() {
        if (isDurable && subscription != null) {
            Integer configuredLimit = (Integer) subscription.getTopic()
                    .getHierarchyTopicPolicies()
                    .getMaxUnackedMessagesOnConsumer()
                    .get();
            return configuredLimit.intValue();
        }
        return 0;
    }

    /**
     * @return the transport connection this consumer is bound to
     */
    public TransportCnx cnx() {
        return cnx;
    }

    /**
     * @return the value of this consumer's {@code clientAddress} field
     */
    public String getClientAddress() {
        return clientAddress;
    }

    /**
     * @return the client source address and port as reported by the connection
     */
    public String getClientAddressAndPort() {
        return cnx.clientSourceAddressAndPort();
    }

    /**
     * @return the client version as reported by the connection
     */
    public String getClientVersion() {
        return cnx.getClientVersion();
    }

    /**
     * @return the start message id held by this consumer
     */
    public MessageId getStartMessageId() {
        return startMessageId;
    }

    /**
     * @return the metadata map held by this consumer (the internal instance, not a copy)
     */
    public Map<String, String> getMetadata() {
        return metadata;
    }

    /**
     * Computes the sticky-key hash of an entry.
     *
     * <p>For an {@code EntryAndMetadata} the sticky key is taken from the entry itself; for any
     * other entry it is peeked from the entry's data buffer.
     *
     * @param entry entry to hash
     * @return the hash produced by {@code StickyKeyConsumerSelector.makeStickyKeyHash}
     */
    private int getStickyKeyHash(Entry entry) {
        if (entry instanceof EntryAndMetadata) {
            return StickyKeyConsumerSelector.makeStickyKeyHash(((EntryAndMetadata) entry).getStickyKey());
        }
        return StickyKeyConsumerSelector.makeStickyKeyHash(
                Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()));
    }

    /**
     * Returns the OpenTelemetry attributes describing this consumer, building them on first use.
     *
     * <p>If the attributes are already set they are returned directly. Otherwise they are
     * installed through the field updater, whose update function keeps an existing value if one
     * is found. The partition index attribute is only added for partitioned topic names.
     *
     * @return the attributes for this consumer
     */
    public Attributes getOpenTelemetryAttributes() {
        if (openTelemetryAttributes != null) {
            return openTelemetryAttributes;
        }
        return OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, existing -> {
            if (existing != null) {
                return existing;
            }
            TopicName name = TopicName.get(subscription.getTopic().getName());
            AttributesBuilder builder = Attributes.builder()
                    .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumerName)
                    .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, Long.valueOf(consumerId))
                    .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName())
                    .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, subType.toString())
                    .put(OpenTelemetryAttributes.PULSAR_DOMAIN, name.getDomain().toString())
                    .put(OpenTelemetryAttributes.PULSAR_TENANT, name.getTenant())
                    .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, name.getNamespace())
                    .put(OpenTelemetryAttributes.PULSAR_TOPIC, name.getPartitionedTopicName());
            if (name.isPartitioned()) {
                builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, name.getPartitionIndex());
            }
            return builder.build();
        });
    }

    /**
     * @return the current consumer epoch
     */
    public long getConsumerEpoch() {
        return consumerEpoch;
    }

    /**
     * Replaces the consumer epoch.
     *
     * @param j new epoch value
     */
    public void setConsumerEpoch(long j) {
        consumerEpoch = j;
    }

    /**
     * @return the schema type held by this consumer
     */
    public SchemaType getSchemaType() {
        return schemaType;
    }

    /**
     * @return the instant stored in {@code connectedSince}
     */
    public Instant getConnectedSince() {
        return connectedSince;
    }
}
