/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.streamingdispatch;

import com.google.common.base.Charsets;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.annotations.Test;

@PowerMockIgnore(value={"javax.management.*", "javax.xml.parsers.*", "com.sun.org.apache.xerces.internal.jaxp.*", "ch.qos.logback.*", "org.slf4j.*", "org.apache.logging.*"})
@Test(groups={"flaky"})
@PrepareForTest(value={ManagedLedgerImpl.class})
public class StreamingEntryReaderTests
extends MockedBookKeeperTestCase {
    private static final Charset Encoding = Charsets.UTF_8;
    private PersistentTopic mockTopic;
    private StreamingDispatcher mockDispatcher;
    private BrokerService mockBrokerService;
    private EventLoopGroup eventLoopGroup;
    private OrderedExecutor orderedExecutor;
    private ManagedLedgerConfig config;
    private ManagedLedgerImpl ledger;
    private ManagedCursor cursor;

    protected void setUpTestCase() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("StreamingEntryReaderTests").build();
        this.mockTopic = (PersistentTopic)PowerMockito.mock(PersistentTopic.class);
        this.mockBrokerService = (BrokerService)PowerMockito.mock(BrokerService.class);
        this.mockDispatcher = (StreamingDispatcher)PowerMockito.mock(StreamingDispatcher.class);
        this.config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        this.ledger = (ManagedLedgerImpl)PowerMockito.spy((Object)((ManagedLedgerImpl)this.factory.open("my_test_ledger", this.config)));
        this.cursor = this.ledger.openCursor("test");
        PowerMockito.when((Object)this.mockTopic.getBrokerService()).thenReturn((Object)this.mockBrokerService);
        PowerMockito.when((Object)this.mockBrokerService.executor()).thenReturn((Object)this.eventLoopGroup);
        PowerMockito.when((Object)this.mockBrokerService.getTopicOrderedExecutor()).thenReturn((Object)this.orderedExecutor);
        ((StreamingDispatcher)PowerMockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) {
                return null;
            }
        }).when((Object)this.mockDispatcher)).notifyConsumersEndOfTopic();
    }

    protected void cleanUpTestCase() {
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownNow();
            this.eventLoopGroup = null;
        }
        if (this.orderedExecutor != null) {
            this.orderedExecutor.shutdownNow();
            this.orderedExecutor = null;
        }
    }

    @Test
    public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
        AtomicInteger entryCount = new AtomicInteger(0);
        Stack positions = new Stack();
        for (int i = 0; i < 150; ++i) {
            this.ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            Assert.assertEquals((String)new String(entry.getData()), (String)String.format("message-%d", entryCount.getAndIncrement()));
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        streamingEntryReader.asyncReadEntries(50, 700, null);
        Awaitility.await().until(() -> entryCount.get() == 50);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        streamingEntryReader.asyncReadEntries(50, 700, null);
        Awaitility.await().until(() -> entryCount.get() == 100);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        streamingEntryReader.asyncReadEntries(50, 700, null);
        Awaitility.await().until(() -> entryCount.get() == 150);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
    }

    @Test
    public void testCanReadEntryFromMLedgerSizeExceededLimit() throws Exception {
        AtomicBoolean readComplete = new AtomicBoolean(false);
        Stack positions = new Stack();
        ArrayList entries = new ArrayList();
        int size = "mmmmmmmmmmessage-0".getBytes().length;
        for (int i = 0; i < 15; ++i) {
            this.ledger.addEntry(String.format("mmmmmmmmmmessage-%d", i).getBytes(Encoding));
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            entries.add(new String(entry.getData()));
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            readComplete.set(true);
            return null;
        }).when((Object)this.mockDispatcher)).canReadMoreEntries(ArgumentMatchers.anyBoolean());
        final PositionImpl position = this.ledger.getPositionAfterN(this.ledger.getFirstPosition(), 3L, ManagedLedgerImpl.PositionBound.startExcluded);
        ((ManagedLedgerImpl)PowerMockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                AsyncCallbacks.ReadEntryCallback cb = (AsyncCallbacks.ReadEntryCallback)invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
                StreamingEntryReaderTests.this.executor.schedule(() -> cb.readEntryComplete((Entry)EntryImpl.create((long)position.getLedgerId(), (long)position.getEntryId(), (byte[])"mmmmmmmmmmessage-2".getBytes()), invocationOnMock.getArgument(2)), 200L, TimeUnit.MILLISECONDS);
                return null;
            }
        }).when((Object)this.ledger)).asyncReadEntry((PositionImpl)ArgumentMatchers.eq((Object)position), (AsyncCallbacks.ReadEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
        Awaitility.await().until(() -> readComplete.get());
        Assert.assertEquals((int)entries.size(), (int)2);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        Mockito.reset((Object[])new ManagedLedgerImpl[]{this.ledger});
        readComplete.set(false);
        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
        Awaitility.await().until(() -> readComplete.get());
        readComplete.set(false);
        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
        Awaitility.await().until(() -> readComplete.get());
        readComplete.set(false);
        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
        Awaitility.await().until(() -> readComplete.get());
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        Assert.assertEquals((int)entries.size(), (int)8);
        for (int i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("mmmmmmmmmmessage-%d", i), (String)((String)entries.get(i)));
        }
    }

    @Test
    public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws Exception {
        AtomicInteger entryCount = new AtomicInteger(0);
        AtomicBoolean entryProcessed = new AtomicBoolean(false);
        Stack positions = new Stack();
        ArrayList entries = new ArrayList();
        for (int i = 0; i < 7; ++i) {
            this.ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            entries.add(new String(entry.getData()));
            entryCount.getAndIncrement();
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            entryProcessed.set(true);
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        streamingEntryReader.asyncReadEntries(5, 100, null);
        Awaitility.await().until(() -> entryCount.get() == 5);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        streamingEntryReader.asyncReadEntries(5, 100, null);
        Awaitility.await().until(() -> entryCount.get() == 7);
        entryProcessed.set(false);
        this.ledger.addEntry("message-7".getBytes(Encoding));
        Awaitility.await().until(() -> entryProcessed.get());
        Assert.assertEquals((int)entries.size(), (int)8);
        entryProcessed.set(false);
        this.ledger.addEntry("message-8".getBytes(Encoding));
        Awaitility.await().until(() -> entryProcessed.get());
        Assert.assertEquals((int)entries.size(), (int)9);
        entryProcessed.set(false);
        this.ledger.addEntry("message-9".getBytes(Encoding));
        Awaitility.await().until(() -> entryProcessed.get());
        Assert.assertEquals((int)entries.size(), (int)10);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        for (int i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("message-%d", i), (String)((String)entries.get(i)));
        }
    }

    @Test
    public void testCanCancelReadEntryRequestAndResumeReading() throws Exception {
        final HashMap<Position, String> messages = new HashMap<Position, String>();
        final AtomicInteger count = new AtomicInteger(0);
        Stack positions = new Stack();
        ArrayList entries = new ArrayList();
        for (int i = 0; i < 20; ++i) {
            String msg = String.format("message-%d", i);
            messages.put(this.ledger.addEntry(msg.getBytes(Encoding)), msg);
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            entries.add(new String(entry.getData()));
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        ((ManagedLedgerImpl)PowerMockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) {
                AsyncCallbacks.ReadEntryCallback cb = (AsyncCallbacks.ReadEntryCallback)invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
                PositionImpl position = (PositionImpl)invocationOnMock.getArgument(0, PositionImpl.class);
                int c = count.getAndIncrement();
                if (c < 5) {
                    cb.readEntryComplete((Entry)EntryImpl.create((long)position.getLedgerId(), (long)position.getEntryId(), (byte[])((String)messages.get(position)).getBytes()), invocationOnMock.getArgument(2));
                }
                return null;
            }
        }).when((Object)this.ledger)).asyncReadEntry((PositionImpl)ArgumentMatchers.any(), (AsyncCallbacks.ReadEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        streamingEntryReader.asyncReadEntries(20, 200, null);
        streamingEntryReader.cancelReadRequests();
        Awaitility.await().until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Canceled);
        Assert.assertEquals((int)entries.size(), (int)5);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        Mockito.reset((Object[])new ManagedLedgerImpl[]{this.ledger});
        streamingEntryReader.asyncReadEntries(15, 200, null);
        streamingEntryReader.cancelReadRequests();
        Awaitility.await().until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals((int)entries.size(), (int)20);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        for (int i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("message-%d", i), (String)((String)entries.get(i)));
        }
    }

    @Test
    public void testCanHandleExceptionAndRetry() throws Exception {
        final HashMap<Position, String> messages = new HashMap<Position, String>();
        AtomicBoolean entryProcessed = new AtomicBoolean(false);
        final AtomicInteger count = new AtomicInteger(0);
        Stack positions = new Stack();
        ArrayList entries = new ArrayList();
        for (int i = 0; i < 12; ++i) {
            String msg = String.format("message-%d", i);
            messages.put(this.ledger.addEntry(msg.getBytes(Encoding)), msg);
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            entries.add(new String(entry.getData()));
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            if (entries.size() == 6 || entries.size() == 12) {
                entryProcessed.set(true);
            }
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        ((ManagedLedgerImpl)PowerMockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                AsyncCallbacks.ReadEntryCallback cb = (AsyncCallbacks.ReadEntryCallback)invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
                PositionImpl position = (PositionImpl)invocationOnMock.getArgument(0, PositionImpl.class);
                int c = count.getAndIncrement();
                if (c >= 3 && c < 5 || c >= 9 && c < 11) {
                    cb.readEntryFailed((ManagedLedgerException)new ManagedLedgerException.TooManyRequestsException("Fake exception."), invocationOnMock.getArgument(2));
                } else {
                    cb.readEntryComplete((Entry)EntryImpl.create((long)position.getLedgerId(), (long)position.getEntryId(), (byte[])((String)messages.get(position)).getBytes()), invocationOnMock.getArgument(2));
                }
                return null;
            }
        }).when((Object)this.ledger)).asyncReadEntry((PositionImpl)ArgumentMatchers.any(), (AsyncCallbacks.ReadEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        streamingEntryReader.asyncReadEntries(6, 100, null);
        Awaitility.await().until(() -> entryProcessed.get());
        Assert.assertEquals((int)entries.size(), (int)6);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        entryProcessed.set(false);
        streamingEntryReader.asyncReadEntries(6, 100, null);
        Awaitility.await().until(() -> entryProcessed.get());
        Assert.assertEquals((int)entries.size(), (int)12);
        Assert.assertEquals((Object)this.cursor.getReadPosition(), (Object)this.ledger.getNextValidPosition((PositionImpl)positions.peek()));
        for (int i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("message-%d", i), (String)((String)entries.get(i)));
        }
    }

    @Test
    public void testWillCancelReadAfterExhaustingRetry() throws Exception {
        int i;
        final HashMap<Position, String> messages = new HashMap<Position, String>();
        final AtomicInteger count = new AtomicInteger(0);
        Stack positions = new Stack();
        ArrayList entries = new ArrayList();
        for (int i2 = 0; i2 < 12; ++i2) {
            String msg = String.format("message-%d", i2);
            messages.put(this.ledger.addEntry(msg.getBytes(Encoding)), msg);
        }
        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl)this.cursor, this.mockDispatcher, this.mockTopic);
        ((StreamingDispatcher)PowerMockito.doAnswer(invocationOnMock -> {
            Entry entry = (Entry)invocationOnMock.getArgument(0, Entry.class);
            positions.push(entry.getPosition());
            this.cursor.seek((Position)this.ledger.getNextValidPosition((PositionImpl)entry.getPosition()));
            entries.add(new String(entry.getData()));
            return null;
        }).when((Object)this.mockDispatcher)).readEntryComplete((Entry)ArgumentMatchers.any(Entry.class), (PendingReadEntryRequest)ArgumentMatchers.any(PendingReadEntryRequest.class));
        ((ManagedLedgerImpl)PowerMockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                AsyncCallbacks.ReadEntryCallback cb = (AsyncCallbacks.ReadEntryCallback)invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
                PositionImpl position = (PositionImpl)invocationOnMock.getArgument(0, PositionImpl.class);
                int c = count.getAndIncrement();
                if (c >= 3) {
                    cb.readEntryFailed((ManagedLedgerException)new ManagedLedgerException.TooManyRequestsException("Fake exception."), invocationOnMock.getArgument(2));
                } else {
                    cb.readEntryComplete((Entry)EntryImpl.create((long)position.getLedgerId(), (long)position.getEntryId(), (byte[])((String)messages.get(position)).getBytes()), invocationOnMock.getArgument(2));
                }
                return null;
            }
        }).when((Object)this.ledger)).asyncReadEntry((PositionImpl)ArgumentMatchers.any(), (AsyncCallbacks.ReadEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        streamingEntryReader.asyncReadEntries(5, 100, null);
        Awaitility.await().until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals((int)entries.size(), (int)3);
        for (i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("message-%d", i), (String)((String)entries.get(i)));
        }
        Mockito.reset((Object[])new ManagedLedgerImpl[]{this.ledger});
        streamingEntryReader.asyncReadEntries(5, 100, null);
        Awaitility.await().until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
        Assert.assertEquals((int)entries.size(), (int)8);
        for (i = 0; i < entries.size(); ++i) {
            Assert.assertEquals((String)String.format("message-%d", i), (String)((String)entries.get(i)));
        }
    }
}

