package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.class */
public class TestReplicationWithWALExtendedAttributes {
    private static Admin replicationAdmin;
    private static Connection connection1;
    private static Table htable1;
    private static Table htable2;
    private static HBaseTestingUtil utility1;
    private static HBaseTestingUtil utility2;
    private static final long SLEEP_TIME = 500;
    private static final int NB_RETRIES = 10;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
    private static Configuration conf1 = HBaseConfiguration.create();
    private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation");
    private static final byte[] FAMILY = Bytes.toBytes("f");
    private static final byte[] ROW = Bytes.toBytes("row");
    private static final byte[] ROW2 = Bytes.toBytes("row2");

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes$TestCoprocessorForWALAnnotationAtSink.class */
    public static class TestCoprocessorForWALAnnotationAtSink implements RegionCoprocessor, RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void prePut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put, WALEdit wALEdit) throws IOException {
            String bytes = Bytes.toString(put.getAttribute("extendedAttr1"));
            String bytes2 = Bytes.toString(put.getAttribute("extendedAttr2"));
            if (bytes == null || bytes2 == null) {
                throw new IOException("Failed to retrieve WAL annotations");
            }
            if (!bytes.equals("Value of Extended attribute 01") || !bytes2.equals("Value of Extended attribute 02")) {
                throw new IOException("Failed to retrieve WAL annotations..");
            }
        }

        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress) throws IOException {
            String bytes = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute("extendedAttr1"));
            String bytes2 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute("extendedAttr2"));
            if (bytes == null || bytes2 == null) {
                throw new IOException("Failed to retrieve WAL annotations");
            }
            if (!bytes.equals("Value of Extended attribute 01") || !bytes2.equals("Value of Extended attribute 02")) {
                throw new IOException("Failed to retrieve WAL annotations..");
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes$TestCoprocessorForWALAnnotationAtSource.class */
    public static class TestCoprocessorForWALAnnotationAtSource implements RegionCoprocessor, RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> observerContext, WALKey wALKey, WALEdit wALEdit) throws IOException {
            wALKey.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
            wALKey.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes$TestReplicationSinkRegionServerEndpoint.class */
    public static final class TestReplicationSinkRegionServerEndpoint implements RegionServerCoprocessor, RegionServerObserver {
        public Optional<RegionServerObserver> getRegionServerObserver() {
            return Optional.of(this);
        }

        public void preReplicationSinkBatchMutate(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, AdminProtos.WALEntry wALEntry, Mutation mutation) throws IOException {
            super.preReplicationSinkBatchMutate(observerContext, wALEntry, mutation);
            attachWALExtendedAttributesToMutation(mutation, wALEntry.getKey().getExtendedAttributesList());
        }

        public void postReplicationSinkBatchMutate(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, AdminProtos.WALEntry wALEntry, Mutation mutation) throws IOException {
            super.postReplicationSinkBatchMutate(observerContext, wALEntry, mutation);
            TestReplicationWithWALExtendedAttributes.LOG.info("WALEntry extended attributes: {}", wALEntry.getKey().getExtendedAttributesList());
            TestReplicationWithWALExtendedAttributes.LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
        }

        private void attachWALExtendedAttributesToMutation(Mutation mutation, List<WALProtos.Attribute> list) {
            if (list != null) {
                for (WALProtos.Attribute attribute : list) {
                    mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
                }
            }
        }
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        Connection createConnection;
        Throwable th;
        conf1.set("zookeeper.znode.parent", "/1");
        conf1.setInt("replication.source.size.capacity", 10240);
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt("hbase.regionserver.maxlogs", 10);
        conf1.setLong("hbase.master.logcleaner.ttl", 10L);
        conf1.setInt("zookeeper.recovery.retry", 1);
        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        conf1.setInt("replication.stats.thread.period.seconds", 5);
        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        conf1.setStrings("hbase.replication.rpc.codec", new String[]{KeyValueCodecWithTags.class.getName()});
        conf1.setStrings("hbase.coprocessor.user.region.classes", new String[]{TestCoprocessorForWALAnnotationAtSource.class.getName()});
        utility1 = new HBaseTestingUtil(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster zkCluster = utility1.getZkCluster();
        conf1 = utility1.getConfiguration();
        LOG.info("Setup first Zk");
        Configuration create = HBaseConfiguration.create(conf1);
        create.setInt("hfile.format.version", 3);
        create.set("zookeeper.znode.parent", "/2");
        create.setInt("hbase.client.retries.number", 6);
        create.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        create.setStrings("hbase.replication.rpc.codec", new String[]{KeyValueCodecWithTags.class.getName()});
        create.setStrings("hbase.coprocessor.user.region.classes", new String[]{TestCoprocessorForWALAnnotationAtSink.class.getName()});
        create.setStrings("hbase.coprocessor.regionserver.classes", new String[]{TestReplicationSinkRegionServerEndpoint.class.getName()});
        utility2 = new HBaseTestingUtil(create);
        utility2.setZkCluster(zkCluster);
        LOG.info("Setup second Zk");
        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);
        connection1 = ConnectionFactory.createConnection(conf1);
        replicationAdmin = connection1.getAdmin();
        replicationAdmin.addReplicationPeer("2", ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build());
        TableDescriptor build = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).setScope(1).build()).build();
        Connection createConnection2 = ConnectionFactory.createConnection(conf1);
        Throwable th2 = null;
        try {
            Admin admin = createConnection2.getAdmin();
            Throwable th3 = null;
            try {
                try {
                    admin.createTable(build, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
                    if (admin != null) {
                        if (0 != 0) {
                            try {
                                admin.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            admin.close();
                        }
                    }
                    createConnection = ConnectionFactory.createConnection(create);
                    th = null;
                } finally {
                }
                try {
                    admin = createConnection.getAdmin();
                    Throwable th5 = null;
                    try {
                        try {
                            admin.createTable(build, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
                            if (admin != null) {
                                if (0 != 0) {
                                    try {
                                        admin.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    admin.close();
                                }
                            }
                            htable1 = utility1.getConnection().getTable(TABLE_NAME);
                            htable2 = utility2.getConnection().getTable(TABLE_NAME);
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (createConnection != null) {
                        if (0 != 0) {
                            try {
                                createConnection.close();
                            } catch (Throwable th7) {
                                th.addSuppressed(th7);
                            }
                        } else {
                            createConnection.close();
                        }
                    }
                }
            } finally {
            }
        } finally {
            if (createConnection2 != null) {
                if (0 != 0) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th8) {
                        th2.addSuppressed(th8);
                    }
                } else {
                    createConnection2.close();
                }
            }
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        Closeables.close(replicationAdmin, true);
        Closeables.close(connection1, true);
        utility2.shutdownMiniCluster();
        utility1.shutdownMiniCluster();
    }

    @Test
    public void testReplicationWithWALExtendedAttributes() throws Exception {
        Put put = new Put(ROW);
        put.addColumn(FAMILY, ROW, ROW);
        htable1 = utility1.getConnection().getTable(TABLE_NAME);
        htable1.put(put);
        Put put2 = new Put(ROW2);
        put2.addColumn(FAMILY, ROW2, ROW2);
        htable1.batch(Collections.singletonList(put2), new Object[1]);
        assertGetValues(new Get(ROW), ROW);
        assertGetValues(new Get(ROW2), ROW2);
    }

    private static void assertGetValues(Get get, byte[] bArr) throws IOException, InterruptedException {
        for (int i = 0; i < 10; i++) {
            if (i == 9) {
                Assert.fail("Waited too much time for put replication");
            }
            Result result = htable2.get(get);
            if (!result.isEmpty()) {
                Assert.assertArrayEquals(bArr, result.value());
                return;
            } else {
                LOG.info("Row not available");
                Thread.sleep(SLEEP_TIME);
            }
        }
    }
}
