package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.class */
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint implements Abortable {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

    /** Copy of the context configuration, created in {@code init}; used to open the peer connection. */
    protected Configuration conf;

    /** Connection to the peer cluster; set by {@code connectPeerCluster} and cleared by {@code disconnect}. */
    private AsyncClusterConnection conn;

    /** Default used for {@code replication.bad.sink.threshold}. */
    public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

    /** Default used for {@code replication.source.ratio}. */
    public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;

    /** Fraction of the peer's region servers that {@code chooseSinks} keeps as sinks. */
    private float ratio;

    /** Number of bad reports a sink may accumulate before {@code reportBadSink} drops it. */
    private int badSinkThreshold;

    /** Bad-report counter per sink; created in {@code init} and reset by {@code chooseSinks}. */
    private Map<ServerName, Integer> badReportCounts;

    /** Watcher on the peer cluster's ZooKeeper; accessed while holding {@link #zkwLock}. */
    private ZKWatcher zkw = null;

    /** Monitor guarding creation, use and closing of {@link #zkw}. */
    private final Object zkwLock = new Object();

    /** Currently selected sink servers; replaced wholesale by {@code chooseSinks}. */
    private List<ServerName> sinkServers = new ArrayList<>(0);

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint$PeerRegionServerListener.class */
    /**
     * ZooKeeper listener that re-selects replication sinks whenever the children of the
     * peer's region-server znode change.
     */
    public static class PeerRegionServerListener extends ZKListener {
        /** Endpoint whose sink list is refreshed on a change. */
        private final HBaseReplicationEndpoint replicationEndpoint;
        /** Path of the znode whose children are being tracked. */
        private final String regionServerListNode;

        /**
         * Binds the listener to the endpoint's current watcher and remembers the watcher's
         * region-server znode path.
         *
         * @param hBaseReplicationEndpoint endpoint that owns the watcher and the sink list
         */
        public PeerRegionServerListener(HBaseReplicationEndpoint hBaseReplicationEndpoint) {
            super(hBaseReplicationEndpoint.zkw);
            this.replicationEndpoint = hBaseReplicationEndpoint;
            this.regionServerListNode = hBaseReplicationEndpoint.zkw.getZNodePaths().rsZNode;
        }

        /**
         * Triggers a new sink selection when the changed path is the tracked region-server node;
         * any other path is ignored.
         *
         * @param str path of the znode whose children changed
         */
        public synchronized void nodeChildrenChanged(String str) {
            if (!str.equals(this.regionServerListNode)) {
                return;
            }
            HBaseReplicationEndpoint.LOG.info("Detected change to peer region servers, fetching updated list");
            this.replicationEndpoint.chooseSinks();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint$SinkPeer.class */
    /**
     * Immutable pairing of a sink's server name with the admin handle used to talk to it.
     */
    public static class SinkPeer {
        // Both fields are assigned exactly once in the constructor, so they are declared final.
        private final ServerName serverName;
        private final AsyncRegionServerAdmin regionServer;

        /**
         * @param serverName name of the sink region server
         * @param asyncRegionServerAdmin admin handle for that region server
         */
        public SinkPeer(ServerName serverName, AsyncRegionServerAdmin asyncRegionServerAdmin) {
            this.serverName = serverName;
            this.regionServer = asyncRegionServerAdmin;
        }

        /** @return the server name supplied at construction */
        ServerName getServerName() {
            return this.serverName;
        }

        /** @return the admin handle supplied at construction */
        public AsyncRegionServerAdmin getRegionServer() {
            return this.regionServer;
        }
    }

    /**
     * Opens an asynchronous cluster connection for the current user.
     *
     * @param configuration configuration handed to the connection factory
     * @return the newly created connection
     * @throws IOException if the current user cannot be resolved or the factory fails
     */
    protected AsyncClusterConnection createConnection(Configuration configuration) throws IOException {
        AsyncClusterConnection connection =
            ClusterConnectionFactory.createAsyncClusterConnection(configuration, null, User.getCurrent());
        return connection;
    }

    /**
     * Initializes the endpoint from the supplied context: copies the context configuration,
     * reads the sink ratio and bad-sink threshold, and creates an empty bad-report map.
     *
     * @param context replication context passed on to the superclass
     * @throws IOException if the superclass initialization fails
     */
    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(this.ctx.getConfiguration());
        // Use the declared constants rather than repeating their literal values.
        this.ratio = this.ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
        this.badSinkThreshold = this.ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
        this.badReportCounts = Maps.newHashMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Closes the ZooKeeper watcher (while holding {@code zkwLock}) if one exists, then closes
     * the cluster connection if one exists. A failure to close the connection is logged
     * together with its cause and is not rethrown.
     */
    public void disconnect() {
        synchronized (this.zkwLock) {
            if (this.zkw != null) {
                this.zkw.close();
            }
        }
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            } catch (IOException e) {
                // Pass the exception as the last argument so the cause and stack trace are kept.
                LOG.warn("{} Failed to close the connection", this.ctx.getPeerId(), e);
            }
        }
    }

    /**
     * Rebuilds the ZooKeeper watcher when the given exception is a connection loss, a session
     * expiry or an authentication failure; any other exception is ignored. A failure to
     * rebuild the watcher is logged and not rethrown.
     *
     * @param keeperException exception raised by a ZooKeeper operation
     */
    private void reconnect(KeeperException keeperException) {
        boolean connectionProblem = keeperException instanceof KeeperException.ConnectionLossException
            || keeperException instanceof KeeperException.SessionExpiredException
            || keeperException instanceof KeeperException.AuthFailedException;
        if (!connectionProblem) {
            return;
        }
        String peerClusterKey = this.ctx.getPeerConfig().getClusterKey();
        LOG.warn("Lost the ZooKeeper connection for peer {}", peerClusterKey, keeperException);
        try {
            reloadZkWatcher();
        } catch (IOException reloadFailure) {
            LOG.warn("Creation of ZookeeperWatcher failed for peer {}", peerClusterKey, reloadFailure);
        }
    }

    /** Starts this endpoint by delegating to {@code startAsync()}. */
    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void start() {
        startAsync();
    }

    /** Stops this endpoint by delegating to {@code stopAsync()}. */
    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void stop() {
        stopAsync();
    }

    /**
     * Start hook: creates the ZooKeeper watcher, opens the peer cluster connection and then
     * reports a successful start. On an {@link IOException} anything created so far is released
     * and the failure is reported instead.
     */
    protected void doStart() {
        try {
            reloadZkWatcher();
            connectPeerCluster();
            notifyStarted();
        } catch (IOException e) {
            // The watcher may already be open when the connection step fails. Release it here,
            // since a failed start is presumably not followed by doStop() (verify against the
            // service base class). The failure is reported even if the cleanup itself throws.
            try {
                disconnect();
            } finally {
                notifyFailed(e);
            }
        }
    }

    /** Stop hook: releases the watcher and connection via {@code disconnect()}, then calls {@code notifyStopped()}. */
    protected void doStop() {
        disconnect();
        notifyStopped();
    }

    /**
     * Reads the peer cluster's UUID through the ZooKeeper watcher while holding {@code zkwLock}.
     *
     * @return the peer cluster UUID, or {@code null} when the lookup raised a
     *         {@link KeeperException} (in which case a reconnect is attempted)
     */
    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public UUID getPeerUUID() {
        try {
            synchronized (this.zkwLock) {
                return ZKClusterId.getUUIDForCluster(this.zkw);
            }
        } catch (KeeperException lookupFailure) {
            // The reconnect runs after the lock has been released by leaving the synchronized block.
            reconnect(lookupFailure);
            return null;
        }
    }

    /**
     * Replaces the ZooKeeper watcher while holding {@code zkwLock}: closes the existing watcher
     * if there is one, creates a new one and registers a {@link PeerRegionServerListener} on it.
     *
     * @throws IOException if the new watcher cannot be created
     */
    private void reloadZkWatcher() throws IOException {
        synchronized (this.zkwLock) {
            ZKWatcher previous = this.zkw;
            if (previous != null) {
                previous.close();
            }
            ZKWatcher replacement =
                new ZKWatcher(this.ctx.getConfiguration(), "connection to cluster: " + this.ctx.getPeerId(), this);
            // The field must be assigned before the listener is built: its constructor reads this.zkw.
            this.zkw = replacement;
            replacement.registerListener(new PeerRegionServerListener(this));
        }
    }

    /**
     * Opens the peer cluster connection with {@code createConnection(conf)} and stores it.
     *
     * @throws IOException the creation failure, rethrown after it has been logged
     */
    private void connectPeerCluster() throws IOException {
        final AsyncClusterConnection created;
        try {
            created = createConnection(this.conf);
        } catch (IOException creationFailure) {
            LOG.warn("{} Failed to create connection for peer cluster", this.ctx.getPeerId(), creationFailure);
            throw creationFailure;
        }
        this.conn = created;
    }

    /**
     * Logs the abort request at error level; no other action is taken.
     *
     * @param str reason for the abort
     * @param th  throwable associated with the abort, logged alongside the message
     */
    public void abort(String str, Throwable th) {
        String message = "The HBaseReplicationEndpoint corresponding to peer " + this.ctx.getPeerId()
            + " was aborted for the following reason(s):" + str;
        LOG.error(message, th);
    }

    /**
     * Always reports {@code false}.
     *
     * @return {@code false}
     */
    public boolean isAborted() {
        return false;
    }

    /**
     * Lists the children of the peer's region-server znode (setting a watch for new children)
     * and parses each child name into a {@link ServerName}.
     *
     * <p>If the ZooKeeper call raises a {@link KeeperException}, the failure is logged at debug
     * level and a reconnect is attempted.
     *
     * @return the parsed server names, or an empty list when the listing failed or returned
     *         {@code null}
     */
    protected List<ServerName> fetchSlavesAddresses() {
        List<String> children = null;
        try {
            synchronized (this.zkwLock) {
                children = ZKUtil.listChildrenAndWatchForNewChildren(this.zkw, this.zkw.getZNodePaths().rsZNode);
            }
        } catch (KeeperException e) {
            // The message is constant, so no explicit isDebugEnabled() guard is needed.
            LOG.debug("Fetch slaves addresses failed", e);
            reconnect(e);
        }
        if (children == null) {
            return Collections.emptyList();
        }
        List<ServerName> addresses = new ArrayList<>(children.size());
        for (String child : children) {
            addresses.add(ServerName.parseServerName(child));
        }
        return addresses;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Re-selects the sink servers: fetches the peer's region servers, shuffles them, keeps the
     * first {@code ceil(size * ratio)} of them and resets all bad-report counters.
     *
     * <p>The number kept is clamped to {@code [0, size]}, so a configured ratio above 1 selects
     * every server and a negative ratio selects none, rather than making {@code subList} throw.
     */
    public synchronized void chooseSinks() {
        List<ServerName> slaveAddresses = fetchSlavesAddresses();
        if (slaveAddresses.isEmpty()) {
            LOG.warn("No sinks available at peer. Will not be able to replicate");
        }
        Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
        int available = slaveAddresses.size();
        int requested = (int) Math.ceil(available * this.ratio);
        int numSinks = Math.max(0, Math.min(available, requested));
        this.sinkServers = slaveAddresses.subList(0, numSinks);
        this.badReportCounts.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the size of the current sink server list
     */
    public synchronized int getNumSinks() {
        return this.sinkServers.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Picks one of the current sink servers at random and wraps it, with its region-server
     * admin handle, in a {@link SinkPeer}. If the sink list is empty, a fresh selection is
     * attempted first.
     *
     * @return a randomly chosen sink
     * @throws IOException if no sink is available even after refreshing the list
     */
    public synchronized SinkPeer getReplicationSink() throws IOException {
        if (this.sinkServers.isEmpty()) {
            LOG.info("Current list of sinks is out of date or empty, updating");
            chooseSinks();
            if (this.sinkServers.isEmpty()) {
                throw new IOException("No replication sinks are available");
            }
        }
        int pick = ThreadLocalRandom.current().nextInt(this.sinkServers.size());
        ServerName chosen = this.sinkServers.get(pick);
        AsyncRegionServerAdmin admin = this.conn.getRegionServerAdmin(chosen);
        return new SinkPeer(chosen, admin);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records one more failure for the given sink. Once its failure count exceeds the bad-sink
     * threshold, the sink is removed from the current list; if that leaves the list empty, a
     * new selection is made.
     *
     * @param sinkPeer sink that failed
     */
    public synchronized void reportBadSink(SinkPeer sinkPeer) {
        ServerName failedServer = sinkPeer.getServerName();
        int failures = this.badReportCounts.merge(failedServer, 1, Integer::sum);
        if (failures <= this.badSinkThreshold) {
            return;
        }
        this.sinkServers.remove(failedServer);
        if (this.sinkServers.isEmpty()) {
            chooseSinks();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Clears the bad-report counter kept for the given sink's server.
     *
     * @param sinkPeer sink whose counter entry is removed
     */
    public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
        this.badReportCounts.remove(sinkPeer.getServerName());
    }

    /**
     * Package-private accessor for the current sink list. Returns the internal list itself,
     * not a copy, and (unlike the other sink accessors in this class) is not synchronized.
     *
     * @return the current sink server list
     */
    List<ServerName> getSinkServers() {
        return this.sinkServers;
    }
}
