package org.apache.pulsar.broker.delayed.bucket;

import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.MockManagedCursor;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.class */
public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
    private BucketSnapshotStorage bucketSnapshotStorage;

    /**
     * Closes the bucket snapshot storage after each test method, if one has been created.
     *
     * @throws Exception if closing the storage fails
     */
    @AfterMethod
    public void clean() throws Exception {
        if (this.bucketSnapshotStorage == null) {
            // Nothing was set up (the data provider never ran), so there is nothing to release.
            return;
        }
        this.bucketSnapshotStorage.close();
    }

    /**
     * Supplies the tracker under test for every test method that uses the {@code delayedTracker}
     * data provider.
     *
     * <p>Each call re-creates the mocked dispatcher, the mocked clock (which reports the current value
     * of {@code clockTime}) and a fresh, started {@link MockBucketSnapshotStorage}. It then builds a
     * {@link BucketDelayedDeliveryTracker} whose constructor arguments are selected by the name of the
     * requesting test method; names that are not listed get the default configuration.
     *
     * <p>NOTE(review): {@code this.timer} is not assigned in this class; presumably it is initialized
     * by the superclass — confirm.
     *
     * @param method the test method requesting data
     * @return a single row holding the tracker; for {@code testWithTimer} the row additionally holds
     *         the map of pending timer tasks keyed by their due time on the mocked clock
     * @throws Exception if the snapshot storage cannot be started
     */
    @DataProvider(name = "delayedTracker")
    public Object[][] provider(Method method) throws Exception {
        this.dispatcher = Mockito.mock(AbstractPersistentDispatcherMultipleConsumers.class);
        this.clock = Mockito.mock(Clock.class);
        this.clockTime = new AtomicLong();
        // The mocked clock always answers with whatever the test last stored in clockTime.
        Mockito.when(this.clock.millis()).then(invocation -> this.clockTime.get());

        this.bucketSnapshotStorage = new MockBucketSnapshotStorage();
        this.bucketSnapshotStorage.start();

        MockManagedCursor cursor = new MockManagedCursor("my_test_cursor");
        Mockito.doReturn(cursor).when(this.dispatcher).getCursor();
        Mockito.doReturn("persistent://public/default/testDelay / " + cursor.getName())
                .when(this.dispatcher).getName();

        switch (method.getName()) {
            case "test":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 1L, this.clock, false,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testWithTimer":
                return newTimerDrivenRow();
            case "testAddWithinTickTime":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 100L, this.clock, false,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testAddMessageWithStrictDelay":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 100L, this.clock, true,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 1000L, this.clock, true,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict":
            case "testRecoverSnapshot":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 100000L, this.clock, true,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict":
            case "testExistDelayedMessage":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 500L, this.clock, true,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50)
                }};
            case "testMergeSnapshot":
            case "testWithBkException":
            case "testWithCreateFailDowngrade":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 100000L, this.clock, true,
                            this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 10)
                }};
            case "testMaxIndexesPerSegment":
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 100000L, this.clock, true,
                            this.bucketSnapshotStorage, 20L, TimeUnit.HOURS.toMillis(1L), 5, 100)
                }};
            default:
                return new Object[][]{{
                    new BucketDelayedDeliveryTracker(this.dispatcher, this.timer, 1L, this.clock, true,
                            this.bucketSnapshotStorage, 1000L, TimeUnit.MILLISECONDS.toMillis(100L), -1, 50)
                }};
        }
    }

    /**
     * Builds the {@code testWithTimer} row: a tracker wired to a mocked {@link Timer} and to its own
     * mocked {@link Clock}, plus the map in which the mocked timer records every scheduled task.
     */
    private Object[][] newTimerDrivenRow() {
        Timer mockTimer = Mockito.mock(Timer.class);
        AtomicLong now = new AtomicLong();
        Clock mockClock = Mockito.mock(Clock.class);
        Mockito.when(mockClock.millis()).then(invocation -> now.get());

        // Pending timer tasks keyed by the mock-clock time (in millis) at which they become due.
        TreeMap<Long, TimerTask> tasks = new TreeMap<>();
        Mockito.when(mockTimer.newTimeout(ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.any()))
                .then(invocation -> {
                    TimerTask task = invocation.getArgument(0, TimerTask.class);
                    long delay = invocation.getArgument(1, Long.class);
                    TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
                    long dueAt = now.get() + unit.toMillis(delay);
                    tasks.put(dueAt, task);

                    Timeout timeout = Mockito.mock(Timeout.class);
                    Mockito.when(timeout.cancel()).then(cancelInvocation -> {
                        // Two-argument remove: drop the entry only if it still maps to this task.
                        tasks.remove(dueAt, task);
                        return null;
                    });
                    return timeout;
                });

        return new Object[][]{{
            new BucketDelayedDeliveryTracker(this.dispatcher, mockTimer, 1L, mockClock, false,
                    this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L), -1, 50),
            tasks
        }};
    }

    /**
     * Checks that {@code containsMessage} reports messages that were added, both before and after
     * the first scheduled message has been fetched from the tracker.
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testContainsMessage(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) {
        final BucketDelayedDeliveryTracker tracker = bucketDelayedDeliveryTracker;

        tracker.addMessage(1L, 1L, 10L);
        tracker.addMessage(2L, 2L, 20L);
        Assert.assertTrue(tracker.containsMessage(1L, 1L));

        // Move the mocked clock to 20 and ask for at most one scheduled message.
        this.clockTime.set(20L);
        Position firstScheduled = (Position) tracker.getScheduledMessages(1).stream().findFirst().get();
        Assert.assertEquals(firstScheduled.getEntryId(), 1L);

        // Four more messages (ids 3..6), all with the same third argument of 30.
        for (long id = 3L; id <= 6L; id++) {
            tracker.addMessage(id, id, 30L);
        }
        Assert.assertTrue(tracker.containsMessage(3L, 3L));

        tracker.close();
    }

    /**
     * Fills a tracker with 100 messages, closes it, and checks what a second tracker built on the
     * same snapshot storage reports: 70 delayed messages, positions (31,31) onwards in order, and no
     * trace of message 101, which was added to the first tracker just before it was closed.
     *
     * @throws Exception if waiting on the tracker state fails
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker", invocationCount = 10)
    public void testRecoverSnapshot(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) throws Exception {
        final BucketDelayedDeliveryTracker tracker = bucketDelayedDeliveryTracker;

        int seq = 1;
        while (seq <= 100) {
            tracker.addMessage(seq, seq, seq * 10);
            seq++;
        }
        Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 100L);

        this.clockTime.set(10L);
        // Wait until no bucket is merging and every snapshot-create future has completed.
        Awaitility.await().untilAsserted(() -> {
            boolean allSettled = tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                    .allMatch(bucket -> !bucket.merging
                            && ((CompletableFuture<?>) bucket.getSnapshotCreateFuture().get()).isDone());
            Assert.assertTrue(allSettled);
        });

        Assert.assertTrue(tracker.hasMessageAvailable());
        TreeSet<Position> dueAtTen = new TreeSet<>();
        Awaitility.await().untilAsserted(() -> {
            dueAtTen.addAll(tracker.getScheduledMessages(100));
            Assert.assertEquals(dueAtTen.size(), 1);
        });

        tracker.addMessage(101L, 101L, 1010L);
        tracker.close();

        this.clockTime.set(300L);
        BucketDelayedDeliveryTracker recovered = new BucketDelayedDeliveryTracker(this.dispatcher, this.timer,
                1000L, this.clock, true, this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L),
                -1, 50);
        AssertJUnit.assertFalse(recovered.containsMessage(101L, 101L));
        Assert.assertEquals(recovered.getNumberOfDelayedMessages(), 70L);

        this.clockTime.set(1000L);
        Assert.assertTrue(recovered.hasMessageAvailable());
        TreeSet<Position> remaining = new TreeSet<>();
        Awaitility.await().untilAsserted(() -> {
            remaining.addAll(recovered.getScheduledMessages(70));
            Assert.assertEquals(remaining.size(), 70);
        });

        // The recovered positions must be (31,31), (32,32), ... in ascending order.
        int expectedId = 31;
        for (Position position : remaining) {
            Assert.assertEquals(position, PositionFactory.create(expectedId, expectedId));
            expectedId++;
        }
        recovered.close();
    }

    /**
     * Verifies that a {@link RoaringBitmap} survives a serialize / deserialize round trip: the
     * restored bitmap has the same cardinality and members, and re-serializing the restored bitmap
     * yields exactly the same bytes in a distinct array.
     */
    @org.testng.annotations.Test
    public void testRoaringBitmapSerialize() {
        List<Long> values = List.of(1L, 3L, 5L, 10L, 16L, 18L, 999L, 0L);

        RoaringBitmap original = new RoaringBitmap();
        for (long value : values) {
            // Each value is added as the half-open range [value, value + 1).
            original.add(value, value + 1);
        }
        Assert.assertEquals(original.getCardinality(), values.size());
        for (long value : values) {
            Assert.assertTrue(original.contains(value, value + 1));
        }

        byte[] originalBytes = new byte[original.serializedSizeInBytes()];
        original.serialize(ByteBuffer.wrap(originalBytes));

        RoaringBitmap restored = new ImmutableRoaringBitmap(ByteBuffer.wrap(originalBytes)).toRoaringBitmap();
        Assert.assertEquals(restored.getCardinality(), values.size());
        for (long value : values) {
            Assert.assertTrue(restored.contains(value, value + 1));
        }

        byte[] restoredBytes = new byte[restored.serializedSizeInBytes()];
        // Serialize the RESTORED bitmap here. Serializing the original a second time would make the
        // byte comparison below trivially true and hide any round-trip difference.
        restored.serialize(ByteBuffer.wrap(restoredBytes));
        Assert.assertTrue(Arrays.equals(originalBytes, restoredBytes));
        Assert.assertNotSame(originalBytes, restoredBytes);
    }

    /**
     * Adds 110 messages while waiting for bucket merges to settle, checks that at most 10 immutable
     * buckets remain, then closes the tracker and verifies that a second tracker built on the same
     * snapshot storage reports the same number of delayed messages as the buckets held and, after the
     * 110 messages are added again, schedules positions (1,1)..(110,110) in order.
     *
     * @throws Exception if waiting on the tracker state fails
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testMergeSnapshot(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) throws Exception {
        for (int i = 1; i <= 110; i++) {
            bucketDelayedDeliveryTracker.addMessage(i, i, i * 10);
            // Let any in-flight merge finish before adding the next message.
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                    bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().values().stream()
                            .noneMatch(bucket -> bucket.merging)));
        }
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(bucketDelayedDeliveryTracker.getNumberOfDelayedMessages(), 110L);
        Assert.assertTrue(bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().size() <= 10);

        bucketDelayedDeliveryTracker.addMessage(111L, 1011L, 1110L);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(bucket -> bucket.merging)));

        // Total number of delayed messages held by the immutable buckets just before closing.
        MutableLong delayedInBuckets = new MutableLong();
        bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) -> {
            delayedInBuckets.add(bucket.getNumberBucketDelayedMessages());
        });
        bucketDelayedDeliveryTracker.close();

        BucketDelayedDeliveryTracker recovered = new BucketDelayedDeliveryTracker(this.dispatcher, this.timer,
                1000L, this.clock, true, this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L),
                -1, 10);
        Assert.assertEquals(recovered.getNumberOfDelayedMessages(), delayedInBuckets.getValue());

        for (int i = 1; i <= 110; i++) {
            recovered.addMessage(i, i, i * 10);
        }
        this.clockTime.set(1100L);

        TreeSet<Position> scheduled = new TreeSet<>();
        Awaitility.await().untilAsserted(() -> {
            scheduled.addAll(recovered.getScheduledMessages(110));
            Assert.assertEquals(scheduled.size(), 110);
        });
        for (int i = 1; i <= 110; i++) {
            Assert.assertEquals(scheduled.pollFirst(), PositionFactory.create(i, i));
        }
        recovered.close();
    }

    /**
     * Runs the merge-and-recover scenario while the mock snapshot storage has exceptions injected for
     * create, get-metadata, get-segment and delete operations, and checks that the second tracker
     * still ends up scheduling every message that was held in the buckets and that all injected
     * exceptions have been consumed.
     *
     * @throws Exception if waiting on the tracker state fails
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testWithBkException(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) throws Exception {
        MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) this.bucketSnapshotStorage;
        mockBucketSnapshotStorage.injectCreateException(new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry"));
        mockBucketSnapshotStorage.injectGetMetaDataException(new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry"));
        mockBucketSnapshotStorage.injectGetSegmentException(new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry"));
        mockBucketSnapshotStorage.injectDeleteException(new BucketSnapshotPersistenceException("Bookie operation timeout, op: Delete entry"));
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(mockBucketSnapshotStorage.createExceptionQueue.size(), 1);
        Assert.assertEquals(mockBucketSnapshotStorage.getMetaDataExceptionQueue.size(), 1);
        Assert.assertEquals(mockBucketSnapshotStorage.getSegmentExceptionQueue.size(), 1);
        Assert.assertEquals(mockBucketSnapshotStorage.deleteExceptionQueue.size(), 1);

        for (int i = 1; i <= 110; i++) {
            bucketDelayedDeliveryTracker.addMessage(i, i, i * 10);
            // Let any in-flight merge finish before adding the next message.
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                    bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().values().stream()
                            .noneMatch(bucket -> bucket.merging)));
        }
        Assert.assertEquals(bucketDelayedDeliveryTracker.getNumberOfDelayedMessages(), 110L);
        Assert.assertTrue(bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().size() <= 10);

        bucketDelayedDeliveryTracker.addMessage(111L, 1011L, 1110L);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(bucket -> bucket.merging)));

        // Total number of delayed messages held by the immutable buckets just before closing.
        MutableLong delayedInBuckets = new MutableLong();
        bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) -> {
            delayedInBuckets.add(bucket.getNumberBucketDelayedMessages());
        });
        bucketDelayedDeliveryTracker.close();

        BucketDelayedDeliveryTracker recovered = new BucketDelayedDeliveryTracker(this.dispatcher, this.timer,
                1000L, this.clock, true, this.bucketSnapshotStorage, 5L, TimeUnit.MILLISECONDS.toMillis(10L),
                -1, 10);
        Long value = delayedInBuckets.getValue();
        Assert.assertEquals(recovered.getNumberOfDelayedMessages(), value);

        this.clockTime.set(1100L);
        mockBucketSnapshotStorage.injectGetSegmentException(new BucketSnapshotPersistenceException("Bookie operation timeout1, op: Get entry"));
        mockBucketSnapshotStorage.injectGetSegmentException(new BucketSnapshotPersistenceException("Bookie operation timeout2, op: Get entry"));
        mockBucketSnapshotStorage.injectGetSegmentException(new BucketSnapshotPersistenceException("Bookie operation timeout3, op: Get entry"));
        mockBucketSnapshotStorage.injectGetSegmentException(new BucketSnapshotPersistenceException("Bookie operation timeout4, op: Get entry"));
        // The first fetch returns nothing; later fetches inside the await eventually return all messages.
        Assert.assertEquals(recovered.getScheduledMessages(100).size(), 0);

        TreeSet<Position> scheduled = new TreeSet<>();
        Awaitility.await().untilAsserted(() -> {
            scheduled.addAll(recovered.getScheduledMessages(100));
            Assert.assertEquals(scheduled.size(), value);
        });

        Assert.assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
        Assert.assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
        Assert.assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty());
        Assert.assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty());
        recovered.close();
    }

    /**
     * Injects four create failures into the mock snapshot storage, adds six messages, and checks
     * that no immutable bucket remains while all six messages are still counted and the first five
     * are scheduled in order.
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) {
        MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) this.bucketSnapshotStorage;
        for (int failure = 0; failure < 4; failure++) {
            mockBucketSnapshotStorage.injectCreateException(
                    new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry"));
        }
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(mockBucketSnapshotStorage.createExceptionQueue.size(), 4);

        for (int i = 1; i <= 6; i++) {
            bucketDelayedDeliveryTracker.addMessage(i, i, i * 10);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(bucketDelayedDeliveryTracker.getImmutableBuckets().asMapOfRanges().size(), 0);
        });

        this.clockTime.set(50L);
        Assert.assertEquals(bucketDelayedDeliveryTracker.getNumberOfDelayedMessages(), 6L);

        NavigableSet<Position> scheduledMessages = bucketDelayedDeliveryTracker.getScheduledMessages(5);
        for (int i = 1; i <= 5; i++) {
            Assert.assertEquals(scheduledMessages.pollFirst(), PositionFactory.create(i, i));
        }

        // Release the tracker like the sibling tests do.
        bucketDelayedDeliveryTracker.close();
    }

    /**
     * Adds 101 messages and checks that exactly five immutable buckets exist and that each of them
     * reports 4 as its last segment entry id.
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) {
        final BucketDelayedDeliveryTracker tracker = bucketDelayedDeliveryTracker;

        int seq = 1;
        while (seq <= 101) {
            tracker.addMessage(seq, seq, seq * 10);
            seq++;
        }

        int bucketCount = tracker.getImmutableBuckets().asMapOfRanges().size();
        Assert.assertEquals(bucketCount, 5);

        // Only the bucket itself matters here, so iterate the map's values rather than its entries.
        tracker.getImmutableBuckets().asMapOfRanges().values()
                .forEach(bucket -> Assert.assertEquals(bucket.getLastSegmentEntryId(), 4));

        tracker.close();
    }

    /**
     * Adds 1001 messages, then verifies that {@code clear()} leaves the tracker with no delayed
     * messages, no immutable buckets, an empty last mutable bucket and an empty shared priority queue.
     *
     * @throws ExecutionException if the clear operation completes exceptionally
     * @throws InterruptedException if the wait for the clear operation is interrupted
     * @throws TimeoutException if the clear operation does not complete within one minute
     */
    @org.testng.annotations.Test(dataProvider = "delayedTracker")
    public void testClear(BucketDelayedDeliveryTracker bucketDelayedDeliveryTracker) throws ExecutionException, InterruptedException, TimeoutException {
        final BucketDelayedDeliveryTracker tracker = bucketDelayedDeliveryTracker;

        int seq = 1;
        while (seq <= 1001) {
            tracker.addMessage(seq, seq, seq * 10);
            seq++;
        }

        // State before clearing: everything counted, some buckets sealed, one message still mutable.
        Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 1001L);
        boolean hasImmutableBuckets = tracker.getImmutableBuckets().asMapOfRanges().size() > 0;
        Assert.assertTrue(hasImmutableBuckets);
        Assert.assertEquals(tracker.getLastMutableBucket().size(), 1L);

        tracker.clear().get(1L, TimeUnit.MINUTES);

        // State after clearing: every container is empty.
        Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 0L);
        Assert.assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
        Assert.assertEquals(tracker.getLastMutableBucket().size(), 0L);
        Assert.assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0L);

        tracker.close();
    }
}
