package org.apache.pulsar.broker.delayed;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.class */
public abstract class AbstractDeliveryTrackerTest {
    protected final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), 500, TimeUnit.MILLISECONDS);
    protected AbstractPersistentDispatcherMultipleConsumers dispatcher;
    protected Clock clock;
    protected AtomicLong clockTime;

    @AfterClass(alwaysRun = true)
    public void cleanup() {
        this.timer.stop();
    }

    @Test(dataProvider = "delayedTracker")
    public void test(DelayedDeliveryTracker delayedDeliveryTracker) throws Exception {
        Assert.assertFalse(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 2L, 20L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(2L, 1L, 10L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(3L, 3L, 30L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(4L, 5L, 50L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(5L, 4L, 40L));
        Assert.assertFalse(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 5L);
        Assert.assertEquals(delayedDeliveryTracker.getScheduledMessages(10), Collections.emptySet());
        this.clockTime.set(15L);
        Assert.assertFalse(delayedDeliveryTracker.addMessage(6L, 6L, 10L));
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 5L);
        Assert.assertTrue(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertEquals(delayedDeliveryTracker.getScheduledMessages(10).size(), 1);
        this.clockTime.set(60L);
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 4L);
        Assert.assertTrue(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertEquals(delayedDeliveryTracker.getScheduledMessages(1).size(), 1);
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 3L);
        Assert.assertTrue(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertEquals(delayedDeliveryTracker.getScheduledMessages(3).size(), 3);
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 0L);
        Assert.assertFalse(delayedDeliveryTracker.hasMessageAvailable());
        Assert.assertEquals(delayedDeliveryTracker.getScheduledMessages(10), Collections.emptySet());
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testWithTimer(DelayedDeliveryTracker delayedDeliveryTracker, NavigableMap<Long, TimerTask> navigableMap) throws Exception {
        Assert.assertTrue(navigableMap.isEmpty());
        Assert.assertTrue(delayedDeliveryTracker.addMessage(2L, 2L, 20L));
        Assert.assertEquals(navigableMap.size(), 1);
        Assert.assertEquals(navigableMap.firstKey().longValue(), 20L);
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 1L, 10L));
        Assert.assertEquals(navigableMap.size(), 1);
        Assert.assertEquals(navigableMap.firstKey().longValue(), 10L);
        Assert.assertTrue(delayedDeliveryTracker.addMessage(3L, 3L, 30L));
        Assert.assertEquals(navigableMap.size(), 1);
        Assert.assertEquals(navigableMap.firstKey().longValue(), 10L);
        this.clockTime.set(15L);
        TimerTask value = navigableMap.pollFirstEntry().getValue();
        Timeout timeout = (Timeout) Mockito.mock(Timeout.class);
        Mockito.when(Boolean.valueOf(timeout.isCancelled())).thenReturn(true);
        value.run(timeout);
        ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher, Mockito.atMostOnce())).readMoreEntriesAsync();
        value.run((Timeout) Mockito.mock(Timeout.class));
        ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher)).readMoreEntriesAsync();
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testAddWithinTickTime(DelayedDeliveryTracker delayedDeliveryTracker) {
        this.clockTime.set(0L);
        Assert.assertFalse(delayedDeliveryTracker.addMessage(1L, 1L, 10L));
        Assert.assertFalse(delayedDeliveryTracker.addMessage(2L, 2L, 99L));
        Assert.assertFalse(delayedDeliveryTracker.addMessage(3L, 3L, 100L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(4L, 4L, 101L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(5L, 5L, 200L));
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 2L);
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testAddMessageWithStrictDelay(DelayedDeliveryTracker delayedDeliveryTracker) {
        this.clockTime.set(10L);
        Assert.assertFalse(delayedDeliveryTracker.addMessage(1L, 1L, 9L));
        Assert.assertFalse(delayedDeliveryTracker.addMessage(4L, 4L, 10L));
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 1L, 11L));
        Assert.assertEquals(delayedDeliveryTracker.getNumberOfDelayedMessages(), 1L);
        Assert.assertFalse(delayedDeliveryTracker.hasMessageAvailable());
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker delayedDeliveryTracker) throws Exception {
        this.clockTime.set(10000L);
        Timeout timeout = (Timeout) Mockito.mock(Timeout.class);
        Mockito.when(Boolean.valueOf(timeout.isCancelled())).then(invocationOnMock -> {
            return false;
        });
        ((AbstractDelayedDeliveryTracker) delayedDeliveryTracker).run(timeout);
        ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher, Mockito.times(1))).readMoreEntriesAsync();
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 1L, 10001L));
        Thread.sleep(600L);
        ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher, Mockito.times(1))).readMoreEntriesAsync();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher)).readMoreEntriesAsync();
        });
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict(DelayedDeliveryTracker delayedDeliveryTracker) {
        this.clockTime.set(500000L);
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 1L, 500005L));
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher)).readMoreEntriesAsync();
        });
        delayedDeliveryTracker.close();
    }

    @Test(dataProvider = "delayedTracker")
    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker delayedDeliveryTracker) throws Exception {
        this.clockTime.set(0L);
        Assert.assertTrue(delayedDeliveryTracker.addMessage(1L, 1L, 2000L));
        Thread.sleep(1000L);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((AbstractPersistentDispatcherMultipleConsumers) Mockito.verify(this.dispatcher)).readMoreEntriesAsync();
        });
        delayedDeliveryTracker.close();
    }
}
