package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.PendingReadOp;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/TestParallelRead.class */
public class TestParallelRead extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestParallelRead.class);
    final BookKeeper.DigestType digestType;
    final byte[] passwd;

    public TestParallelRead() {
        super(6);
        this.passwd = "parallel-read".getBytes();
        this.digestType = BookKeeper.DigestType.CRC32;
    }

    long getLedgerToRead(int i, int i2, int i3, int i4) throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(i, i2, i3, this.digestType, this.passwd);
        for (int i5 = 0; i5 < i4; i5++) {
            createLedger.addEntry(("" + i5).getBytes());
        }
        createLedger.close();
        return createLedger.getId();
    }

    PendingReadOp createReadOp(LedgerHandle ledgerHandle, long j, long j2) {
        return new PendingReadOp(ledgerHandle, this.bkc.getClientCtx(), j, j2, false);
    }

    PendingReadOp createRecoveryReadOp(LedgerHandle ledgerHandle, long j, long j2) {
        return new PendingReadOp(ledgerHandle, this.bkc.getClientCtx(), j, j2, true);
    }

    @Test
    public void testNormalParallelRead() throws Exception {
        LedgerHandle openLedger = this.bkc.openLedger(getLedgerToRead(5, 2, 2, 10), this.digestType, this.passwd);
        for (int i = 0; i < 10; i++) {
            PendingReadOp createReadOp = createReadOp(openLedger, i, i);
            createReadOp.parallelRead(true).submit();
            Iterator it = ((LedgerEntries) createReadOp.future().get()).iterator();
            Assert.assertTrue(it.hasNext());
            LedgerEntry ledgerEntry = (LedgerEntry) it.next();
            Assert.assertNotNull(ledgerEntry);
            Assert.assertEquals(i, Integer.parseInt(new String(ledgerEntry.getEntryBytes())));
            ledgerEntry.close();
            Assert.assertFalse(it.hasNext());
        }
        PendingReadOp createReadOp2 = createReadOp(openLedger, 0L, 10 - 1);
        createReadOp2.parallelRead(true).submit();
        int i2 = 0;
        for (LedgerEntry ledgerEntry2 : (LedgerEntries) createReadOp2.future().get()) {
            Assert.assertNotNull(ledgerEntry2);
            Assert.assertEquals(i2, Integer.parseInt(new String(ledgerEntry2.getEntryBytes())));
            ledgerEntry2.close();
            i2++;
        }
        Assert.assertEquals(10, i2);
        openLedger.close();
    }

    private static <T> void expectFail(CompletableFuture<T> completableFuture, int i) {
        try {
            FutureUtils.result(completableFuture);
            Assert.fail("Expect to fail");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof BKException);
            Assert.assertEquals(i, e.getCode());
        }
    }

    @Test
    public void testParallelReadMissingEntries() throws Exception {
        LedgerHandle openLedger = this.bkc.openLedger(getLedgerToRead(5, 2, 2, 10), this.digestType, this.passwd);
        PendingReadOp createReadOp = createReadOp(openLedger, 11L, 11L);
        createReadOp.parallelRead(true).submit();
        expectFail(createReadOp.future(), -13);
        PendingReadOp createReadOp2 = createReadOp(openLedger, 8L, 11L);
        createReadOp2.parallelRead(true).submit();
        expectFail(createReadOp2.future(), -13);
        openLedger.close();
    }

    @Test
    public void testFailParallelRecoveryReadMissingEntryImmediately() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 5, 3, 1);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        List ensembleAt = openLedger.getLedgerMetadata().getEnsembleAt(10L);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        sleepBookie((BookieId) ensembleAt.get(0), countDownLatch);
        sleepBookie((BookieId) ensembleAt.get(1), countDownLatch2);
        PendingReadOp createRecoveryReadOp = createRecoveryReadOp(openLedger, 10L, 10L);
        createRecoveryReadOp.parallelRead(true).submit();
        expectFail(createRecoveryReadOp.future(), -13);
        countDownLatch.countDown();
        countDownLatch2.countDown();
        openLedger.close();
        bookKeeper.close();
    }

    @Test
    public void testParallelReadWithFailedBookies() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 3, 3, 10);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        List ensembleAt = openLedger.getLedgerMetadata().getEnsembleAt(5L);
        killBookie((BookieId) ensembleAt.get(0));
        killBookie((BookieId) ensembleAt.get(1));
        PendingReadOp createReadOp = createReadOp(openLedger, 0L, 10 - 1);
        createReadOp.parallelRead(true).submit();
        Iterator it = ((LedgerEntries) createReadOp.future().get()).iterator();
        int i = 0;
        while (it.hasNext()) {
            Assert.assertNotNull((LedgerEntry) it.next());
            Assert.assertEquals(i, Integer.parseInt(new String(r0.getEntryBytes())));
            i++;
        }
        Assert.assertEquals(10, i);
        openLedger.close();
        bookKeeper.close();
    }

    @Test
    public void testParallelReadFailureWithFailedBookies() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 3, 3, 10);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        List ensembleAt = openLedger.getLedgerMetadata().getEnsembleAt(5L);
        killBookie((BookieId) ensembleAt.get(0));
        killBookie((BookieId) ensembleAt.get(1));
        killBookie((BookieId) ensembleAt.get(2));
        PendingReadOp createReadOp = createReadOp(openLedger, 0L, 10 - 1);
        createReadOp.parallelRead(true).submit();
        expectFail(createReadOp.future(), -8);
        openLedger.close();
        bookKeeper.close();
    }

    @Test
    public void testLedgerEntryRequestComplete() throws Exception {
        LedgerHandle ledgerHandle = (LedgerHandle) Mockito.mock(LedgerHandle.class);
        LedgerMetadata ledgerMetadata = (LedgerMetadata) Mockito.mock(LedgerMetadata.class);
        ClientContext clientContext = (ClientContext) Mockito.mock(ClientContext.class);
        ((ClientContext) Mockito.doReturn((ClientInternalConf) Mockito.mock(ClientInternalConf.class)).when(clientContext)).getConf();
        BookKeeperClientStats bookKeeperClientStats = (BookKeeperClientStats) Mockito.mock(BookKeeperClientStats.class);
        ((ClientContext) Mockito.doReturn(bookKeeperClientStats).when(clientContext)).getClientStats();
        ((BookKeeperClientStats) Mockito.doReturn((OpStatsLogger) Mockito.mock(OpStatsLogger.class)).when(bookKeeperClientStats)).getReadOpLogger();
        ((LedgerHandle) Mockito.doReturn(ledgerMetadata).when(ledgerHandle)).getLedgerMetadata();
        ((LedgerMetadata) Mockito.doReturn(2).when(ledgerMetadata)).getWriteQuorumSize();
        ((LedgerMetadata) Mockito.doReturn(1).when(ledgerMetadata)).getAckQuorumSize();
        ((LedgerMetadata) Mockito.doReturn(new TreeMap()).when(ledgerMetadata)).getAllEnsembles();
        ((LedgerHandle) Mockito.doReturn((DistributionSchedule.WriteSet) Mockito.mock(DistributionSchedule.WriteSet.class)).when(ledgerHandle)).getWriteSetForReadOperation(ArgumentMatchers.anyLong());
        PendingReadOp pendingReadOp = new PendingReadOp(ledgerHandle, clientContext, 1L, 2L, false);
        pendingReadOp.parallelRead(true);
        pendingReadOp.initiate();
        PendingReadOp.LedgerEntryRequest ledgerEntryRequest = (PendingReadOp.LedgerEntryRequest) pendingReadOp.seq.get(0);
        PendingReadOp.LedgerEntryRequest ledgerEntryRequest2 = (PendingReadOp.LedgerEntryRequest) pendingReadOp.seq.get(1);
        pendingReadOp.submitCallback(-105);
        Assert.assertEquals(-1L, ledgerEntryRequest.entryImpl.getEntryId());
        Assert.assertEquals(-1L, ledgerEntryRequest.entryImpl.getLedgerId());
        Assert.assertEquals(-1L, ledgerEntryRequest.entryImpl.getLength());
        Assert.assertNull(ledgerEntryRequest.entryImpl.getEntryBuffer());
        Assert.assertTrue(ledgerEntryRequest.complete.get());
        Assert.assertEquals(-1L, ledgerEntryRequest2.entryImpl.getEntryId());
        Assert.assertEquals(-1L, ledgerEntryRequest2.entryImpl.getLedgerId());
        Assert.assertEquals(-1L, ledgerEntryRequest2.entryImpl.getLength());
        Assert.assertNull(ledgerEntryRequest2.entryImpl.getEntryBuffer());
        Assert.assertTrue(ledgerEntryRequest2.complete.get());
        Method declaredMethod = PendingReadOp.class.getDeclaredMethod("createReadContext", Integer.TYPE, BookieId.class, PendingReadOp.LedgerEntryRequest.class);
        declaredMethod.setAccessible(true);
        ByteBuf buffer = Unpooled.buffer(10);
        pendingReadOp.readEntryComplete(0, 1L, 1L, Unpooled.buffer(10), declaredMethod.invoke(pendingReadOp, 1, BookieId.parse("test"), ledgerEntryRequest));
        Assert.assertEquals(buffer.refCnt(), 1L);
        Assert.assertNull(ledgerEntryRequest.entryImpl.getEntryBuffer());
        Assert.assertTrue(ledgerEntryRequest.complete.get());
        PendingReadOp pendingReadOp2 = new PendingReadOp(ledgerHandle, clientContext, 1L, 2L, false);
        pendingReadOp2.parallelRead(true);
        pendingReadOp2.initiate();
        pendingReadOp2.readEntryComplete(-105, 1L, 1L, Unpooled.buffer(10), declaredMethod.invoke(pendingReadOp2, 1, BookieId.parse("test"), ledgerEntryRequest));
        pendingReadOp2.readEntryComplete(-105, 1L, 1L, Unpooled.buffer(10), declaredMethod.invoke(pendingReadOp2, 1, BookieId.parse("test"), ledgerEntryRequest));
        ByteBuf buffer2 = Unpooled.buffer(10);
        pendingReadOp2.readEntryComplete(0, 1L, 1L, Unpooled.buffer(10), declaredMethod.invoke(pendingReadOp2, 1, BookieId.parse("test"), ledgerEntryRequest));
        Assert.assertEquals(1L, buffer2.refCnt());
    }
}
