package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.com.google.common.collect.UnmodifiableIterator;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.PositionAckSetUtil;
import org.apache.pulsar.shade.org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.shade.org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/AbstractBaseDispatcher.class */
/**
 * Common base for dispatchers: holds the owning subscription and broker configuration, and
 * provides the shared pre-dispatch pass that drops entries which must not be sent to a consumer
 * (entry-filter decisions, transaction markers, aborted transactions, server-only markers,
 * delayed messages and positions held in pending-ack state), plus dispatch-rate helper methods.
 */
public abstract class AbstractBaseDispatcher implements Dispatcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractBaseDispatcher.class);

    /** Subscription this dispatcher serves. */
    protected final Subscription subscription;
    /** Broker configuration supplied at construction time. */
    protected final ServiceConfiguration serviceConfig;
    /** Snapshot of {@code ServiceConfiguration#isDispatchThrottlingOnBatchMessageEnabled()} taken at construction. */
    protected final boolean dispatchThrottlingOnBatchMessageEnabled;
    /** Entry filters applied to every entry before dispatch; empty when none are configured. */
    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
    /** Reusable context handed to the entry filters; it is reset and refilled for every entry. */
    protected final FilterContext filterContext;

    /**
     * Creates the dispatcher base state.
     *
     * <p>Entry filters are taken from the broker service of the subscription's topic. When the
     * subscription or its topic is {@code null}, or no filters are registered, filtering is
     * disabled (empty filter list and {@code FilterContext.FILTER_CONTEXT_DISABLED}).
     *
     * @param subscription         the subscription served by this dispatcher, may be {@code null}
     * @param serviceConfiguration the broker configuration
     */
    public AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfiguration) {
        this.subscription = subscription;
        this.serviceConfig = serviceConfiguration;
        this.dispatchThrottlingOnBatchMessageEnabled = serviceConfiguration.isDispatchThrottlingOnBatchMessageEnabled();
        boolean hasFilters = subscription != null
                && subscription.getTopic() != null
                && MapUtils.isNotEmpty(subscription.getTopic().getBrokerService().getEntryFilters());
        if (hasFilters) {
            this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
            this.filterContext = new FilterContext();
        } else {
            this.entryFilters = ImmutableList.of();
            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
        }
    }

    /**
     * Peeks the metadata of every non-null entry and stores an {@link EntryWrapper} for it at the
     * same index of {@code entryWrapperArr}.
     *
     * @param entryWrapperArr destination array; slots for {@code null} entries are left untouched
     * @param list            entries read from the cursor, may contain {@code null} elements
     * @return the sum of {@code getNumMessagesInBatch()} over all entries whose metadata was readable
     */
    public int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrapperArr, List<Entry> list) {
        int totalMessages = 0;
        for (int index = 0, size = list.size(); index < size; index++) {
            Entry entry = list.get(index);
            if (entry == null) {
                continue;
            }
            MessageMetadata metadata =
                    Commands.peekMessageMetadata(entry.getDataBuffer(), this.subscription.toString(), -1L);
            entryWrapperArr[index] = EntryWrapper.get(entry, metadata);
            // filterEntriesForConsumer treats the peeked metadata as nullable, so do not
            // dereference it blindly here; an unreadable entry simply contributes no messages.
            if (metadata != null) {
                totalMessages += metadata.getNumMessagesInBatch();
            }
        }
        return totalMessages;
    }

    /**
     * Convenience overload of
     * {@link #filterEntriesForConsumer(EntryWrapper[], int, List, EntryBatchSizes, SendMessageInfo, EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)}
     * without pre-computed entry wrappers (wrapper array {@code null}, offset {@code 0}).
     */
    public int filterEntriesForConsumer(List<Entry> list, EntryBatchSizes entryBatchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks entryBatchIndexesAcks, ManagedCursor managedCursor, boolean z, Consumer consumer) {
        return filterEntriesForConsumer(null, 0, list, entryBatchSizes, sendMessageInfo, entryBatchIndexesAcks, managedCursor, z, consumer);
    }

    /**
     * Filters {@code list} in place, leaving only the entries that should be sent to
     * {@code consumer}. Every dropped entry is replaced by {@code null} in the list and released
     * exactly once; processing of that entry stops immediately afterwards.
     *
     * <p>Per entry, in order:
     * <ol>
     *   <li>entry filters: REJECT positions are collected and individually acknowledged after the
     *       loop, RESCHEDULE positions are collected and redelivered after
     *       {@code getDispatcherEntryFilterRescheduledMessageDelay()} milliseconds;</li>
     *   <li>transaction markers and messages of aborted transactions are acknowledged and dropped;</li>
     *   <li>entries with unreadable metadata and server-only markers are dropped and acknowledged
     *       (replicated-subscription snapshot markers are processed first);</li>
     *   <li>entries accepted by {@code trackDelayedDelivery} are dropped;</li>
     *   <li>entries whose position is fully covered by the pending-ack state are dropped;</li>
     *   <li>surviving entries are counted, their batch size / ack set recorded and the broker
     *       interceptor (if any) notified.</li>
     * </ol>
     *
     * @param entryWrapperArr       optional pre-peeked metadata wrappers, may be {@code null}
     * @param i                     offset added to the list index when looking into {@code entryWrapperArr}
     * @param list                  entries to filter; modified in place
     * @param entryBatchSizes       receives the batch size of each surviving entry at the entry's index
     * @param sendMessageInfo       receives total messages, bytes and chunked messages of the surviving entries
     * @param entryBatchIndexesAcks receives per-entry batch-index ack sets; may be {@code null}
     * @param managedCursor         cursor used to look up deleted batch indexes; may be {@code null}
     * @param z                     not read by this implementation
     * @param consumer              consumer the entries are intended for
     * @return the number of entries left in {@code list} for dispatch
     */
    public int filterEntriesForConsumer(EntryWrapper[] entryWrapperArr, int i, List<? extends Entry> list, EntryBatchSizes entryBatchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks entryBatchIndexesAcks, ManagedCursor managedCursor, boolean z, Consumer consumer) {
        int totalMessages = 0;
        long totalBytes = 0;
        int totalChunkedMessages = 0;
        int totalEntries = 0;
        final boolean filtersEnabled = CollectionUtils.isNotEmpty(this.entryFilters);
        final List<Position> rejectedPositions = filtersEnabled ? new ArrayList<>() : null;
        final List<PositionImpl> rescheduledPositions = filtersEnabled ? new ArrayList<>() : null;

        for (int index = 0, size = list.size(); index < size; index++) {
            Entry entry = list.get(index);
            if (entry == null) {
                continue;
            }
            ByteBuf payload = entry.getDataBuffer();

            // Prefer metadata already peeked by the caller, otherwise peek it now.
            MessageMetadata metadata = null;
            int wrapperIndex = index + i;
            if (entryWrapperArr != null && entryWrapperArr[wrapperIndex] != null) {
                metadata = entryWrapperArr[wrapperIndex].getMetadata();
            }
            if (metadata == null) {
                metadata = Commands.peekMessageMetadata(payload, this.subscription.toString(), -1L);
            }

            if (filtersEnabled) {
                fillContext(this.filterContext, metadata, this.subscription, consumer);
                EntryFilter.FilterResult decision = getFilterResult(this.filterContext, entry, this.entryFilters);
                if (decision == EntryFilter.FilterResult.REJECT) {
                    rejectedPositions.add(entry.getPosition());
                    discardEntry(list, index, entry);
                    continue;
                }
                if (decision == EntryFilter.FilterResult.RESCHEDULE) {
                    rescheduledPositions.add((PositionImpl) entry.getPosition());
                    discardEntry(list, index, entry);
                    continue;
                }
            }

            if (metadata != null && metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
                // Short-circuit keeps the abort lookup from running for transaction markers.
                boolean skipTxnEntry = Markers.isTxnMarker(metadata)
                        || ((PersistentTopic) this.subscription.getTopic()).isTxnAborted(
                                new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits()));
                if (skipTxnEntry) {
                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                    discardEntry(list, index, entry);
                    continue;
                }
            }

            if (metadata == null || Markers.isServerOnlyMarker(metadata)) {
                // Capture the position before the entry is released.
                PositionImpl markerPosition = (PositionImpl) entry.getPosition();
                if (Markers.isReplicatedSubscriptionSnapshotMarker(metadata)) {
                    processReplicatedSubscriptionSnapshot(markerPosition, payload);
                }
                discardEntry(list, index, entry);
                individualAcknowledgeMessageIfNeeded(markerPosition, Collections.emptyMap());
                continue;
            }

            if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), metadata)) {
                discardEntry(list, index, entry);
                continue;
            }

            int batchSize = metadata.getNumMessagesInBatch();
            long[] ackSet = null;
            if (entryBatchIndexesAcks != null && managedCursor != null) {
                PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                ackSet = managedCursor.getDeletedBatchIndexesAsLongArray(position);
                if ((this.subscription instanceof PersistentSubscription)
                        && (((PersistentSubscription) this.subscription).getPendingAckHandle() instanceof PendingAckHandleImpl)) {
                    PositionImpl positionInPendingAck =
                            ((PersistentSubscription) this.subscription).getPositionInPendingAck(position);
                    if (positionInPendingAck != null) {
                        if (!positionInPendingAck.hasAckSet()) {
                            // The position is in pending-ack state without an ack set: drop the entry.
                            discardEntry(list, index, entry);
                            continue;
                        }
                        ackSet = ackSet != null
                                ? PositionAckSetUtil.andAckSet(ackSet, positionInPendingAck.getAckSet())
                                : positionInPendingAck.getAckSet();
                        if (PositionAckSetUtil.isAckSetEmpty(ackSet)) {
                            // Nothing in the batch is left to deliver.
                            discardEntry(list, index, entry);
                            continue;
                        }
                    }
                }
                if (ackSet != null) {
                    entryBatchIndexesAcks.setIndexesAcks(index, Pair.of(Integer.valueOf(batchSize), ackSet));
                } else {
                    entryBatchIndexesAcks.setIndexesAcks(index, null);
                }
            }

            // Account only for entries that survived every filter above, so the totals and the
            // returned count describe exactly what remains in the list.
            totalEntries++;
            totalMessages += batchSize;
            totalBytes += payload.readableBytes();
            totalChunkedMessages += metadata.hasChunkId() ? 1 : 0;
            entryBatchSizes.setBatchSize(index, batchSize);

            BrokerInterceptor interceptor = this.subscription.interceptor();
            if (interceptor != null) {
                interceptor.beforeSendMessage(this.subscription, entry, ackSet, metadata);
            }
        }

        if (CollectionUtils.isNotEmpty(rejectedPositions)) {
            this.subscription.acknowledgeMessage(rejectedPositions, CommandAck.AckType.Individual, Collections.emptyMap());
            int filteredCount = rejectedPositions.size();
            Topic topic = this.subscription.getTopic();
            if (topic instanceof AbstractTopic) {
                ((AbstractTopic) topic).addFilteredEntriesCount(filteredCount);
            }
        }
        if (CollectionUtils.isNotEmpty(rescheduledPositions)) {
            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor().schedule(() -> {
                this.subscription.redeliverUnacknowledgedMessages(consumer, rescheduledPositions);
            }, this.serviceConfig.getDispatcherEntryFilterRescheduledMessageDelay(), TimeUnit.MILLISECONDS);
        }
        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
        return totalEntries;
    }

    /** Removes {@code entry} from dispatch: clears its slot in {@code entries} and releases it. */
    private static void discardEntry(List<? extends Entry> entries, int index, Entry entry) {
        entries.set(index, null);
        entry.release();
    }

    /**
     * Individually acknowledges {@code position} on the subscription, unless the subscription is a
     * {@link CompactorSubscription}, in which case nothing is done.
     */
    private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> map) {
        if (this.subscription instanceof CompactorSubscription) {
            return;
        }
        this.subscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual, map);
    }

    /**
     * Runs the filters in order and returns the first result that is not ACCEPT. A {@code null}
     * filter result is treated as ACCEPT; ACCEPT is returned when every filter accepts.
     */
    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry, ImmutableList<EntryFilterWithClassLoader> immutableList) {
        for (EntryFilterWithClassLoader filter : immutableList) {
            EntryFilter.FilterResult result = filter.filterEntry(entry, filterContext);
            if (result != null && result != EntryFilter.FilterResult.ACCEPT) {
                return result;
            }
        }
        return EntryFilter.FilterResult.ACCEPT;
    }

    /** Resets {@code filterContext} and populates it for the entry about to be filtered. */
    private void fillContext(FilterContext filterContext, MessageMetadata messageMetadata, Subscription subscription, Consumer consumer) {
        filterContext.reset();
        filterContext.setMsgMetadata(messageMetadata);
        filterContext.setSubscription(subscription);
        filterContext.setConsumer(consumer);
    }

    /**
     * @return whether the subscription has reached its consumer limit; defined by subclasses
     */
    public abstract boolean isConsumersExceededOnSubscription();

    /**
     * Checks {@code i} consumers against the topic's max-consumers-per-subscription policy.
     *
     * @param abstractTopic topic whose hierarchy policies are consulted
     * @param i             consumer count to compare with the limit
     * @return {@code true} when the topic is not a system topic, a positive limit is configured
     *         and {@code i} is greater than or equal to it
     */
    public boolean isConsumersExceededOnSubscription(AbstractTopic abstractTopic, int i) {
        if (abstractTopic.isSystemTopic()) {
            return false;
        }
        Integer maxConsumers = abstractTopic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
        return maxConsumers != null && maxConsumers.intValue() > 0 && maxConsumers.intValue() <= i;
    }

    /**
     * Skips the message metadata in {@code byteBuf}, parses the replicated-subscription snapshot
     * that follows and hands it to the subscription. Any failure while parsing or processing is
     * logged at WARN level and otherwise ignored.
     */
    private void processReplicatedSubscriptionSnapshot(PositionImpl positionImpl, ByteBuf byteBuf) {
        Commands.skipMessageMetadata(byteBuf);
        try {
            this.subscription.processReplicatedSubscriptionSnapshot(Markers.parseReplicatedSubscriptionsSnapshot(byteBuf));
        } catch (Throwable th) {
            // The trailing Throwable is logged by SLF4J as the exception (with stack trace).
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", positionImpl, th.getMessage(), th);
        }
    }

    /** No-op in this base class. */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void resetCloseFuture() {
    }

    /** Schedules another read attempt; implemented by subclasses. */
    protected abstract void reScheduleRead();

    /**
     * Checks whether dispatching is currently blocked by {@code dispatchRateLimiter}; when it is,
     * a re-read is scheduled through {@link #reScheduleRead()}.
     *
     * @return {@code true} when rate limiting is enabled and no message dispatch permit is available
     */
    public boolean reachDispatchRateLimit(DispatchRateLimiter dispatchRateLimiter) {
        if (dispatchRateLimiter.isDispatchRateLimitingEnabled() && !dispatchRateLimiter.hasMessageDispatchPermit()) {
            reScheduleRead();
            return true;
        }
        return false;
    }

    /**
     * Caps the requested read size by what {@code dispatchRateLimiter} currently has available.
     *
     * @param dispatchRateLimiter limiter providing the available message and byte budget
     * @param i                   requested number of messages
     * @param j                   requested number of bytes
     * @return the (messages, bytes) pair produced by {@link #computeReadLimits(int, int, long, long)}
     */
    public Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter, int i, long j) {
        long availableMessages = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
        // Saturate instead of truncating: a plain (int) cast of an out-of-range value can wrap
        // to a negative or tiny positive number and produce a bogus limit.
        int availableMessagesAsInt =
                (int) Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, availableMessages));
        return computeReadLimits(i, availableMessagesAsInt, j, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
    }

    /**
     * Applies the available budgets to the requested read sizes. A budget is only applied when it
     * is strictly positive; otherwise the corresponding request is returned unchanged.
     *
     * @param i  requested number of messages
     * @param i2 available message budget
     * @param j  requested number of bytes
     * @param j2 available byte budget
     * @return pair of (messages to read, bytes to read)
     */
    protected static Pair<Integer, Long> computeReadLimits(int i, int i2, long j, long j2) {
        int messagesToRead = i2 > 0 ? Math.min(i, i2) : i;
        long bytesToRead = j2 > 0 ? Math.min(j, j2) : j;
        return Pair.of(Integer.valueOf(messagesToRead), Long.valueOf(bytesToRead));
    }

    /**
     * Peeks the sticky key of the message in {@code byteBuf}, passing the subscription's topic name
     * and subscription name to {@code Commands.peekStickyKey}.
     */
    public byte[] peekStickyKey(ByteBuf byteBuf) {
        return Commands.peekStickyKey(byteBuf, this.subscription.getTopicName(), this.subscription.getName());
    }
}
