package org.apache.pulsar.broker.service.plugin;

import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/plugin/FilterEntryTest.class */
public class FilterEntryTest extends BrokerTestBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(FilterEntryTest.class);

    /**
     * Per-test initialization hook.
     *
     * <p>Registered with TestNG via {@code @BeforeMethod}, so it runs before each test method,
     * and simply delegates to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the inherited setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        baseSetup();
    }

    /**
     * Per-test teardown hook.
     *
     * <p>Registered with TestNG via {@code @AfterMethod} and delegates to the inherited
     * {@code internalCleanup()}. {@code testFilter} also calls this method directly, so it can
     * run more than once for a single test.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Checks how the {@code allowOverrideEntryFilters} flag selects between topic-level and
     * broker-level entry filters.
     *
     * <p>A topic-level mock filter that always returns {@code REJECT} and a broker-level mock
     * filter that always returns {@code ACCEPT} are installed after ten messages were published:
     * <ol>
     *   <li>with the flag set to {@code true}, a new subscription is expected to receive nothing;</li>
     *   <li>after the flag is set to {@code false}, a second new subscription is expected to
     *       receive all ten messages.</li>
     * </ol>
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testOverride() throws Exception {
        conf.setAllowOverrideEntryFilters(true);
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "sub";

        // try-with-resources so the producer (previously never closed) and the consumers are
        // released even when an assertion fails.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topic)
                .create()) {
            for (int i = 0; i < 10; i++) {
                producer.send("test");
            }

            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();

            // Topic-level filter: rejects everything.
            EntryFilter rejectingFilter = Mockito.mock(EntryFilter.class);
            Mockito.when(rejectingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.REJECT);
            setMockFilterToTopic(topicRef, List.of(rejectingFilter));

            // Broker-level filter: accepts everything.
            EntryFilter acceptingFilter = Mockito.mock(EntryFilter.class);
            Mockito.when(acceptingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.ACCEPT);
            setMockBrokerFilter(List.of(acceptingFilter));

            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName(subName)
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(0, received);
                // Flip the flag before the first consumer is closed, as the original test did.
                conf.setAllowOverrideEntryFilters(false);
            }

            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName(subName + "1")
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(10, received);
            }
        }
    }

    /**
     * Reads a topic through a {@link RawReader} on the {@code "__compaction"} subscription while a
     * topic-level entry filter is installed.
     *
     * <p>Four keyed messages (K1..K4 / V1..V4) are published, then a mock filter is installed that
     * returns {@code REJECT} for keys {@code K1} and {@code K3} and {@code ACCEPT} otherwise. The
     * payloads read back by the raw reader are expected to be exactly {@code [V2, V4]}.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testEntryFilterWithCompactor() throws Exception {
        conf.setAllowOverrideEntryFilters(true);
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();

        final List<String> expectedValues = new ArrayList<>();
        expectedValues.add("V2");
        expectedValues.add("V4");

        // The producer was previously never closed; try-with-resources releases it on every path.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topic)
                .create()) {
            producer.newMessage().key("K1").value("V1").send();
            producer.newMessage().key("K2").value("V2").send();
            producer.newMessage().key("K3").value("V3").send();
            producer.newMessage().key("K4").value("V4").send();

            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();

            // Topic-level filter that decides purely on the message partition key.
            EntryFilter keyFilter = Mockito.mock(EntryFilter.class);
            Mockito.doAnswer(invocation -> {
                FilterContext context = invocation.getArgument(1);
                String partitionKey = context.getMsgMetadata().getPartitionKey();
                boolean rejected = partitionKey.equals("K1") || partitionKey.equals("K3");
                return rejected ? EntryFilter.FilterResult.REJECT : EntryFilter.FilterResult.ACCEPT;
            }).when(keyFilter).filterEntry(
                    ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class));
            setMockFilterToTopic(topicRef, List.of(keyFilter));

            final List<String> actualValues = new ArrayList<>();
            RawReader rawReader = RawReader.create(pulsarClient, topic, "__compaction").get();
            try {
                while (rawReader.hasMessageAvailableAsync().get()) {
                    try (RawMessage rawMessage = rawReader.readNextAsync().get()) {
                        ByteBuf payload = rawMessage.getHeadersAndPayload();
                        // Advance past the metadata so only the message value remains readable.
                        Commands.skipMessageMetadata(payload);
                        byte[] bytes = new byte[payload.readableBytes()];
                        payload.readBytes(bytes);
                        // Decode explicitly as UTF-8 instead of relying on the platform charset.
                        actualValues.add(new String(bytes, StandardCharsets.UTF_8));
                    }
                }
            } finally {
                // Close the reader even if reading or decoding failed.
                rawReader.closeAsync().get();
            }

            // TestNG argument order is (actual, expected).
            Assert.assertEquals(actualValues, expectedValues);
        }
    }

    /**
     * Replaces the {@code entryFilters} field of the given topic via reflection (forcing access).
     *
     * <p>The field is written as a {@link Pair} whose left element is {@code null} and whose right
     * element is the supplied filter list.
     *
     * @param persistentTopic topic whose {@code entryFilters} field is overwritten
     * @param list            filters to install on the topic
     * @throws IllegalStateException if the field cannot be written reflectively
     */
    private void setMockFilterToTopic(PersistentTopic persistentTopic, List<EntryFilter> list) {
        try {
            FieldUtils.writeField(persistentTopic, "entryFilters", Pair.of((Object) null, list), true);
        } catch (IllegalAccessException e) {
            // writeField declares a checked exception; surface it unchecked so callers stay unchanged.
            throw new IllegalStateException("Unable to overwrite 'entryFilters' on topic " + persistentTopic, e);
        }
    }

    /**
     * Replaces the {@code brokerEntryFilters} field of the broker's entry filter provider via
     * reflection (forcing access).
     *
     * @param list filters to install at broker level
     * @throws IllegalStateException if the field cannot be written reflectively
     */
    private void setMockBrokerFilter(List<EntryFilter> list) {
        try {
            FieldUtils.writeField(
                    pulsar.getBrokerService().getEntryFilterProvider(), "brokerEntryFilters", list, true);
        } catch (IllegalAccessException e) {
            // writeField declares a checked exception; surface it unchecked so callers stay unchanged.
            throw new IllegalStateException("Unable to overwrite 'brokerEntryFilters' on the entry filter provider", e);
        }
    }

    /**
     * End-to-end check of dispatcher-level entry filtering and of filter shutdown.
     *
     * <p>Steps:
     * <ol>
     *   <li>Subscribe with subscription properties {@code {1=1, 2=2}} and inject two spied
     *       {@link EntryFilterWithClassLoader} instances (wrapping {@code EntryFilterTest} and
     *       {@code EntryFilter2Test}) into the dispatcher through reflection.</li>
     *   <li>Publish ten plain messages: the backlog analysis must report ten accepted
     *       entries/messages and the consumer must receive all ten.</li>
     *   <li>Publish ten messages carrying a {@code REJECT} property: the backlog analysis must
     *       report ten rejected entries/messages, a new consumer must receive nothing, and the
     *       cursor's mark-delete position must reach the last published message.</li>
     *   <li>Publish ten messages each carrying the property {@code i=i}: exactly two are expected
     *       to be delivered (presumably those matching the subscription properties; confirm
     *       against the filter implementations).</li>
     *   <li>Install the same filters on the topic, run {@link #cleanup()} and verify that
     *       {@code close()} was invoked exactly once on each filter wrapper.</li>
     * </ol>
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testFilter() throws Exception {
        final Map<String, String> subscriptionProperties = new HashMap<>();
        subscriptionProperties.put("1", "1");
        subscriptionProperties.put("2", "2");
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "sub";

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionProperties(subscriptionProperties)
                .isAckReceiptEnabled(true)
                .subscriptionName(subName)
                .subscribe();

        // Topic#getSubscription is typed as Subscription; the cursor accessor used further down
        // lives on PersistentSubscription, hence the explicit cast.
        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                .getTopicReference(topic).get().getSubscription(subName);
        Dispatcher dispatcher = subscription.getDispatcher();

        Field entryFiltersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
        entryFiltersField.setAccessible(true);
        Field hasFilterField = EntryFilterSupport.class.getDeclaredField("hasFilter");
        hasFilterField.setAccessible(true);

        NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
        EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(
                EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader, false);
        EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(
                EntryFilterWithClassLoader.class, new EntryFilter2Test(), narClassLoader, false);
        entryFiltersField.set(dispatcher, List.of(filter1, filter2));
        hasFilterField.set(dispatcher, true);

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topic)
                .create();

        // Phase 1: plain messages are accepted and delivered.
        for (int i = 0; i < 10; i++) {
            producer.send("test");
        }
        verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);

        int received = 0;
        Message<String> message;
        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            received++;
            consumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(10, received);
        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
        consumer.close();

        // Phase 2: messages flagged with "REJECT" are filtered out.
        MessageIdImpl lastMessageId = null;
        for (int i = 0; i < 10; i++) {
            lastMessageId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
        }
        verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);

        consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .isAckReceiptEnabled(true)
                .subscriptionProperties(subscriptionProperties)
                .subscriptionName(subName)
                .subscribe();
        received = 0;
        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            received++;
            consumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(0, received);
        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);

        AssertJUnit.assertNotNull(lastMessageId);
        // Effectively-final copy for use inside the lambda.
        final MessageIdImpl lastRejectedId = lastMessageId;
        Awaitility.await().untilAsserted(() -> {
            PositionImpl markDeleted = subscription.getCursor().getMarkDeletedPosition();
            // AssertJUnit argument order is (expected, actual).
            AssertJUnit.assertEquals(lastRejectedId.getLedgerId(), markDeleted.getLedgerId());
            AssertJUnit.assertEquals(lastRejectedId.getEntryId(), markDeleted.getEntryId());
        });
        consumer.close();

        // Phase 3: only two of the ten property-tagged messages are expected to get through.
        consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionProperties(subscriptionProperties)
                .subscriptionName(subName)
                .subscribe();
        for (int i = 0; i < 10; i++) {
            producer.newMessage().property(String.valueOf(i), String.valueOf(i)).value("1").send();
        }
        received = 0;
        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            received++;
            consumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(2, received);

        producer.close();
        consumer.close();

        // Phase 4: filters installed on the topic must each be closed exactly once by cleanup().
        // cleanup() is invoked explicitly here so the close() calls can be verified inside the test.
        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
        setMockFilterToTopic(topicRef, List.of(filter1, filter2));
        cleanup();
        Mockito.verify(filter1, Mockito.times(1)).close();
        Mockito.verify(filter2, Mockito.times(1)).close();
    }

    /**
     * Supplies one persistent and one non-persistent topic name, each made unique with a random
     * UUID suffix.
     *
     * @return a two-row table with a single {@code String} topic name per row
     */
    @DataProvider(name = "topicProvider")
    public Object[][] topicProvider() {
        // Must be a genuine Object[][]: an Object[] holding arrays does not satisfy the return type.
        return new Object[][]{
                {"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
                {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
        };
    }

    /**
     * Verifies that the topic's filtered-entries counter reflects the entries rejected by the
     * dispatcher-level filters.
     *
     * <p>Two spied {@link EntryFilterWithClassLoader} instances are injected into the dispatcher
     * through reflection. Ten plain {@code "test"} messages and ten messages carrying a
     * {@code REJECT} property are published; the consumer must receive exactly the ten
     * {@code "test"} messages and the topic must report ten filtered entries.
     *
     * @param topic topic name supplied by the {@code topicProvider} data provider
     * @throws Throwable on any client, broker or reflection failure
     */
    @Test(dataProvider = "topicProvider")
    public void testFilteredMsgCount(String topic) throws Throwable {
        final String subName = "sub";

        // Both resources are closed (consumer first, then producer) on success and on failure.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topic)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName(subName)
                     .subscribe()) {

            Subscription subscription =
                    pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName);
            Dispatcher dispatcher = subscription.getDispatcher();

            Field entryFiltersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
            entryFiltersField.setAccessible(true);
            Field hasFilterField = EntryFilterSupport.class.getDeclaredField("hasFilter");
            hasFilterField.setAccessible(true);

            NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
            EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader, false);
            EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilter2Test(), narClassLoader, false);
            entryFiltersField.set(dispatcher, List.of(filter1, filter2));
            hasFilterField.set(dispatcher, true);

            for (int i = 0; i < 10; i++) {
                producer.send("test");
            }
            for (int i = 0; i < 10; i++) {
                AssertJUnit.assertNotNull(producer.newMessage().property("REJECT", "").value("1").send());
            }

            int received = 0;
            Message<String> message;
            while ((message = consumer.receive(10, TimeUnit.SECONDS)) != null) {
                received++;
                // AssertJUnit argument order is (expected, actual).
                AssertJUnit.assertEquals("test", message.getValue());
                consumer.acknowledge(message);
            }
            AssertJUnit.assertEquals(10, received);
            AssertJUnit.assertEquals(10L, subscription.getTopic().getFilteredEntriesCount());
        }
    }

    /**
     * Customizes the test context builder by registering a configuration override that sets the
     * executor thread pool size to 3.
     *
     * <p>NOTE(review): presumably sized so that tests submitting blocking tasks to the broker
     * executor have spare threads; confirm the intent.
     *
     * @param builder the test context builder to customize
     */
    @Override
    public void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
        builder.configOverride(config -> config.setNumExecutorThreadPoolSize(3));
    }

    /**
     * (Currently disabled.) Exercises filter-driven rescheduling on a shared subscription with two
     * consumers.
     *
     * <p>Each consumer declares, via consumer properties, which message property it accepts
     * ({@code matchValueAccept}) and which one it wants rescheduled ({@code matchValueReschedule}).
     * Two hundred messages are published, alternating between {@code FOR-1} (values prefixed
     * {@code "consumer-1 "}) and {@code FOR-2} (values prefixed {@code "consumer-2 "}). Each
     * consumer is then expected to receive exactly the hundred messages addressed to it.
     *
     * @throws Throwable on any client, broker or reflection failure, or if a consumer does not
     *                   receive its messages in time
     */
    @Test(enabled = false)
    public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription() throws Throwable {
        Assert.assertTrue(pulsar.getConfiguration().isSubscriptionRedeliveryTrackerEnabled());
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "sub";
        final int numMessages = 200;

        final Map<String, String> consumer1Properties = new HashMap<>();
        consumer1Properties.put("matchValueAccept", "FOR-1");
        consumer1Properties.put("matchValueReschedule", "FOR-2");
        final Map<String, String> consumer2Properties = new HashMap<>();
        consumer2Properties.put("matchValueAccept", "FOR-2");
        consumer2Properties.put("matchValueReschedule", "FOR-1");

        // All three resources are closed in reverse order on success and on failure.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topic)
                     .create();
             Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Shared)
                     .properties(consumer1Properties)
                     .consumerName("consumer1")
                     .receiverQueueSize(100)
                     .subscriptionName(subName)
                     .subscribe();
             Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionType(SubscriptionType.Shared)
                     .properties(consumer2Properties)
                     .consumerName("consumer2")
                     .topic(topic)
                     .receiverQueueSize(100)
                     .subscriptionName(subName)
                     .subscribe()) {

            Dispatcher dispatcher = pulsar.getBrokerService()
                    .getTopicReference(topic).get().getSubscription(subName).getDispatcher();
            Field entryFiltersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
            entryFiltersField.setAccessible(true);
            Field hasFilterField = EntryFilterSupport.class.getDeclaredField("hasFilter");
            hasFilterField.setAccessible(true);

            NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
            EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader, false);
            EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader, false);
            entryFiltersField.set(dispatcher, List.of(filter1, filter2));
            hasFilterField.set(dispatcher, true);

            // Even indexes go to consumer 1, odd indexes to consumer 2; both value sequences are 1..100.
            for (int i = 0; i < numMessages; i++) {
                if (i % 2 == 0) {
                    String value = "consumer-1 " + ((i / 2) + 1);
                    producer.newMessage().property("FOR-1", "").property("debug", value).value(value).send();
                } else {
                    String value = "consumer-2 " + ((i + 1) / 2);
                    producer.newMessage().property("FOR-2", "").property("debug", value).value(value).send();
                }
            }

            CompletableFuture<Void> consumer1Done = new CompletableFuture<>();
            pulsar.getExecutor().submit(() -> receiveAndVerify(consumer1, 1, numMessages / 2, consumer1Done));
            CompletableFuture<Void> consumer2Done = new CompletableFuture<>();
            pulsar.getExecutor().submit(() -> receiveAndVerify(consumer2, 2, numMessages / 2, consumer2Done));

            consumer1Done.get(1L, TimeUnit.MINUTES);
            consumer2Done.get(1L, TimeUnit.MINUTES);
        }
    }

    /**
     * Receives {@code expectedCount} messages on the given consumer, asserting that every value
     * starts with {@code "consumer-<consumerNumber> "}, and reports the outcome through
     * {@code done}.
     *
     * <p>The loop stops at the first receive timeout or failure. Previously a timeout failed the
     * future but kept the loop spinning, so the worker never terminated.
     *
     * @param consumer       consumer to read from
     * @param consumerNumber 1-based consumer index used in log lines, error text and value prefix
     * @param expectedCount  number of messages that must be received
     * @param done           completed normally on success, exceptionally on timeout or failure
     */
    private static void receiveAndVerify(Consumer<String> consumer, int consumerNumber, int expectedCount,
                                         CompletableFuture<Void> done) {
        final String expectedPrefix = "consumer-" + consumerNumber + " ";
        int received = 0;
        while (received < expectedCount) {
            try {
                Message<String> message = consumer.receive(1, TimeUnit.MINUTES);
                if (message == null) {
                    done.completeExceptionally(
                            new Exception("consumer" + consumerNumber + " did not receive all the messages"));
                    return;
                }
                received++;
                log.info("received{} {} - {} - {}",
                        consumerNumber, message.getValue(), message.getProperties(), received);
                Assert.assertTrue(message.getValue().startsWith(expectedPrefix), message.getValue());
                consumer.acknowledgeAsync(message);
            } catch (Throwable t) {
                // Assertion errors must reach the waiting test thread as well, hence Throwable.
                done.completeExceptionally(t);
                return;
            }
        }
        done.complete(null);
    }

    /**
     * Runs a subscription backlog analysis through the admin API (with no explicit start position)
     * and asserts every counter of the result.
     *
     * @param topic                     topic to analyze
     * @param subscription              subscription to analyze
     * @param entries                   expected total number of entries
     * @param messages                  expected total number of messages
     * @param filterAcceptedEntries     expected number of entries accepted by the filters
     * @param filterAcceptedMessages    expected number of messages accepted by the filters
     * @param filterRejectedEntries     expected number of entries rejected by the filters
     * @param filterRejectedMessages    expected number of messages rejected by the filters
     * @param filterRescheduledEntries  expected number of entries rescheduled by the filters
     * @param filterRescheduledMessages expected number of messages rescheduled by the filters
     * @throws Exception if the admin call fails
     */
    private void verifyBacklog(String topic, String subscription,
                               int entries, int messages,
                               int filterAcceptedEntries, int filterAcceptedMessages,
                               int filterRejectedEntries, int filterRejectedMessages,
                               int filterRescheduledEntries, int filterRescheduledMessages) throws Exception {
        AnalyzeSubscriptionBacklogResult backlog =
                admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());

        // TestNG argument order is (actual, expected); the labels identify the failing counter.
        Assert.assertEquals(backlog.getEntries(), entries, "entries");
        Assert.assertEquals(backlog.getFilterAcceptedEntries(), filterAcceptedEntries, "filterAcceptedEntries");
        Assert.assertEquals(backlog.getFilterRejectedEntries(), filterRejectedEntries, "filterRejectedEntries");
        Assert.assertEquals(backlog.getFilterRescheduledEntries(), filterRescheduledEntries,
                "filterRescheduledEntries");

        Assert.assertEquals(backlog.getMessages(), messages, "messages");
        Assert.assertEquals(backlog.getFilterAcceptedMessages(), filterAcceptedMessages, "filterAcceptedMessages");
        Assert.assertEquals(backlog.getFilterRejectedMessages(), filterRejectedMessages, "filterRejectedMessages");
        Assert.assertEquals(backlog.getFilterRescheduledMessages(), filterRescheduledMessages,
                "filterRescheduledMessages");
    }

    /**
     * Supplies both values of the boolean switch consumed by {@code testExecuteInOrder}.
     *
     * @return a two-row table: {@code true}, then {@code false}
     */
    @DataProvider(name = "overrideBrokerEntryFilters")
    public static Object[][] overrideBrokerEntryFilters() {
        // Must be a genuine Object[][]: an Object[] holding arrays does not satisfy the return type.
        return new Object[][]{
                {true},
                {false},
        };
    }

    /**
     * Verifies that entry filters are evaluated in list order.
     *
     * <p>Two mock filters are used: one always returning {@code REJECT}, one always returning
     * {@code ACCEPT}. Ten messages are published up front.
     * <ol>
     *   <li>With the order {@code [reject, accept]} a new subscription receives nothing; the
     *       rejecting filter must have been consulted ten times and the accepting one never.</li>
     *   <li>With the order {@code [accept, reject]} a second new subscription also receives
     *       nothing; the accepting filter must now have been consulted ten times and the
     *       rejecting one twenty times in total.</li>
     * </ol>
     *
     * @param overrideBrokerEntryFilters {@code true} to install the filters on the topic,
     *                                   {@code false} to clear the topic filters and install them
     *                                   at broker level
     * @throws Exception on any client or broker failure
     */
    @Test(dataProvider = "overrideBrokerEntryFilters")
    public void testExecuteInOrder(boolean overrideBrokerEntryFilters) throws Exception {
        conf.setAllowOverrideEntryFilters(true);
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "sub";

        // The producer was previously never closed; try-with-resources releases it on every path.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topic)
                .create()) {
            for (int i = 0; i < 10; i++) {
                producer.send("test");
            }

            EntryFilter rejectingFilter = Mockito.mock(EntryFilter.class);
            Mockito.when(rejectingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.REJECT);
            EntryFilter acceptingFilter = Mockito.mock(EntryFilter.class);
            Mockito.when(acceptingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.ACCEPT);

            // Round 1: reject first, accept second.
            installFilters(topic, overrideBrokerEntryFilters, List.of(rejectingFilter, acceptingFilter));
            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName(subName)
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(0, received);
            }
            Mockito.verify(rejectingFilter, Mockito.times(10)).filterEntry(
                    ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class));
            Mockito.verify(acceptingFilter, Mockito.never()).filterEntry(
                    ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class));

            // Round 2: accept first, reject second.
            installFilters(topic, overrideBrokerEntryFilters, List.of(acceptingFilter, rejectingFilter));
            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName(subName + "-2")
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(0, received);
                // Invocation counts accumulate across both rounds.
                Mockito.verify(rejectingFilter, Mockito.times(20)).filterEntry(
                        ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class));
                Mockito.verify(acceptingFilter, Mockito.times(10)).filterEntry(
                        ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class));
            }
        }
    }

    /**
     * Installs the given ordered filters either on the topic or, after clearing the topic-level
     * filters, at broker level.
     *
     * @param topic   name of the topic whose reference is looked up in the broker service
     * @param onTopic {@code true} to install on the topic, {@code false} to install at broker level
     * @param filters filters in the order they should be evaluated
     */
    private void installFilters(String topic, boolean onTopic, List<EntryFilter> filters) {
        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
        if (onTopic) {
            setMockFilterToTopic(topicRef, filters);
        } else {
            setMockFilterToTopic(topicRef, List.of());
            setMockBrokerFilter(filters);
        }
    }
}
