package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({MediumTests.class, ClientTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/client/TestAsyncBufferMutator.class */
public class TestAsyncBufferMutator {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
    private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    private static TableName TABLE_NAME = TableName.valueOf("async");
    private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
    private static byte[] CF = Bytes.toBytes(MockMasterServices.DEFAULT_COLUMN_FAMILY_NAME);
    private static byte[] CQ = Bytes.toBytes("cq");
    private static int COUNT = 100;
    private static byte[] VALUE = new byte[1024];
    private static AsyncConnection CONN;

    /* loaded from: input_file:org/apache/hadoop/hbase/client/TestAsyncBufferMutator$AsyncBufferMutatorForTest.class */
    private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
        private int flushCount;

        AsyncBufferMutatorForTest(HashedWheelTimer hashedWheelTimer, AsyncTable<?> asyncTable, long j, long j2, int i) {
            super(hashedWheelTimer, asyncTable, j, j2, i);
        }

        protected void internalFlush() {
            this.flushCount++;
            super.internalFlush();
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        TEST_UTIL.startMiniCluster(1);
        TEST_UTIL.createTable(TABLE_NAME, CF);
        TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
        CONN = (AsyncConnection) ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
        ThreadLocalRandom.current().nextBytes(VALUE);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        CONN.close();
        TEST_UTIL.shutdownMiniCluster();
    }

    @Test
    public void testWithMultiRegionTable() throws InterruptedException {
        test(MULTI_REGION_TABLE_NAME);
    }

    @Test
    public void testWithSingleRegionTable() throws InterruptedException {
        test(TABLE_NAME);
    }

    private void test(TableName tableName) throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        AsyncBufferedMutator build = CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16384L).build();
        Throwable th = null;
        try {
            try {
                build.mutate((List) IntStream.range(0, COUNT / 2).mapToObj(i -> {
                    return new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE);
                }).collect(Collectors.toList())).forEach(completableFuture -> {
                });
                IntStream.range(COUNT / 2, COUNT).forEach(i2 -> {
                    arrayList.add(build.mutate(new Put(Bytes.toBytes(i2)).addColumn(CF, CQ, VALUE)));
                });
                ((CompletableFuture) arrayList.get(0)).join();
                Thread.sleep(2000L);
                Assert.assertFalse(((CompletableFuture) arrayList.get(arrayList.size() - 1)).isDone());
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
                arrayList.forEach(completableFuture2 -> {
                });
                AsyncTable table = CONN.getTable(tableName);
                IntStream.range(0, COUNT).mapToObj(i3 -> {
                    return new Get(Bytes.toBytes(i3));
                }).map(get -> {
                    return (Result) table.get(get).join();
                }).forEach(result -> {
                    Assert.assertArrayEquals(VALUE, result.getValue(CF, CQ));
                });
            } finally {
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testClosedMutate() throws InterruptedException {
        AsyncBufferedMutator bufferedMutator = CONN.getBufferedMutator(TABLE_NAME);
        bufferedMutator.close();
        Put addColumn = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
        try {
            bufferedMutator.mutate(addColumn).get();
            Assert.fail("Close check failed");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
            Assert.assertTrue(e.getCause().getMessage().startsWith("Already closed"));
        }
        Iterator it = bufferedMutator.mutate(Arrays.asList(addColumn)).iterator();
        while (it.hasNext()) {
            try {
                ((CompletableFuture) it.next()).get();
                Assert.fail("Close check failed");
            } catch (ExecutionException e2) {
                MatcherAssert.assertThat(e2.getCause(), CoreMatchers.instanceOf(IOException.class));
                Assert.assertTrue(e2.getCause().getMessage().startsWith("Already closed"));
            }
        }
    }

    @Test
    public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
        AsyncBufferedMutator build = CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build();
        Throwable th = null;
        try {
            CompletableFuture mutate = build.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
            Thread.sleep(2000L);
            Assert.assertFalse(mutate.isDone());
            build.flush();
            mutate.get();
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    build.close();
                }
            }
            Assert.assertArrayEquals(VALUE, ((Result) CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get()).getValue(CF, CQ));
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testPeriodicFlush() throws InterruptedException, ExecutionException {
        CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build().mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)).get();
        Assert.assertArrayEquals(VALUE, ((Result) CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get()).getValue(CF, CQ));
    }

    @Test
    public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
        Put addColumn = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
        AsyncBufferedMutatorImpl build = CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).setWriteBufferSize(10 * addColumn.heapSize()).build();
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                arrayList.add(build.mutate(addColumn));
                Timeout timeout = build.periodicFlushTask;
                Assert.assertNotNull(timeout);
                int i = 1;
                while (true) {
                    arrayList.add(build.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
                    if (build.periodicFlushTask == null) {
                        break;
                    } else {
                        i++;
                    }
                }
                Assert.assertTrue(timeout.isCancelled());
                CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).join();
                AsyncTable table = CONN.getTable(TABLE_NAME);
                for (int i2 = 0; i2 < arrayList.size(); i2++) {
                    Assert.assertArrayEquals(VALUE, ((Result) table.get(new Get(Bytes.toBytes(i2))).get()).getValue(CF, CQ));
                }
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCancelPeriodicFlushByManuallyFlush() throws InterruptedException, ExecutionException {
        AsyncBufferedMutatorImpl build = CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build();
        Throwable th = null;
        try {
            CompletableFuture mutate = build.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
            Timeout timeout = build.periodicFlushTask;
            Assert.assertNotNull(timeout);
            build.flush();
            Assert.assertTrue(timeout.isCancelled());
            mutate.get();
            Assert.assertArrayEquals(VALUE, ((Result) CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get()).getValue(CF, CQ));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
        AsyncBufferedMutatorImpl build = CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build();
        Throwable th = null;
        try {
            CompletableFuture mutate = build.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
            Timeout timeout = build.periodicFlushTask;
            Assert.assertNotNull(timeout);
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    build.close();
                }
            }
            Assert.assertTrue(timeout.isCancelled());
            mutate.get();
            Assert.assertArrayEquals(VALUE, ((Result) CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get()).getValue(CF, CQ));
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRaceBetweenNormalFlushAndPeriodicFlush() throws InterruptedException, ExecutionException {
        Mutation addColumn = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
        AsyncBufferMutatorForTest asyncBufferMutatorForTest = new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME), 10 * addColumn.heapSize(), TimeUnit.MILLISECONDS.toNanos(200L), 1048576);
        Throwable th = null;
        try {
            CompletableFuture mutate = asyncBufferMutatorForTest.mutate(addColumn);
            Timeout timeout = asyncBufferMutatorForTest.periodicFlushTask;
            Assert.assertNotNull(timeout);
            synchronized (asyncBufferMutatorForTest) {
                Thread.sleep(500L);
                Assert.assertTrue(timeout.isExpired());
                Assert.assertEquals(0L, asyncBufferMutatorForTest.flushCount);
                Assert.assertFalse(mutate.isDone());
                asyncBufferMutatorForTest.flush();
            }
            Assert.assertFalse(timeout.isCancelled());
            mutate.get();
            Assert.assertArrayEquals(VALUE, ((Result) CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get()).getValue(CF, CQ));
            Assert.assertEquals(1L, asyncBufferMutatorForTest.flushCount);
            if (asyncBufferMutatorForTest != null) {
                if (0 == 0) {
                    asyncBufferMutatorForTest.close();
                    return;
                }
                try {
                    asyncBufferMutatorForTest.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (asyncBufferMutatorForTest != null) {
                if (0 != 0) {
                    try {
                        asyncBufferMutatorForTest.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    asyncBufferMutatorForTest.close();
                }
            }
            throw th3;
        }
    }
}
