package org.apache.pulsar.broker.service.streamingdispatch;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* Exercises StreamingEntryReader against a spied ManagedLedgerImpl, with the dispatcher, topic and broker service mocked. */
public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
    private static final Charset Encoding = StandardCharsets.UTF_8;
    private PersistentTopic mockTopic;
    private StreamingDispatcher mockDispatcher;
    private BrokerService mockBrokerService;
    private EventLoopGroup eventLoopGroup;
    private OrderedExecutor orderedExecutor;
    private ManagedLedgerConfig config;
    private ManagedLedgerImpl ledger;
    private ManagedCursor cursor;

    /**
     * Builds the fixtures shared by every test: a single-threaded event loop and ordered executor,
     * mocks for the topic, broker service and dispatcher, and a spied managed ledger
     * ({@code "my_test_ledger"}, at most 10 entries per ledger) with a cursor named {@code "test"}.
     *
     * @throws Exception if the managed ledger or its cursor cannot be opened
     */
    protected void setUpTestCase() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.orderedExecutor = OrderedScheduler.newSchedulerBuilder()
                .numThreads(1)
                .name("StreamingEntryReaderTests")
                .build();

        this.mockTopic = Mockito.mock(PersistentTopic.class);
        this.mockBrokerService = Mockito.mock(BrokerService.class);
        this.mockDispatcher = Mockito.mock(StreamingDispatcher.class);

        this.config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        // A spy (not a mock) so individual tests can stub asyncReadEntry while everything else stays real.
        this.ledger = (ManagedLedgerImpl) Mockito.spy(this.factory.open("my_test_ledger", this.config));
        this.cursor = this.ledger.openCursor("test");

        Mockito.when(this.mockTopic.getBrokerService()).thenReturn(this.mockBrokerService);
        Mockito.when(this.mockBrokerService.executor()).thenReturn(this.eventLoopGroup);
        Mockito.when(this.mockBrokerService.getTopicOrderedExecutor()).thenReturn(this.orderedExecutor);

        // End-of-topic notifications are irrelevant to these tests; answer them with a no-op.
        Mockito.doAnswer(invocation -> null).when(this.mockDispatcher).notifyConsumersEndOfTopic();
    }

    /**
     * Shuts down the event loop group and the ordered executor created in {@code setUpTestCase}
     * (when present) and clears the corresponding fields.
     */
    protected void cleanUpTestCase() {
        final EventLoopGroup loopGroup = this.eventLoopGroup;
        if (loopGroup != null) {
            loopGroup.shutdownNow();
            this.eventLoopGroup = null;
        }

        final OrderedExecutor executorToStop = this.orderedExecutor;
        if (executorToStop != null) {
            executorToStop.shutdownNow();
            this.orderedExecutor = null;
        }
    }

    /**
     * Writes 150 entries, then reads them back in three requests of 50 entries each.
     * Every delivered entry must carry the next expected payload, and after each request the
     * cursor's read position must be the position following the last delivered entry.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
        final int totalEntries = 150;
        final int entriesPerRead = 50;
        AtomicInteger deliveredCount = new AtomicInteger(0);
        Stack<PositionImpl> deliveredPositions = new Stack<>();

        for (int i = 0; i < totalEntries; i++) {
            this.ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            // Decode with the same charset the entries were written with.
            Assert.assertEquals(new String(entry.getData(), Encoding),
                    String.format("message-%d", deliveredCount.getAndIncrement()));
            // The test plays the dispatcher's role of advancing the cursor past each delivered entry.
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        for (int expected = entriesPerRead; expected <= totalEntries; expected += entriesPerRead) {
            final int expectedDelivered = expected;
            reader.asyncReadEntries(entriesPerRead, 700L, null);
            Awaitility.await().until(() -> deliveredCount.get() == expectedDelivered);
            Assert.assertEquals(this.cursor.getReadPosition(),
                    this.ledger.getNextValidPosition(deliveredPositions.peek()));
        }
    }

    /**
     * Reads with a size argument of {@code 2 * entrySize + 1} and checks that the first request
     * delivers exactly two entries even though six were requested. While that request is in flight,
     * the read at one specific position is stubbed to answer 200 ms late with the payload
     * {@code "mmmmmmmmmmessage-2"}. After the stub is removed, three more requests bring the total
     * to eight entries, all delivered in write order.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testCanReadEntryFromMLedgerSizeExceededLimit() throws Exception {
        AtomicBoolean readDone = new AtomicBoolean(false);
        Stack<PositionImpl> deliveredPositions = new Stack<>();
        ArrayList<String> received = new ArrayList<>();
        int entrySize = "mmmmmmmmmmessage-0".getBytes(Encoding).length;
        // Passed as the reader's second argument (presumably a byte cap per request): two entries plus one byte.
        long maxReadSize = (entrySize * 2) + 1;

        for (int i = 0; i < 15; i++) {
            this.ledger.addEntry(String.format("mmmmmmmmmmessage-%d", i).getBytes(Encoding));
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            received.add(new String(entry.getData(), Encoding));
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        // The test treats the dispatcher's canReadMoreEntries call as the end-of-request signal.
        Mockito.doAnswer(invocation -> {
            readDone.set(true);
            return null;
        }).when(this.mockDispatcher).canReadMoreEntries(ArgumentMatchers.anyBoolean());

        final PositionImpl delayedPosition = this.ledger.getPositionAfterN(
                this.ledger.getFirstPosition(), 3L, ManagedLedgerImpl.PositionBound.startExcluded);
        // Answer the read at delayedPosition 200 ms late instead of hitting the real ledger.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.ReadEntryCallback callback =
                    invocation.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
            Object ctx = invocation.getArgument(2);
            this.executor.schedule(() -> {
                callback.readEntryComplete(
                        EntryImpl.create(delayedPosition.getLedgerId(), delayedPosition.getEntryId(),
                                "mmmmmmmmmmessage-2".getBytes(Encoding)),
                        ctx);
            }, 200L, TimeUnit.MILLISECONDS);
            return null;
        }).when(this.ledger)
                .asyncReadEntry(ArgumentMatchers.eq(delayedPosition), ArgumentMatchers.any(), ArgumentMatchers.any());

        reader.asyncReadEntries(6, maxReadSize, null);
        Awaitility.await().until(() -> readDone.get());
        Assert.assertEquals(received.size(), 2);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));

        // Drop the delayed-read stub so the remaining requests go to the real ledger.
        Mockito.reset(this.ledger);
        for (int round = 0; round < 3; round++) {
            readDone.set(false);
            reader.asyncReadEntries(6, maxReadSize, null);
            Awaitility.await().until(() -> readDone.get());
        }

        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));
        Assert.assertEquals(received.size(), 8);
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("mmmmmmmmmmessage-%d", i), received.get(i));
        }
    }

    /**
     * Starts with seven entries, reads five, then requests five more while only two remain.
     * The remaining three deliveries must arrive one by one as {@code message-7} to
     * {@code message-9} are appended, and all ten entries must be delivered in write order.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws Exception {
        AtomicInteger deliveredCount = new AtomicInteger(0);
        AtomicBoolean entryDelivered = new AtomicBoolean(false);
        Stack<PositionImpl> deliveredPositions = new Stack<>();
        ArrayList<String> received = new ArrayList<>();

        for (int i = 0; i < 7; i++) {
            this.ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            received.add(new String(entry.getData(), Encoding));
            deliveredCount.getAndIncrement();
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            // Set last so a waiter that sees the flag also sees the updates above.
            entryDelivered.set(true);
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        reader.asyncReadEntries(5, 100L, null);
        Awaitility.await().until(() -> deliveredCount.get() == 5);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));

        // Only two entries are left, so this request stays partially unsatisfied until more are written.
        reader.asyncReadEntries(5, 100L, null);
        Awaitility.await().until(() -> deliveredCount.get() == 7);

        for (int next = 7; next < 10; next++) {
            entryDelivered.set(false);
            this.ledger.addEntry(String.format("message-%d", next).getBytes(Encoding));
            Awaitility.await().until(() -> entryDelivered.get());
            Assert.assertEquals(received.size(), next + 1);
        }

        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("message-%d", i), received.get(i));
        }
    }

    /**
     * Requests 20 entries while the ledger is stubbed to answer only the first five reads, then
     * cancels. The reader must reach {@code State.Canceled} with exactly five entries delivered.
     * After the stub is removed, a request for the remaining 15 entries must reach
     * {@code State.Completed} with all 20 entries delivered in write order.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testCanCancelReadEntryRequestAndResumeReading() throws Exception {
        final HashMap<PositionImpl, String> messagesByPosition = new HashMap<>();
        final AtomicInteger readAttempts = new AtomicInteger(0);
        Stack<PositionImpl> deliveredPositions = new Stack<>();
        ArrayList<String> received = new ArrayList<>();

        for (int i = 0; i < 20; i++) {
            String message = String.format("message-%d", i);
            messagesByPosition.put((PositionImpl) this.ledger.addEntry(message.getBytes(Encoding)), message);
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            received.add(new String(entry.getData(), Encoding));
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        Mockito.doAnswer(invocation -> {
            PositionImpl position = invocation.getArgument(0, PositionImpl.class);
            AsyncCallbacks.ReadEntryCallback callback =
                    invocation.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
            // Only the first five reads are answered; later ones never call back and stay pending.
            if (readAttempts.getAndIncrement() < 5) {
                callback.readEntryComplete(
                        EntryImpl.create(position.getLedgerId(), position.getEntryId(),
                                messagesByPosition.get(position).getBytes(Encoding)),
                        invocation.getArgument(2));
            }
            return null;
        }).when(this.ledger)
                .asyncReadEntry(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        reader.asyncReadEntries(20, 200L, null);
        reader.cancelReadRequests();
        Awaitility.await().until(() -> reader.getState() == StreamingEntryReader.State.Canceled);
        Assert.assertEquals(received.size(), 5);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));

        // Remove the stub so reads reach the real ledger again.
        Mockito.reset(this.ledger);
        reader.asyncReadEntries(15, 200L, null);
        reader.cancelReadRequests();
        Awaitility.await().until(() -> reader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals(received.size(), 20);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("message-%d", i), received.get(i));
        }
    }

    /**
     * Stubs the ledger so that read attempts number 3-4 and 9-10 (zero-based) fail with a
     * {@code TooManyRequestsException} and all others succeed. Two requests of six entries each
     * must still deliver all 12 entries in write order.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testCanHandleExceptionAndRetry() throws Exception {
        final HashMap<PositionImpl, String> messagesByPosition = new HashMap<>();
        AtomicBoolean batchDone = new AtomicBoolean(false);
        final AtomicInteger readAttempts = new AtomicInteger(0);
        Stack<PositionImpl> deliveredPositions = new Stack<>();
        ArrayList<String> received = new ArrayList<>();

        for (int i = 0; i < 12; i++) {
            String message = String.format("message-%d", i);
            messagesByPosition.put((PositionImpl) this.ledger.addEntry(message.getBytes(Encoding)), message);
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            received.add(new String(entry.getData(), Encoding));
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            // Each request asks for six entries, so 6 and 12 mark the end of a request.
            if (received.size() == 6 || received.size() == 12) {
                batchDone.set(true);
            }
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        Mockito.doAnswer(invocation -> {
            PositionImpl position = invocation.getArgument(0, PositionImpl.class);
            AsyncCallbacks.ReadEntryCallback callback =
                    invocation.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
            int attempt = readAttempts.getAndIncrement();
            boolean shouldFail = (attempt >= 3 && attempt < 5) || (attempt >= 9 && attempt < 11);
            if (shouldFail) {
                callback.readEntryFailed(
                        new ManagedLedgerException.TooManyRequestsException("Fake exception."),
                        invocation.getArgument(2));
            } else {
                callback.readEntryComplete(
                        EntryImpl.create(position.getLedgerId(), position.getEntryId(),
                                messagesByPosition.get(position).getBytes(Encoding)),
                        invocation.getArgument(2));
            }
            return null;
        }).when(this.ledger)
                .asyncReadEntry(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        reader.asyncReadEntries(6, 100L, null);
        Awaitility.await().until(() -> batchDone.get());
        Assert.assertEquals(received.size(), 6);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));

        batchDone.set(false);
        reader.asyncReadEntries(6, 100L, null);
        Awaitility.await().until(() -> batchDone.get());
        Assert.assertEquals(received.size(), 12);
        Assert.assertEquals(this.cursor.getReadPosition(),
                this.ledger.getNextValidPosition(deliveredPositions.peek()));
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("message-%d", i), received.get(i));
        }
    }

    /**
     * Stubs the ledger so the first three reads succeed and every later read fails with a
     * {@code TooManyRequestsException}. A request for five entries must end in
     * {@code State.Completed} with only three entries delivered. After the stub is removed,
     * a second request for five entries must bring the total to eight, all in write order.
     *
     * @throws Exception if writing to the managed ledger fails
     */
    @Test
    public void testWillCancelReadAfterExhaustingRetry() throws Exception {
        final HashMap<PositionImpl, String> messagesByPosition = new HashMap<>();
        final AtomicInteger readAttempts = new AtomicInteger(0);
        Stack<PositionImpl> deliveredPositions = new Stack<>();
        ArrayList<String> received = new ArrayList<>();

        for (int i = 0; i < 12; i++) {
            String message = String.format("message-%d", i);
            messagesByPosition.put((PositionImpl) this.ledger.addEntry(message.getBytes(Encoding)), message);
        }

        StreamingEntryReader reader = new StreamingEntryReader(this.cursor, this.mockDispatcher, this.mockTopic);
        Mockito.doAnswer(invocation -> {
            Entry entry = invocation.getArgument(0, Entry.class);
            PositionImpl position = (PositionImpl) entry.getPosition();
            deliveredPositions.push(position);
            this.cursor.seek(this.ledger.getNextValidPosition(position));
            received.add(new String(entry.getData(), Encoding));
            return null;
        }).when(this.mockDispatcher)
                .readEntryComplete(ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(PendingReadEntryRequest.class));

        Mockito.doAnswer(invocation -> {
            PositionImpl position = invocation.getArgument(0, PositionImpl.class);
            AsyncCallbacks.ReadEntryCallback callback =
                    invocation.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
            // The first three attempts succeed; everything after that keeps failing.
            if (readAttempts.getAndIncrement() < 3) {
                callback.readEntryComplete(
                        EntryImpl.create(position.getLedgerId(), position.getEntryId(),
                                messagesByPosition.get(position).getBytes(Encoding)),
                        invocation.getArgument(2));
            } else {
                callback.readEntryFailed(
                        new ManagedLedgerException.TooManyRequestsException("Fake exception."),
                        invocation.getArgument(2));
            }
            return null;
        }).when(this.ledger)
                .asyncReadEntry(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        reader.asyncReadEntries(5, 100L, null);
        Awaitility.await().until(() -> reader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals(received.size(), 3);
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("message-%d", i), received.get(i));
        }

        // Remove the failing stub so the next request is served by the real ledger.
        Mockito.reset(this.ledger);
        reader.asyncReadEntries(5, 100L, null);
        Awaitility.await().until(() -> reader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals(received.size(), 8);
        for (int i = 0; i < received.size(); i++) {
            Assert.assertEquals(String.format("message-%d", i), received.get(i));
        }
    }
}
