package org.apache.pulsar.broker.service.plugin;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker test that injects two spied {@link EntryFilterWithClassLoader} instances into a
 * subscription's dispatcher and checks how many messages reach the consumer, then checks that
 * the filters registered on the {@link BrokerService} are closed exactly once on cleanup.
 */
@Test(groups = {"broker"})
public class FilterEntryTest extends BrokerTestBase {

    /** Starts the test broker before every test method (overrides MockedPulsarServiceBaseTest). */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        baseSetup();
    }

    /** Tears the test broker down after every test method (overrides MockedPulsarServiceBaseTest). */
    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Exercises the injected entry filters in four phases:
     * <ol>
     *   <li>plain messages: all 10 are expected to be delivered;</li>
     *   <li>messages carrying a {@code REJECT} property: none are expected to be delivered and the
     *       cursor's mark-delete position is expected to reach the last published message;</li>
     *   <li>a re-subscription with subscription properties {@code "1"} and {@code "2"}: exactly 2 of
     *       the 10 messages (each tagged with property {@code i -> i}) are expected to be delivered;</li>
     *   <li>broker cleanup: each filter registered on the broker service must be closed once.</li>
     * </ol>
     *
     * @throws Exception if the broker, client, or reflection access fails
     */
    public void testFilter() throws Exception {
        final String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "sub";
        final int messageCount = 10;

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        // Replace the dispatcher's private "entryFilters" field with the spied filters.
        Topic topicRef = (Topic) pulsar.getBrokerService().getTopicReference(topic).get();
        PersistentSubscription subscription = (PersistentSubscription) topicRef.getSubscription(subName);
        Dispatcher dispatcher = subscription.getDispatcher();
        Field dispatcherFilters = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
        dispatcherFilters.setAccessible(true);
        NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
        EntryFilterWithClassLoader firstFilter = (EntryFilterWithClassLoader)
                BrokerTestUtil.spyWithClassAndConstructorArgs(
                        EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader);
        EntryFilterWithClassLoader secondFilter = (EntryFilterWithClassLoader)
                BrokerTestUtil.spyWithClassAndConstructorArgs(
                        EntryFilterWithClassLoader.class, new EntryFilter2Test(), narClassLoader);
        dispatcherFilters.set(dispatcher, ImmutableList.of(firstFilter, secondFilter));

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topic)
                .create();

        // Phase 1: messages without special properties must all be delivered.
        for (int i = 0; i < messageCount; i++) {
            producer.send("test");
        }
        AssertJUnit.assertEquals(messageCount, drainAndAcknowledge(consumer));

        // Phase 2: messages tagged "REJECT" must never reach the consumer.
        MessageIdImpl lastRejectedId = null;
        for (int i = 0; i < messageCount; i++) {
            lastRejectedId = (MessageIdImpl) producer.newMessage()
                    .property("REJECT", "")
                    .value("1")
                    .send();
        }
        AssertJUnit.assertEquals(0, drainAndAcknowledge(consumer));
        AssertJUnit.assertNotNull(lastRejectedId);

        // The lambda needs an effectively final reference to the last published id.
        final MessageIdImpl expectedPosition = lastRejectedId;
        Awaitility.await().untilAsserted(() -> {
            PositionImpl markDeleted = (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
            // AssertJUnit takes (expected, actual): the published id is the expectation.
            AssertJUnit.assertEquals(expectedPosition.getLedgerId(), markDeleted.getLedgerId());
            AssertJUnit.assertEquals(expectedPosition.getEntryId(), markDeleted.getEntryId());
        });
        consumer.close();

        // Phase 3: re-subscribe with subscription properties "1" and "2".
        // NOTE(review): presumably the filters match message properties against these
        // subscription properties - confirm against EntryFilterTest / EntryFilter2Test.
        HashMap<String, String> subscriptionProperties = new HashMap<>();
        subscriptionProperties.put("1", "1");
        subscriptionProperties.put("2", "2");
        Consumer<String> filteredConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionProperties(subscriptionProperties)
                .subscriptionName(subName)
                .subscribe();
        for (int i = 0; i < messageCount; i++) {
            producer.newMessage()
                    .property(String.valueOf(i), String.valueOf(i))
                    .value("1")
                    .send();
        }
        AssertJUnit.assertEquals(2, drainAndAcknowledge(filteredConsumer));
        producer.close();
        filteredConsumer.close();

        // Phase 4: register the filters on the broker service, then run cleanup explicitly so the
        // close() calls can be verified inside this test method.
        BrokerService brokerService = pulsar.getBrokerService();
        Field brokerFilters = BrokerService.class.getDeclaredField("entryFilters");
        brokerFilters.setAccessible(true);
        brokerFilters.set(brokerService, ImmutableMap.of("1", firstFilter, "2", secondFilter));
        cleanup();
        Mockito.verify(firstFilter, Mockito.times(1)).close();
        Mockito.verify(secondFilter, Mockito.times(1)).close();
    }

    /**
     * Receives and acknowledges messages until a one-second receive times out.
     *
     * @param consumer consumer to drain
     * @return number of messages that were received and acknowledged
     * @throws Exception if receiving or acknowledging fails
     */
    private static int drainAndAcknowledge(Consumer<String> consumer) throws Exception {
        int received = 0;
        Message<String> message;
        // receive(timeout) returning null is used as the "nothing more to deliver" signal.
        while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            received++;
            consumer.acknowledge(message);
        }
        return received;
    }
}
