package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.class */
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
    private static String CLIENT_RETRIES_NUMBER = "hbase.region.replica.replication.client.retries.number";
    private Configuration conf;
    private AsyncClusterConnection connection;
    private TableDescriptors tableDescriptors;
    private int numRetries;
    private long operationTimeoutNs;
    private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
    private Cache<TableName, TableName> disabledTableCache;
    private final RetryCounterFactory retryCounterFactory = new RetryCounterFactory(HFile.MAXIMUM_KEY_LENGTH, 1000, 60000);

    @Override // org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint, org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = context.getConfiguration();
        this.tableDescriptors = context.getTableDescriptors();
        this.tableDescriptorCache = CacheBuilder.newBuilder().expireAfterWrite(this.conf.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000), TimeUnit.MILLISECONDS).initialCapacity(10).maximumSize(1000L).build(new CacheLoader<TableName, Optional<TableDescriptor>>() { // from class: org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.1
            public Optional<TableDescriptor> load(TableName tableName) throws Exception {
                return Optional.ofNullable(RegionReplicaReplicationEndpoint.this.tableDescriptors.get(tableName));
            }
        });
        this.disabledTableCache = CacheBuilder.newBuilder().expireAfterWrite(this.conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000), TimeUnit.MILLISECONDS).initialCapacity(10).maximumSize(1000L).build();
        int i = this.conf.getInt("hbase.client.retries.number", 15);
        if (i > 10) {
            i /= this.conf.getInt("hbase.client.serverside.retries.multiplier", 3);
        }
        this.numRetries = this.conf.getInt(CLIENT_RETRIES_NUMBER, i);
        this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(this.conf.getInt("hbase.client.operation.timeout", 1200000));
        this.connection = context.getServer().getAsyncClusterConnection();
    }

    private boolean requiresReplication(Optional<TableDescriptor> optional, WAL.Entry entry) {
        if (entry.getEdit().isEmpty() || !optional.isPresent()) {
            return false;
        }
        return entry.getEdit().isMetaEdit() || optional.get().hasRegionMemStoreReplication();
    }

    private void getRegionLocations(CompletableFuture<RegionLocations> completableFuture, TableDescriptor tableDescriptor, byte[] bArr, byte[] bArr2, boolean z) {
        FutureUtils.addListener(this.connection.getRegionLocations(tableDescriptor.getTableName(), bArr2, z), (regionLocations, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            if (z) {
                completableFuture.complete(regionLocations);
            } else if (regionLocations.size() == tableDescriptor.getRegionReplication() && regionLocations.getDefaultRegionLocation() != null && Bytes.equals(regionLocations.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), bArr)) {
                completableFuture.complete(regionLocations);
            } else {
                getRegionLocations(completableFuture, tableDescriptor, bArr, bArr2, true);
            }
        });
    }

    private void replicate(CompletableFuture<Long> completableFuture, RegionLocations regionLocations, TableDescriptor tableDescriptor, byte[] bArr, byte[] bArr2, List<WAL.Entry> list) {
        if (regionLocations.size() == 1) {
            LOG.info("Only one location for {}.{}, refresh the location cache only for meta now", tableDescriptor.getTableName(), Bytes.toString(bArr));
            if (tableDescriptor.isMetaTable()) {
                this.connection.getRegionLocator(tableDescriptor.getTableName()).clearRegionLocationCache();
            }
            completableFuture.complete(Long.valueOf(list.size()));
            return;
        }
        RegionInfo region = regionLocations.getDefaultRegionLocation().getRegion();
        if (!Bytes.equals(region.getEncodedNameAsBytes(), bArr)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping {} entries in table {} because located region {} is different than the original region {} from WALEdit", new Object[]{tableDescriptor.getTableName(), region.getEncodedName(), Bytes.toStringBinary(bArr)});
            }
            completableFuture.complete(Long.valueOf(list.size()));
            return;
        }
        AtomicReference atomicReference = new AtomicReference();
        AtomicInteger atomicInteger = new AtomicInteger(regionLocations.size() - 1);
        AtomicLong atomicLong = new AtomicLong(0L);
        int size = regionLocations.size();
        for (int i = 1; i < size; i++) {
            RegionInfo regionInfoForReplica = RegionReplicaUtil.getRegionInfoForReplica(region, i);
            FutureUtils.addListener(this.connection.replay(tableDescriptor.getTableName(), regionInfoForReplica.getEncodedNameAsBytes(), bArr2, list, regionInfoForReplica.getReplicaId(), this.numRetries, this.operationTimeoutNs), (l, th) -> {
                if (th != null) {
                    LOG.warn("Failed to replicate to {}", regionInfoForReplica, th);
                    atomicReference.compareAndSet(null, th);
                } else {
                    AtomicUtils.updateMax(atomicLong, l.longValue());
                }
                if (atomicInteger.decrementAndGet() == 0) {
                    if (atomicReference.get() != null) {
                        completableFuture.completeExceptionally((Throwable) atomicReference.get());
                    } else {
                        completableFuture.complete(Long.valueOf(atomicLong.get()));
                    }
                }
            });
        }
    }

    private void logSkipped(TableName tableName, List<WAL.Entry> list, String str) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping {} entries because table {} is {}", new Object[]{Integer.valueOf(list.size()), tableName, str});
            Iterator<WAL.Entry> it = list.iterator();
            while (it.hasNext()) {
                LOG.trace("Skipping : {}", it.next());
            }
        }
    }

    private CompletableFuture<Long> replicate(TableDescriptor tableDescriptor, byte[] bArr, List<WAL.Entry> list) {
        if (this.disabledTableCache.getIfPresent(tableDescriptor.getTableName()) != null) {
            logSkipped(tableDescriptor.getTableName(), list, "cached as a disabled table");
            return CompletableFuture.completedFuture(Long.valueOf(list.size()));
        }
        byte[] cloneRow = CellUtil.cloneRow(list.get(0).getEdit().getCells().get(0));
        CompletableFuture<RegionLocations> completableFuture = new CompletableFuture<>();
        getRegionLocations(completableFuture, tableDescriptor, bArr, cloneRow, false);
        CompletableFuture<Long> completableFuture2 = new CompletableFuture<>();
        FutureUtils.addListener(completableFuture, (regionLocations, th) -> {
            if (th != null) {
                completableFuture2.completeExceptionally(th);
            } else if (regionLocations.getDefaultRegionLocation() == null) {
                completableFuture2.completeExceptionally(new HBaseIOException("No location found for default replica of table=" + tableDescriptor.getTableName() + " row='" + Bytes.toStringBinary(cloneRow) + "'"));
            } else {
                replicate(completableFuture2, regionLocations, tableDescriptor, bArr, cloneRow, list);
            }
        });
        return completableFuture2;
    }

    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
        long j = 0;
        RetryCounter create = this.retryCounterFactory.create();
        loop0: while (isRunning()) {
            treeMap.clear();
            j = 0;
            for (WAL.Entry entry : replicateContext.getEntries()) {
                try {
                    Optional<TableDescriptor> optional = (Optional) this.tableDescriptorCache.get(entry.getKey().getTableName());
                    if (requiresReplication(optional, entry)) {
                        ((List) ((Pair) treeMap.computeIfAbsent(entry.getKey().getEncodedRegionName(), bArr -> {
                            return Pair.newPair(optional.get(), new ArrayList());
                        })).getSecond()).add(entry);
                    } else {
                        j++;
                    }
                } catch (ExecutionException e) {
                    LOG.warn("Failed to load table descriptor for {}, attempts={}", new Object[]{entry.getKey().getTableName(), Integer.valueOf(create.getAttemptTimes()), e.getCause()});
                    if (!create.shouldRetry()) {
                        return false;
                    }
                    try {
                        create.sleepUntilNextRetry();
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        RetryCounter create2 = this.retryCounterFactory.create();
        while (isRunning()) {
            ArrayList<Pair> arrayList = new ArrayList();
            for (Map.Entry entry2 : treeMap.entrySet()) {
                arrayList.add(Pair.newPair(replicate((TableDescriptor) ((Pair) entry2.getValue()).getFirst(), (byte[]) entry2.getKey(), (List) ((Pair) entry2.getValue()).getSecond()), entry2.getKey()));
            }
            for (Pair pair : arrayList) {
                byte[] bArr2 = (byte[]) pair.getSecond();
                try {
                    j += ((Long) ((CompletableFuture) pair.getFirst()).get()).longValue();
                    treeMap.remove(bArr2);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    return false;
                } catch (ExecutionException e4) {
                    Pair pair2 = (Pair) treeMap.get(bArr2);
                    TableName tableName = ((TableDescriptor) pair2.getFirst()).getTableName();
                    List<WAL.Entry> list = (List) pair2.getSecond();
                    if (e4.getCause() instanceof TableNotFoundException) {
                        this.tableDescriptorCache.put(tableName, Optional.empty());
                        logSkipped(tableName, list, "dropped");
                        j += list.size();
                        treeMap.remove(bArr2);
                    } else {
                        boolean z = false;
                        try {
                            z = ((Boolean) this.connection.getAdmin().isTableDisabled(tableName).get()).booleanValue();
                        } catch (InterruptedException e5) {
                            Thread.currentThread().interrupt();
                            return false;
                        } catch (ExecutionException e6) {
                            LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName, e6.getCause());
                        }
                        if (z) {
                            this.disabledTableCache.put(tableName, tableName);
                            logSkipped(tableName, list, "disabled");
                            j += list.size();
                            treeMap.remove(bArr2);
                        } else {
                            LOG.warn("Failed to replicate {} entries for region {} of table {}", new Object[]{Integer.valueOf(list.size()), Bytes.toStringBinary(bArr2), tableName});
                        }
                    }
                }
            }
            if (treeMap.isEmpty()) {
                this.ctx.getMetrics().incrLogEditsFiltered(j);
                return true;
            }
            LOG.warn("Failed to replicate all entries, retry={}", Integer.valueOf(create2.getAttemptTimes()));
            if (!create2.shouldRetry()) {
                return false;
            }
            try {
                create2.sleepUntilNextRetry();
            } catch (InterruptedException e7) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean canReplicateToSameCluster() {
        return true;
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint
    protected WALEntryFilter getScopeWALEntryFilter() {
        return null;
    }
}
