package org.apache.hadoop.hbase.regionserver.regionreplication;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link RegionReplicationSink} that run against Mockito mocks only.
 *
 * <p>The table under test is created with region replication 3, and every test expects the sink
 * to issue two {@code replicate} calls per batch (presumably one per secondary replica). Each
 * test hands the sink a fixed list of {@link CompletableFuture}s, returned in order from the
 * mocked {@link AsyncClusterConnection#replicate}, and then completes them by hand to drive the
 * sink's callbacks deterministically.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestRegionReplicationSink {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRegionReplicationSink.class);

    /** Configuration key set to 5 in {@link #setUp()}; exercised by {@link #testCountCapacity()}. */
    private static final String BATCH_COUNT_CAPACITY_KEY = "hbase.region.read-replica.sink.nb.capacity";

    /** Configuration key set to one megabyte in {@link #setUp()}; exercised by {@link #testSizeCapacity()}. */
    private static final String BATCH_SIZE_CAPACITY_KEY = "hbase.region.read-replica.sink.size.capacity";

    private Configuration conf;
    private TableDescriptor td;
    private RegionInfo primary;
    private Runnable flushRequester;
    private AsyncClusterConnection conn;
    private RegionReplicationBufferManager manager;
    private RegionReplicationSink sink;

    @Rule
    public final TableNameTestRule name = new TableNameTestRule();

    /** Builds a fresh sink around mocked collaborators before every test. */
    @Before
    public void setUp() {
        conf = HBaseConfiguration.create();
        conf.setLong(BATCH_COUNT_CAPACITY_KEY, 5L);
        conf.setLong(BATCH_SIZE_CAPACITY_KEY, SpaceQuotaHelperForTests.ONE_MEGABYTE);
        td = TableDescriptorBuilder.newBuilder(name.getTableName())
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(MockMasterServices.DEFAULT_COLUMN_FAMILY_NAME))
            .setRegionReplication(3)
            .build();
        primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
        flushRequester = Mockito.mock(Runnable.class);
        conn = Mockito.mock(AsyncClusterConnection.class);
        manager = Mockito.mock(RegionReplicationBufferManager.class);
        sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
    }

    /** Stops the sink and blocks until it reports that it has stopped. */
    @After
    public void tearDown() throws InterruptedException {
        sink.stop();
        sink.waitUntilStopped();
    }

    /**
     * Creates {@code count} incomplete futures and stubs {@code conn.replicate(...)} to return them
     * one by one, in list order, on successive invocations.
     *
     * @param count number of futures to create
     * @return the futures, so that the test can complete them explicitly
     */
    private List<CompletableFuture<Void>> stubReplicate(int count) {
        List<CompletableFuture<Void>> futures =
            Stream.generate(() -> new CompletableFuture<Void>()).limit(count).collect(Collectors.toList());
        MutableInt next = new MutableInt(0);
        Mockito.when(conn.replicate(ArgumentMatchers.any(), ArgumentMatchers.anyList(), ArgumentMatchers.anyInt(),
            ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong()))
            .then(invocation -> futures.get(next.getAndIncrement()));
        return futures;
    }

    /** Verifies that {@code conn.replicate(...)} has been invoked exactly {@code expected} times so far. */
    private void verifyReplicateCalls(int expected) {
        Mockito.verify(conn, Mockito.times(expected)).replicate(ArgumentMatchers.any(), ArgumentMatchers.anyList(),
            ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    /** Returns a mocked WAL key that reports the given estimated serialized size. */
    private static WALKeyImpl mockKey(long estimatedSize) {
        WALKeyImpl key = Mockito.mock(WALKeyImpl.class);
        Mockito.when(key.estimatedSerializedSizeOf()).thenReturn(estimatedSize);
        return key;
    }

    /** Returns a mocked WAL key that reports the given estimated serialized size and sequence id. */
    private static WALKeyImpl mockKey(long estimatedSize, long sequenceId) {
        WALKeyImpl key = mockKey(estimatedSize);
        Mockito.when(key.getSequenceId()).thenReturn(sequenceId);
        return key;
    }

    /** Returns a mocked WAL edit that reports the given estimated serialized size. */
    private static WALEdit mockEdit(long estimatedSize) {
        WALEdit edit = Mockito.mock(WALEdit.class);
        Mockito.when(edit.estimatedSerializedSizeOf()).thenReturn(estimatedSize);
        return edit;
    }

    /**
     * Builds a real START_FLUSH marker edit for the primary region, listing every column family
     * of the table with an empty committed-file list.
     *
     * @param flushSeqId flush sequence id recorded in the flush descriptor
     */
    private WALEdit startFlushEdit(long flushSeqId) {
        return WALEdit.createFlushWALEdit(primary,
            ProtobufUtil.toFlushDescriptor(WALProtos.FlushDescriptor.FlushAction.START_FLUSH, primary, flushSeqId,
                td.getColumnFamilyNames().stream().collect(Collectors.toMap(Function.identity(),
                    family -> Collections.emptyList(),
                    (left, right) -> {
                        // Column family names are distinct, so a key collision is a bug.
                        throw new IllegalStateException();
                    },
                    () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)))));
    }

    /**
     * Walks through {@code futures} two at a time: before completing each pair it verifies that
     * exactly that many {@code replicate} calls have been made, i.e. that the next batch is not
     * sent until the previous pair has completed. Finally asserts nothing is left pending.
     */
    private void drainBatchesOneAtATime(List<CompletableFuture<Void>> futures) {
        for (int sent = 2; sent <= futures.size(); sent += 2) {
            verifyReplicateCalls(sent);
            futures.get(sent - 2).complete(null);
            futures.get(sent - 1).complete(null);
        }
        Assert.assertEquals(0L, sink.pendingSize());
    }

    /**
     * A single edit stays accounted for (buffer quota held, rpc call retained, pending size
     * unchanged) until both replicate futures have completed, and is released right after.
     */
    @Test
    public void testNormal() {
        List<CompletableFuture<Void>> futures = stubReplicate(2);
        ServerCall<?> rpcCall = Mockito.mock(ServerCall.class);
        WALKeyImpl key = mockKey(100L);
        WALEdit edit = mockEdit(1000L);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);

        sink.add(key, edit, rpcCall);
        Mockito.verify(manager, Mockito.times(1)).increase(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall, Mockito.times(1)).retainByWAL();
        Assert.assertEquals(1100L, sink.pendingSize());

        // Only one of the two replicate calls has finished: nothing may be released yet.
        futures.get(0).complete(null);
        Mockito.verify(manager, Mockito.never()).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall, Mockito.never()).releaseByWAL();
        Assert.assertEquals(1100L, sink.pendingSize());

        futures.get(1).complete(null);
        Mockito.verify(manager, Mockito.times(1)).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall, Mockito.times(1)).releaseByWAL();
        Assert.assertEquals(0L, sink.pendingSize());
    }

    /**
     * When the buffer manager refuses more data ({@code increase} returns {@code false}), the
     * queued-but-unsent edits are dropped and released, a flush is requested once, and only the
     * edit that was already in flight is still replicated.
     */
    @Test
    public void testDropEdits() {
        List<CompletableFuture<Void>> futures = stubReplicate(2);

        ServerCall<?> rpcCall1 = Mockito.mock(ServerCall.class);
        WALKeyImpl key1 = mockKey(100L);
        WALEdit edit1 = mockEdit(1000L);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);
        sink.add(key1, edit1, rpcCall1);
        Mockito.verify(manager, Mockito.times(1)).increase(ArgumentMatchers.anyLong());
        Mockito.verify(manager, Mockito.never()).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall1, Mockito.times(1)).retainByWAL();
        Assert.assertEquals(1100L, sink.pendingSize());

        ServerCall<?> rpcCall2 = Mockito.mock(ServerCall.class);
        WALKeyImpl key2 = mockKey(200L);
        WALEdit edit2 = mockEdit(2000L);
        sink.add(key2, edit2, rpcCall2);
        Mockito.verify(manager, Mockito.times(2)).increase(ArgumentMatchers.anyLong());
        Mockito.verify(manager, Mockito.never()).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall2, Mockito.times(1)).retainByWAL();
        Assert.assertEquals(3300L, sink.pendingSize());

        ServerCall<?> rpcCall3 = Mockito.mock(ServerCall.class);
        WALKeyImpl key3 = mockKey(200L);
        WALEdit edit3 = mockEdit(3000L);
        // From now on the buffer manager reports that the quota is exhausted.
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(false);
        sink.add(key3, edit3, rpcCall3);
        Mockito.verify(manager, Mockito.times(3)).increase(ArgumentMatchers.anyLong());
        Mockito.verify(manager, Mockito.times(1)).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall3, Mockito.times(1)).retainByWAL();
        Mockito.verify(rpcCall3, Mockito.times(1)).releaseByWAL();
        Mockito.verify(rpcCall2, Mockito.times(1)).releaseByWAL();
        // Only the first, already in-flight, edit is still pending.
        Assert.assertEquals(1100L, sink.pendingSize());
        Mockito.verify(flushRequester, Mockito.times(1)).run();

        futures.forEach(future -> future.complete(null));
        Mockito.verify(manager, Mockito.times(2)).decrease(ArgumentMatchers.anyLong());
        Mockito.verify(rpcCall1, Mockito.times(1)).releaseByWAL();
        Assert.assertEquals(0L, sink.pendingSize());
        // The dropped edits were never sent.
        verifyReplicateCalls(2);
    }

    /**
     * A replicate failure for the first edit (sequence id 1) while a START_FLUSH marker (key
     * sequence id 3, flush sequence id 2) is already queued: the test expects the next batch to
     * still produce two replicate calls, i.e. the replica is not excluded from it.
     */
    @Test
    public void testNotAddToFailedReplicas() {
        List<CompletableFuture<Void>> futures = stubReplicate(4);

        ServerCall<?> rpcCall1 = Mockito.mock(ServerCall.class);
        WALKeyImpl key1 = mockKey(100L, 1L);
        WALEdit edit1 = mockEdit(1000L);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);
        sink.add(key1, edit1, rpcCall1);

        ServerCall<?> rpcCall2 = Mockito.mock(ServerCall.class);
        WALKeyImpl key2 = mockKey(200L, 3L);
        sink.add(key2, startFlushEdit(2L), rpcCall2);

        futures.get(0).complete(null);
        futures.get(1).completeExceptionally(new IOException("inject error"));

        // Two calls for the first batch plus two for the flush marker batch.
        verifyReplicateCalls(4);
        futures.get(2).complete(null);
        futures.get(3).complete(null);
        Assert.assertEquals(0L, sink.pendingSize());
    }

    /**
     * A replicate failure with no flush marker queued: the test expects the following batch to
     * produce only one additional replicate call, and expects two calls again once a START_FLUSH
     * marker is added afterwards.
     */
    @Test
    public void testAddToFailedReplica() {
        List<CompletableFuture<Void>> futures = stubReplicate(5);

        ServerCall<?> rpcCall1 = Mockito.mock(ServerCall.class);
        WALKeyImpl key1 = mockKey(100L, 1L);
        WALEdit edit1 = mockEdit(1000L);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);
        sink.add(key1, edit1, rpcCall1);

        ServerCall<?> rpcCall2 = Mockito.mock(ServerCall.class);
        WALKeyImpl key2 = mockKey(200L, 1L);
        WALEdit edit2 = mockEdit(2000L);
        sink.add(key2, edit2, rpcCall2);

        futures.get(0).complete(null);
        futures.get(1).completeExceptionally(new IOException("inject error"));

        // Two calls for the first batch, but only one for the second.
        verifyReplicateCalls(3);
        futures.get(2).complete(null);
        Assert.assertEquals(0L, sink.pendingSize());

        ServerCall<?> rpcCall3 = Mockito.mock(ServerCall.class);
        WALKeyImpl key3 = mockKey(200L, 3L);
        sink.add(key3, startFlushEdit(2L), rpcCall3);

        // The flush marker batch produces two calls again.
        verifyReplicateCalls(5);
        futures.get(3).complete(null);
        futures.get(4).complete(null);
        Assert.assertEquals(0L, sink.pendingSize());
    }

    /**
     * Three edits of roughly 600 KB, 1200 KB and 1800 KB against the one megabyte size capacity
     * set in {@link #setUp()}: the test expects them to be sent as three separate batches, each
     * only after the previous one has completed.
     */
    @Test
    public void testSizeCapacity() {
        List<CompletableFuture<Void>> futures = stubReplicate(6);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);
        for (int i = 0; i < 3; i++) {
            ServerCall<?> rpcCall = Mockito.mock(ServerCall.class);
            WALKeyImpl key = mockKey(100L, i + 1);
            WALEdit edit = mockEdit((i + 1) * 600 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
            sink.add(key, edit, rpcCall);
        }
        drainBatchesOneAtATime(futures);
    }

    /**
     * Seven small edits against the count capacity of 5 set in {@link #setUp()}: the test
     * expects them to be sent as three separate batches, each only after the previous one has
     * completed.
     */
    @Test
    public void testCountCapacity() {
        List<CompletableFuture<Void>> futures = stubReplicate(6);
        Mockito.when(manager.increase(ArgumentMatchers.anyLong())).thenReturn(true);
        for (int i = 0; i < 7; i++) {
            ServerCall<?> rpcCall = Mockito.mock(ServerCall.class);
            WALKeyImpl key = mockKey(100L, i + 1);
            WALEdit edit = mockEdit(1000L);
            sink.add(key, edit, rpcCall);
        }
        drainBatchesOneAtATime(futures);
    }
}
