package org.apache.pulsar.broker.service.persistent;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/RescheduleReadHandlerTest.class */
public class RescheduleReadHandlerTest {
    private LongSupplier readIntervalMsSupplier;
    private ScheduledExecutorService executor;
    private Runnable cancelPendingRead;
    private Runnable rescheduleReadImmediately;
    private BooleanSupplier hasPendingReadRequestThatMightWait;
    private LongSupplier readOpCounterSupplier;
    private BooleanSupplier hasEntriesInReplayQueue;
    private RescheduleReadHandler rescheduleReadHandler;

    @BeforeMethod
    public void setUp() {
        this.readIntervalMsSupplier = (LongSupplier) Mockito.mock(LongSupplier.class);
        this.executor = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        this.cancelPendingRead = (Runnable) Mockito.mock(Runnable.class);
        this.rescheduleReadImmediately = (Runnable) Mockito.mock(Runnable.class);
        this.hasPendingReadRequestThatMightWait = (BooleanSupplier) Mockito.mock(BooleanSupplier.class);
        this.readOpCounterSupplier = (LongSupplier) Mockito.mock(LongSupplier.class);
        this.hasEntriesInReplayQueue = (BooleanSupplier) Mockito.mock(BooleanSupplier.class);
        this.rescheduleReadHandler = new RescheduleReadHandler(this.readIntervalMsSupplier, this.executor, this.cancelPendingRead, () -> {
            this.rescheduleReadImmediately.run();
        }, this.hasPendingReadRequestThatMightWait, this.readOpCounterSupplier, this.hasEntriesInReplayQueue);
    }

    @Test
    public void rescheduleReadImmediately() {
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(0L);
        this.rescheduleReadHandler.rescheduleRead();
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately)).run();
        ((ScheduledExecutorService) Mockito.verify(this.executor, Mockito.never())).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class));
    }

    @Test
    public void rescheduleReadWithDelay() {
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(100L);
        this.rescheduleReadHandler.rescheduleRead();
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately, Mockito.never())).run();
        ((ScheduledExecutorService) Mockito.verify(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.eq(100L), (TimeUnit) Mockito.eq(TimeUnit.MILLISECONDS));
    }

    @Test
    public void rescheduleReadWithDelayAndCancelPendingRead() {
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(100L);
        Mockito.when(Boolean.valueOf(this.hasPendingReadRequestThatMightWait.getAsBoolean())).thenReturn(true);
        Mockito.when(Long.valueOf(this.readOpCounterSupplier.getAsLong())).thenReturn(5L);
        Mockito.when(Boolean.valueOf(this.hasEntriesInReplayQueue.getAsBoolean())).thenReturn(true);
        ((ScheduledExecutorService) Mockito.doAnswer(invocationOnMock -> {
            ((Runnable) invocationOnMock.getArgument(0)).run();
            return null;
        }).when(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class));
        this.rescheduleReadHandler.rescheduleRead();
        ((ScheduledExecutorService) Mockito.verify(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.eq(100L), (TimeUnit) Mockito.eq(TimeUnit.MILLISECONDS));
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately)).run();
        ((Runnable) Mockito.verify(this.cancelPendingRead)).run();
    }

    @Test
    public void rescheduleReadWithDelayAndDontCancelPendingReadIfNoEntriesInReplayQueue() {
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(100L);
        Mockito.when(Boolean.valueOf(this.hasPendingReadRequestThatMightWait.getAsBoolean())).thenReturn(true);
        Mockito.when(Long.valueOf(this.readOpCounterSupplier.getAsLong())).thenReturn(5L);
        Mockito.when(Boolean.valueOf(this.hasEntriesInReplayQueue.getAsBoolean())).thenReturn(false);
        ((ScheduledExecutorService) Mockito.doAnswer(invocationOnMock -> {
            ((Runnable) invocationOnMock.getArgument(0)).run();
            return null;
        }).when(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class));
        this.rescheduleReadHandler.rescheduleRead();
        ((ScheduledExecutorService) Mockito.verify(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.eq(100L), (TimeUnit) Mockito.eq(TimeUnit.MILLISECONDS));
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately)).run();
        ((Runnable) Mockito.verify(this.cancelPendingRead, Mockito.never())).run();
    }

    @Test
    public void rescheduleReadBatching() {
        Mockito.when(Long.valueOf(this.readOpCounterSupplier.getAsLong())).thenReturn(5L);
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(100L);
        AtomicReference atomicReference = new AtomicReference();
        ((ScheduledExecutorService) Mockito.doAnswer(invocationOnMock -> {
            Runnable runnable = (Runnable) invocationOnMock.getArgument(0);
            if (atomicReference.compareAndSet(null, runnable)) {
                return null;
            }
            runnable.run();
            return null;
        }).when(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class));
        this.rescheduleReadHandler.rescheduleRead();
        this.rescheduleReadHandler.rescheduleRead();
        this.rescheduleReadHandler.rescheduleRead();
        ((Runnable) atomicReference.get()).run();
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately, Mockito.times(1))).run();
    }

    @Test
    public void rescheduleReadWithoutCancelPendingReadWhenReadOpCounterIncrements() {
        Mockito.when(Long.valueOf(this.readIntervalMsSupplier.getAsLong())).thenReturn(100L);
        Mockito.when(Boolean.valueOf(this.hasPendingReadRequestThatMightWait.getAsBoolean())).thenReturn(true);
        Mockito.when(Long.valueOf(this.readOpCounterSupplier.getAsLong())).thenReturn(5L).thenReturn(6L);
        Mockito.when(Boolean.valueOf(this.hasEntriesInReplayQueue.getAsBoolean())).thenReturn(true);
        ((ScheduledExecutorService) Mockito.doAnswer(invocationOnMock -> {
            ((Runnable) invocationOnMock.getArgument(0)).run();
            return null;
        }).when(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class));
        this.rescheduleReadHandler.rescheduleRead();
        ((ScheduledExecutorService) Mockito.verify(this.executor)).schedule((Runnable) Mockito.any(Runnable.class), Mockito.eq(100L), (TimeUnit) Mockito.eq(TimeUnit.MILLISECONDS));
        ((Runnable) Mockito.verify(this.rescheduleReadImmediately)).run();
        ((Runnable) Mockito.verify(this.cancelPendingRead, Mockito.never())).run();
    }
}
