package org.apache.hadoop.hbase.regionserver.regionreplication;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category({RegionServerTests.class, LargeTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.class */
public class TestRegionReplicationSinkCallbackAndFlushConcurrently {
    private static final int NB_SERVERS = 2;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFlushConcurrently.class);
    private static final byte[] FAMILY = Bytes.toBytes("family_test");
    private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
    private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
    private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend");
    private static volatile boolean startTest = false;

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently$ErrorReplayRSRpcServices.class */
    public static final class ErrorReplayRSRpcServices extends RSRpcServices {
        private static final AtomicInteger callCounter = new AtomicInteger(0);

        public ErrorReplayRSRpcServices(HRegionServer hRegionServer) throws IOException {
            super(hRegionServer);
        }

        public AdminProtos.ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, AdminProtos.ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
            if (!TestRegionReplicationSinkCallbackAndFlushConcurrently.startTest) {
                return super.replicateToReplica(rpcController, replicateWALEntryRequest);
            }
            List entryList = replicateWALEntryRequest.getEntryList();
            if (CollectionUtils.isEmpty(entryList)) {
                return AdminProtos.ReplicateWALEntryResponse.getDefaultInstance();
            }
            try {
                HRegion regionByEncodedName = this.server.getRegionByEncodedName(((AdminProtos.WALEntry) entryList.get(0)).getKey().getEncodedRegionName().toStringUtf8());
                if (!regionByEncodedName.getRegionInfo().getTable().equals(TestRegionReplicationSinkCallbackAndFlushConcurrently.tableName) || regionByEncodedName.getRegionInfo().getReplicaId() != 1) {
                    return super.replicateToReplica(rpcController, replicateWALEntryRequest);
                }
                if (callCounter.incrementAndGet() > 1) {
                    return super.replicateToReplica(rpcController, replicateWALEntryRequest);
                }
                throw new ServiceException(new DoNotRetryIOException("Inject error!"));
            } catch (NotServingRegionException e) {
                throw new ServiceException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently$HRegionForTest.class */
    public static final class HRegionForTest extends HRegion {
        static final String USER_THREAD_NAME = "TestReplicationHang";
        final CyclicBarrier cyclicBarrier;
        volatile boolean prepareFlush;

        public HRegionForTest(HRegionFileSystem hRegionFileSystem, WAL wal, Configuration configuration, TableDescriptor tableDescriptor, RegionServerServices regionServerServices) {
            super(hRegionFileSystem, wal, configuration, tableDescriptor, regionServerServices);
            this.cyclicBarrier = new CyclicBarrier(2);
            this.prepareFlush = false;
        }

        public HRegionForTest(Path path, WAL wal, FileSystem fileSystem, Configuration configuration, RegionInfo regionInfo, TableDescriptor tableDescriptor, RegionServerServices regionServerServices) {
            super(path, wal, fileSystem, configuration, regionInfo, tableDescriptor, regionServerServices);
            this.cyclicBarrier = new CyclicBarrier(2);
            this.prepareFlush = false;
        }

        public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
            this.regionReplicationSink = Optional.of(regionReplicationSink);
        }

        protected HRegion.PrepareFlushResult internalPrepareFlushCache(WAL wal, long j, Collection<HStore> collection, MonitoredTask monitoredTask, boolean z, FlushLifeCycleTracker flushLifeCycleTracker) throws IOException {
            if (!TestRegionReplicationSinkCallbackAndFlushConcurrently.startTest) {
                return super.internalPrepareFlushCache(wal, j, collection, monitoredTask, z, flushLifeCycleTracker);
            }
            if (getRegionInfo().getReplicaId() == 0 && Thread.currentThread().getName().equals(USER_THREAD_NAME)) {
                this.prepareFlush = true;
            }
            try {
                HRegion.PrepareFlushResult internalPrepareFlushCache = super.internalPrepareFlushCache(wal, j, collection, monitoredTask, z, flushLifeCycleTracker);
                if (getRegionInfo().getReplicaId() == 0 && Thread.currentThread().getName().equals(USER_THREAD_NAME)) {
                    this.prepareFlush = false;
                }
                return internalPrepareFlushCache;
            } catch (Throwable th) {
                if (getRegionInfo().getReplicaId() == 0 && Thread.currentThread().getName().equals(USER_THREAD_NAME)) {
                    this.prepareFlush = false;
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently$RSForTest.class */
    public static final class RSForTest extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
        public RSForTest(Configuration configuration) throws IOException, InterruptedException {
            super(configuration);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: createRpcServices, reason: merged with bridge method [inline-methods] */
        public RSRpcServices m1118createRpcServices() throws IOException {
            return new ErrorReplayRSRpcServices(this);
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration configuration = HTU.getConfiguration();
        configuration.setBoolean("hbase.region.replica.replication.enabled", true);
        configuration.setClass("hbase.hregion.impl", HRegionForTest.class, HRegion.class);
        configuration.setInt("hbase.region.read-replica.sink.retries.number", 15);
        configuration.setLong("hbase.region.read-replica.sink.rpc.timeout.ms", 600000L);
        configuration.setLong("hbase.region.read-replica.sink.operation.timeout.ms", 1200000L);
        configuration.setLong("hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms", 600000L);
        configuration.setLong("hbase.region.read-replica.sink.meta-edit.operation.timeout.ms", 1200000L);
        configuration.setBoolean("hbase.region.replica.wait.for.primary.flush", false);
        HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(2).build());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        HTU.shutdownMiniCluster();
    }

    @Test
    public void test() throws Exception {
        HRegionForTest[] createTable = createTable();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        RegionReplicationSink regionReplicationSink = (RegionReplicationSink) createTable[0].getRegionReplicationSink().get();
        Assert.assertTrue(regionReplicationSink != null);
        RegionReplicationSink upSpiedRegionReplicationSink = setUpSpiedRegionReplicationSink(regionReplicationSink, createTable[0], atomicBoolean);
        String name = Thread.currentThread().getName();
        Thread.currentThread().setName("TestReplicationHang");
        try {
            startTest = true;
            createTable[0].put(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
            createTable[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY);
            HTU.waitFor(120000L, () -> {
                return atomicBoolean.get();
            });
            Assert.assertTrue(upSpiedRegionReplicationSink.getFailedReplicas().isEmpty());
            startTest = false;
            Thread.currentThread().setName(name);
        } catch (Throwable th) {
            startTest = false;
            Thread.currentThread().setName(name);
            throw th;
        }
    }

    private RegionReplicationSink setUpSpiedRegionReplicationSink(RegionReplicationSink regionReplicationSink, HRegionForTest hRegionForTest, AtomicBoolean atomicBoolean) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        RegionReplicationSink regionReplicationSink2 = (RegionReplicationSink) Mockito.spy(regionReplicationSink);
        ((RegionReplicationSink) Mockito.doAnswer(invocationOnMock -> {
            if (!startTest) {
                invocationOnMock.callRealMethod();
                return null;
            }
            if (atomicInteger.incrementAndGet() != 1) {
                invocationOnMock.callRealMethod();
                return null;
            }
            hRegionForTest.cyclicBarrier.await();
            invocationOnMock.callRealMethod();
            atomicBoolean.set(true);
            return null;
        }).when(regionReplicationSink2)).onComplete(Mockito.anyList(), Mockito.anyMap());
        ((RegionReplicationSink) Mockito.doAnswer(invocationOnMock2 -> {
            if (!startTest) {
                return invocationOnMock2.callRealMethod();
            }
            if (!hRegionForTest.prepareFlush || !Thread.currentThread().getName().equals("TestReplicationHang") || atomicInteger2.incrementAndGet() != 1) {
                return invocationOnMock2.callRealMethod();
            }
            hRegionForTest.cyclicBarrier.await();
            return invocationOnMock2.callRealMethod();
        }).when(regionReplicationSink2)).getStartFlushAllDescriptor((Cell) Mockito.any());
        hRegionForTest.setRegionReplicationSink(regionReplicationSink2);
        return regionReplicationSink2;
    }

    private HRegionForTest[] createTable() throws Exception {
        HTU.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(2).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
        HRegionForTest[] hRegionForTestArr = new HRegionForTest[2];
        for (int i = 0; i < 2; i++) {
            for (HRegion hRegion : HTU.getMiniHBaseCluster().getRegionServer(i).getRegions(tableName)) {
                Assert.assertTrue(hRegionForTestArr[hRegion.getRegionInfo().getReplicaId()] == null);
                hRegionForTestArr[hRegion.getRegionInfo().getReplicaId()] = (HRegionForTest) hRegion;
            }
        }
        for (HRegionForTest hRegionForTest : hRegionForTestArr) {
            Assert.assertNotNull(hRegionForTest);
        }
        return hRegionForTestArr;
    }
}
