package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.class */
public class MessageDuplicationTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageDuplicationTest.class);
    private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
    private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
    private static final String REPLICATOR_PREFIX = "foo";

    @Test
    public void testIsDuplicate() {
        PulsarService pulsarService = (PulsarService) Mockito.mock(PulsarService.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(pulsarService)).getConfiguration();
        MessageDeduplication messageDeduplication = (MessageDeduplication) BrokerTestUtil.spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, (PersistentTopic) Mockito.mock(PersistentTopic.class), (ManagedLedger) Mockito.mock(ManagedLedger.class));
        ((MessageDeduplication) Mockito.doReturn(true).when(messageDeduplication)).isEnabled();
        ByteBuf message = getMessage("producer1", 0L);
        Topic.PublishContext publishContext = getPublishContext("producer1", 0L);
        ByteBuf message2 = getMessage("producer2", 1L);
        Topic.PublishContext publishContext2 = getPublishContext("producer2", 1L);
        Assert.assertEquals(messageDeduplication.isDuplicate(publishContext, message), MessageDeduplication.MessageDupStatus.NotDup);
        Long l = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l);
        Assert.assertEquals(l.longValue(), 0L);
        Assert.assertEquals(messageDeduplication.isDuplicate(publishContext2, message2), MessageDeduplication.MessageDupStatus.NotDup);
        Long l2 = (Long) messageDeduplication.highestSequencedPushed.get("producer2");
        Assert.assertNotNull(l2);
        Assert.assertEquals(l2.longValue(), 1L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 1L), getMessage("producer1", 1L)), MessageDeduplication.MessageDupStatus.NotDup);
        Long l3 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l3);
        Assert.assertEquals(l3.longValue(), 1L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 5L), getMessage("producer1", 5L)), MessageDeduplication.MessageDupStatus.NotDup);
        Long l4 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l4);
        Assert.assertEquals(l4.longValue(), 5L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 0L), getMessage("producer1", 0L)), MessageDeduplication.MessageDupStatus.Unknown);
        Long l5 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l5);
        Assert.assertEquals(l5.longValue(), 5L);
        messageDeduplication.highestSequencedPersisted.put("producer1", 0L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 0L), getMessage("producer1", 0L)), MessageDeduplication.MessageDupStatus.Dup);
        Long l6 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l6);
        Assert.assertEquals(l6.longValue(), 5L);
        messageDeduplication.highestSequencedPushed.put("producer1", 0L);
        messageDeduplication.highestSequencedPersisted.put("producer1", 0L);
        ByteBuf message3 = getMessage("producer1", 0L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 1L, 5L), message3), MessageDeduplication.MessageDupStatus.NotDup);
        Long l7 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l7);
        Assert.assertEquals(l7.longValue(), 5L);
        Assert.assertEquals(messageDeduplication.isDuplicate(getPublishContext("producer1", 4L, 8L), message3), MessageDeduplication.MessageDupStatus.Unknown);
        Long l8 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l8);
        Assert.assertEquals(l8.longValue(), 5L);
    }

    @Test
    public void testInactiveProducerRemove() throws Exception {
        PulsarService pulsarService = (PulsarService) Mockito.mock(PulsarService.class);
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(pulsarService)).getConfiguration();
        MessageDeduplication messageDeduplication = (MessageDeduplication) BrokerTestUtil.spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, persistentTopic, managedLedger);
        ((MessageDeduplication) Mockito.doReturn(true).when(messageDeduplication)).isEnabled();
        ((MessageDeduplication) Mockito.doReturn((ManagedCursor) Mockito.mock(ManagedCursor.class)).when(messageDeduplication)).getManagedCursor();
        Topic.PublishContext publishContext = (Topic.PublishContext) Mockito.mock(Topic.PublishContext.class);
        Field declaredField = MessageDeduplication.class.getDeclaredField("inactiveProducers");
        declaredField.setAccessible(true);
        ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) declaredField.get(messageDeduplication);
        Mockito.when(Long.valueOf(publishContext.getHighestSequenceId())).thenReturn(2L);
        Mockito.when(Long.valueOf(publishContext.getSequenceId())).thenReturn(1L);
        Mockito.when(publishContext.getProducerName()).thenReturn("test1");
        messageDeduplication.isDuplicate(publishContext, (ByteBuf) null);
        Mockito.when(publishContext.getProducerName()).thenReturn("test2");
        messageDeduplication.isDuplicate(publishContext, (ByteBuf) null);
        Mockito.when(publishContext.getProducerName()).thenReturn("test3");
        messageDeduplication.isDuplicate(publishContext, (ByteBuf) null);
        messageDeduplication.producerRemoved("test1");
        messageDeduplication.producerRemoved("test2");
        messageDeduplication.producerRemoved("test3");
        messageDeduplication.purgeInactiveProducers();
        Assert.assertEquals(concurrentHashMap.size(), 3);
        ((MessageDeduplication) Mockito.doReturn(false).when(messageDeduplication)).isEnabled();
        concurrentHashMap.put("test2", Long.valueOf(System.currentTimeMillis() - 80000));
        concurrentHashMap.put("test3", Long.valueOf(System.currentTimeMillis() - 80000));
        messageDeduplication.purgeInactiveProducers();
        Assert.assertFalse(concurrentHashMap.containsKey("test2"));
        Assert.assertFalse(concurrentHashMap.containsKey("test3"));
        ((MessageDeduplication) Mockito.doReturn(true).when(messageDeduplication)).isEnabled();
        concurrentHashMap.put("test2", Long.valueOf(System.currentTimeMillis() - 70000));
        concurrentHashMap.put("test3", Long.valueOf(System.currentTimeMillis() - 70000));
        messageDeduplication.purgeInactiveProducers();
        Assert.assertFalse(concurrentHashMap.containsKey("test2"));
        Assert.assertFalse(concurrentHashMap.containsKey("test3"));
        Field declaredField2 = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
        declaredField2.setAccessible(true);
        ConcurrentOpenHashMap concurrentOpenHashMap = (ConcurrentOpenHashMap) declaredField2.get(messageDeduplication);
        Assert.assertEquals(((Long) concurrentOpenHashMap.get("test1")).longValue(), 2L);
        Assert.assertFalse(concurrentOpenHashMap.containsKey("test2"));
        Assert.assertFalse(concurrentOpenHashMap.containsKey("test3"));
    }

    @Test
    public void testIsDuplicateWithFailure() {
        PulsarService pulsarService = (PulsarService) Mockito.mock(PulsarService.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(pulsarService)).getConfiguration();
        ((PulsarService) Mockito.doReturn(Mockito.mock(PulsarResources.class)).when(pulsarService)).getPulsarResources();
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        MessageDeduplication messageDeduplication = (MessageDeduplication) Mockito.spy(new MessageDeduplication(pulsarService, (PersistentTopic) Mockito.mock(PersistentTopic.class), managedLedger));
        ((MessageDeduplication) Mockito.doReturn(true).when(messageDeduplication)).isEnabled();
        EventLoopGroup eventLoopGroup = (EventLoopGroup) Mockito.mock(EventLoopGroup.class);
        ((EventLoopGroup) Mockito.doAnswer(invocationOnMock -> {
            ((Runnable) invocationOnMock.getArguments()[0]).run();
            return null;
        }).when(eventLoopGroup)).submit((Runnable) ArgumentMatchers.any(Runnable.class));
        BrokerService brokerService = (BrokerService) Mockito.mock(BrokerService.class);
        ((BrokerService) Mockito.doReturn(eventLoopGroup).when(brokerService)).executor();
        ((BrokerService) Mockito.doReturn(pulsarService).when(brokerService)).pulsar();
        ((BrokerService) Mockito.doReturn(new BacklogQuotaManager(pulsarService)).when(brokerService)).getBacklogQuotaManager();
        PersistentTopic persistentTopic = (PersistentTopic) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentTopic.class, "topic-1", brokerService, managedLedger, messageDeduplication);
        ByteBuf message = getMessage("producer1", 0L);
        Topic.PublishContext publishContext = getPublishContext("producer1", 0L);
        ByteBuf message2 = getMessage("producer2", 1L);
        Topic.PublishContext publishContext2 = getPublishContext("producer2", 1L);
        persistentTopic.publishMessage(message, publishContext);
        persistentTopic.addComplete(new PositionImpl(0L, 1L), (ByteBuf) null, publishContext);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(1))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l);
        Assert.assertEquals(l.longValue(), 0L);
        Long l2 = (Long) messageDeduplication.highestSequencedPersisted.get("producer1");
        Assert.assertNotNull(l2);
        Assert.assertEquals(l2.longValue(), 0L);
        persistentTopic.publishMessage(message2, publishContext2);
        persistentTopic.addComplete(new PositionImpl(0L, 2L), (ByteBuf) null, publishContext2);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(2))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l3 = (Long) messageDeduplication.highestSequencedPushed.get("producer2");
        Assert.assertNotNull(l3);
        Assert.assertEquals(l3.longValue(), 1L);
        Long l4 = (Long) messageDeduplication.highestSequencedPersisted.get("producer2");
        Assert.assertNotNull(l4);
        Assert.assertEquals(l4.longValue(), 1L);
        ByteBuf message3 = getMessage("producer1", 1L);
        Topic.PublishContext publishContext3 = getPublishContext("producer1", 1L);
        persistentTopic.publishMessage(message3, publishContext3);
        persistentTopic.addComplete(new PositionImpl(0L, 3L), (ByteBuf) null, publishContext3);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(3))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l5 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l5);
        Assert.assertEquals(l5.longValue(), 1L);
        Long l6 = (Long) messageDeduplication.highestSequencedPersisted.get("producer1");
        Assert.assertNotNull(l6);
        Assert.assertEquals(l6.longValue(), 1L);
        ByteBuf message4 = getMessage("producer1", 5L);
        Topic.PublishContext publishContext4 = getPublishContext("producer1", 5L);
        persistentTopic.publishMessage(message4, publishContext4);
        persistentTopic.addComplete(new PositionImpl(0L, 4L), (ByteBuf) null, publishContext4);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(4))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l7 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l7);
        Assert.assertEquals(l7.longValue(), 5L);
        Long l8 = (Long) messageDeduplication.highestSequencedPersisted.get("producer1");
        Assert.assertNotNull(l8);
        Assert.assertEquals(l8.longValue(), 5L);
        ByteBuf message5 = getMessage("producer1", 0L);
        Topic.PublishContext publishContext5 = getPublishContext("producer1", 0L);
        persistentTopic.publishMessage(message5, publishContext5);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(4))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l9 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l9);
        Assert.assertEquals(l9.longValue(), 5L);
        ((Topic.PublishContext) Mockito.verify(publishContext5, Mockito.times(1))).completed((Exception) ArgumentMatchers.eq((Object) null), ArgumentMatchers.eq(-1L), ArgumentMatchers.eq(-1L));
        persistentTopic.publishMessage(getMessage("producer1", 6L), getPublishContext("producer1", 6L));
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(5))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Long l10 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l10);
        Assert.assertEquals(l10.longValue(), 6L);
        Long l11 = (Long) messageDeduplication.highestSequencedPersisted.get("producer1");
        Assert.assertNotNull(l11);
        Assert.assertEquals(l11.longValue(), 5L);
        ByteBuf message6 = getMessage("producer1", 6L);
        Topic.PublishContext publishContext6 = getPublishContext("producer1", 6L);
        persistentTopic.publishMessage(message6, publishContext6);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(5))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Topic.PublishContext) Mockito.verify(publishContext6, Mockito.times(1))).completed((Exception) ArgumentMatchers.any(MessageDeduplication.MessageDupUnknownException.class), ArgumentMatchers.eq(-1L), ArgumentMatchers.eq(-1L));
        persistentTopic.addComplete(new PositionImpl(0L, 5L), (ByteBuf) null, publishContext6);
        ByteBuf message7 = getMessage("producer1", 7L);
        Topic.PublishContext publishContext7 = getPublishContext("producer1", 7L);
        persistentTopic.publishMessage(message7, publishContext7);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(6))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext7);
        Assert.assertEquals(messageDeduplication.highestSequencedPushed.size(), 2L);
        Assert.assertEquals(messageDeduplication.highestSequencedPersisted.size(), 2L);
        Assert.assertEquals(((Long) messageDeduplication.highestSequencedPushed.get("producer1")).longValue(), 6L);
        Assert.assertEquals(((Long) messageDeduplication.highestSequencedPersisted.get("producer1")).longValue(), 6L);
        Assert.assertEquals(((Long) messageDeduplication.highestSequencedPushed.get("producer2")).longValue(), 1L);
        Assert.assertEquals(((Long) messageDeduplication.highestSequencedPersisted.get("producer2")).longValue(), 1L);
        ((MessageDeduplication) Mockito.verify(messageDeduplication, Mockito.times(1))).resetHighestSequenceIdPushed();
        ByteBuf message8 = getMessage("producer1", 6L);
        Topic.PublishContext publishContext8 = getPublishContext("producer1", 6L);
        persistentTopic.publishMessage(message8, publishContext8);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(6))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Topic.PublishContext) Mockito.verify(publishContext8, Mockito.times(1))).completed((Exception) ArgumentMatchers.eq((Object) null), ArgumentMatchers.eq(-1L), ArgumentMatchers.eq(-1L));
        Long l12 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l12);
        Assert.assertEquals(l12.longValue(), 6L);
        ByteBuf message9 = getMessage("producer1", 8L);
        Topic.PublishContext publishContext9 = getPublishContext("producer1", 8L);
        persistentTopic.publishMessage(message9, publishContext9);
        ((ManagedLedger) Mockito.verify(managedLedger, Mockito.times(7))).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        persistentTopic.addComplete(new PositionImpl(0L, 5L), (ByteBuf) null, publishContext9);
        Long l13 = (Long) messageDeduplication.highestSequencedPushed.get("producer1");
        Assert.assertNotNull(l13);
        Assert.assertEquals(l13.longValue(), 8L);
        Long l14 = (Long) messageDeduplication.highestSequencedPersisted.get("producer1");
        Assert.assertNotNull(l14);
        Assert.assertEquals(l14.longValue(), 8L);
    }

    public ByteBuf getMessage(String str, long j) {
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, new MessageMetadata().setProducerName(str).setSequenceId(j).setPublishTime(System.currentTimeMillis()), Unpooled.copiedBuffer(new byte[0]));
    }

    public Topic.PublishContext getPublishContext(final String str, final long j) {
        return (Topic.PublishContext) Mockito.spy(new Topic.PublishContext() { // from class: org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.1
            public String getProducerName() {
                return str;
            }

            public long getSequenceId() {
                return j;
            }

            public void completed(Exception exc, long j2, long j3) {
            }
        });
    }

    public Topic.PublishContext getPublishContext(final String str, final long j, final long j2) {
        return (Topic.PublishContext) Mockito.spy(new Topic.PublishContext() { // from class: org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.2
            public String getProducerName() {
                return str;
            }

            public long getSequenceId() {
                return j;
            }

            public long getHighestSequenceId() {
                return j2;
            }

            public void completed(Exception exc, long j3, long j4) {
            }
        });
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        this.conf.setBrokerDeduplicationEnabled(true);
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testMessageDeduplication() throws Exception {
        String str = "persistent://prop/ns-abc/testMessageDeduplication";
        String str2 = "test-producer";
        Producer create = this.pulsarClient.newProducer(Schema.STRING).producerName("test-producer").topic("persistent://prop/ns-abc/testMessageDeduplication").create();
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists("persistent://prop/ns-abc/testMessageDeduplication").get()).orElse(null);
        Assert.assertNotNull(persistentTopic);
        MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
        Assert.assertFalse(messageDeduplication.getInactiveProducers().containsKey("test-producer"));
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(messageDeduplication.getInactiveProducers().containsKey(str2));
        });
        this.admin.topicPolicies().setDeduplicationStatus("persistent://prop/ns-abc/testMessageDeduplication", false);
        Awaitility.await().untilAsserted(() -> {
            Boolean deduplicationStatus = this.admin.topicPolicies().getDeduplicationStatus(str);
            Assert.assertNotNull(deduplicationStatus);
            Assert.assertFalse(deduplicationStatus.booleanValue());
        });
        messageDeduplication.purgeInactiveProducers();
        Assert.assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
    }
}
