/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractReconnectionHandler;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ClusterNameMismatchException;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.DefaultResultSetFuture;
import com.datastax.driver.core.EndPoint;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ProtocolEvent;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SchemaElement;
import com.datastax.driver.core.SchemaParser;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.SystemProperties;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.VersionNumber;
import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ServerError;
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import cz.o2.proxima.cassandra.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.FutureCallback;
import cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.ListenableFuture;
import cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.SettableFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ControlConnection
implements Connection.Owner {
    private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);
    private static final boolean EXTENDED_PEER_CHECK = SystemProperties.getBoolean("com.datastax.driver.EXTENDED_PEER_CHECK", true);
    private static final InetAddress bindAllAddress;
    private static final String SELECT_PEERS = "SELECT * FROM system.peers";
    private static final String SELECT_PEERS_V2 = "SELECT * FROM system.peers_v2";
    private static final String SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'";
    private static final String SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version, host_id FROM system.peers";
    private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version, host_id FROM system.local WHERE key='local'";
    private static final VersionNumber _3_11;
    @VisibleForTesting
    final AtomicReference<Connection> connectionRef = new AtomicReference();
    private final Cluster.Manager cluster;
    private final AtomicReference<ListenableFuture<?>> reconnectionAttempt = new AtomicReference();
    private volatile boolean isShutdown;
    private volatile boolean isPeersV2 = true;
    private volatile boolean isCloud = false;

    public ControlConnection(Cluster.Manager manager) {
        this.cluster = manager;
    }

    void connect() throws UnsupportedProtocolVersionException {
        if (this.isShutdown) {
            return;
        }
        ArrayList<Host> hosts = new ArrayList<Host>(this.cluster.metadata.getContactPoints());
        Collections.shuffle(hosts);
        this.setNewConnection(this.reconnectInternal(hosts.iterator(), true));
    }

    CloseFuture closeAsync() {
        Connection connection;
        this.isShutdown = true;
        ListenableFuture<?> r = this.reconnectionAttempt.get();
        if (r != null) {
            r.cancel(false);
        }
        return (connection = this.connectionRef.get()) == null ? CloseFuture.immediateFuture() : connection.closeAsync().force();
    }

    Host connectedHost() {
        Connection current = this.connectionRef.get();
        return current == null ? null : this.cluster.metadata.getHost(current.endPoint);
    }

    void triggerReconnect() {
        this.backgroundReconnect(0L);
    }

    private void backgroundReconnect(long initialDelayMs) {
        if (this.isShutdown) {
            return;
        }
        ListenableFuture<?> reconnection = this.reconnectionAttempt.get();
        if (reconnection != null && !reconnection.isDone()) {
            return;
        }
        new AbstractReconnectionHandler("Control connection", this.cluster.reconnectionExecutor, this.cluster.reconnectionPolicy().newSchedule(), this.reconnectionAttempt, initialDelayMs){

            @Override
            protected Connection tryReconnect() throws ConnectionException {
                if (ControlConnection.this.isShutdown) {
                    throw new ConnectionException(null, "Control connection was shut down");
                }
                try {
                    return ControlConnection.this.reconnectInternal(ControlConnection.this.queryPlan(), false);
                }
                catch (NoHostAvailableException e) {
                    throw new ConnectionException(null, e.getMessage());
                }
                catch (UnsupportedProtocolVersionException e) {
                    throw new AssertionError();
                }
            }

            @Override
            protected void onReconnection(Connection connection) {
                if (ControlConnection.this.isShutdown) {
                    connection.closeAsync().force();
                    return;
                }
                ControlConnection.this.setNewConnection(connection);
            }

            @Override
            protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error("[Control connection] Cannot connect to any host, scheduling retry in {} milliseconds", (Object)nextDelayMs);
                return true;
            }

            @Override
            protected boolean onUnknownException(Exception e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error(String.format("[Control connection] Unknown error during reconnection, scheduling retry in %d milliseconds", nextDelayMs), (Throwable)e);
                return true;
            }
        }.start();
    }

    private Iterator<Host> queryPlan() {
        return this.cluster.loadBalancingPolicy().newQueryPlan(null, Statement.DEFAULT);
    }

    private void signalError() {
        Connection connection = this.connectionRef.get();
        if (connection != null) {
            connection.closeAsync().force();
        }
        this.backgroundReconnect(0L);
    }

    private void setNewConnection(Connection newConnection) {
        Host.statesLogger.debug("[Control connection] established to {}", (Object)newConnection.endPoint);
        newConnection.setOwner(this);
        Connection old = this.connectionRef.getAndSet(newConnection);
        if (old != null && !old.isClosed()) {
            old.closeAsync().force();
        }
    }

    private Connection reconnectInternal(Iterator<Host> iter, boolean isInitialConnection) throws UnsupportedProtocolVersionException {
        Map<EndPoint, Throwable> errors = null;
        Host host = null;
        try {
            while (iter.hasNext()) {
                host = iter.next();
                if (!host.convictionPolicy.canReconnectNow()) continue;
                try {
                    return this.tryConnect(host, isInitialConnection);
                }
                catch (ConnectionException e) {
                    errors = ControlConnection.logError(host, e, errors, iter);
                    if (!isInitialConnection) continue;
                    host.setDown();
                }
                catch (ExecutionException e) {
                    errors = ControlConnection.logError(host, e.getCause(), errors, iter);
                }
                catch (UnsupportedProtocolVersionException e) {
                    if (isInitialConnection) {
                        throw e;
                    }
                    logger.debug("Ignoring host {}: {}", (Object)host, (Object)e.getMessage());
                    errors = ControlConnection.logError(host, e, errors, iter);
                }
                catch (ClusterNameMismatchException e) {
                    logger.debug("Ignoring host {}: {}", (Object)host, (Object)e.getMessage());
                    errors = ControlConnection.logError(host, e, errors, iter);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            errors = ControlConnection.logError(host, new DriverException("Connection thread interrupted"), errors, iter);
            while (iter.hasNext()) {
                errors = ControlConnection.logError(iter.next(), new DriverException("Connection thread interrupted"), errors, iter);
            }
        }
        throw new NoHostAvailableException(errors == null ? Collections.emptyMap() : errors);
    }

    private static Map<EndPoint, Throwable> logError(Host host, Throwable exception, Map<EndPoint, Throwable> errors, Iterator<Host> iter) {
        if (errors == null) {
            errors = new HashMap<EndPoint, Throwable>();
        }
        errors.put(host.getEndPoint(), exception);
        if (logger.isDebugEnabled()) {
            if (iter.hasNext()) {
                logger.debug(String.format("[Control connection] error on %s connection, trying next host", host), exception);
            } else {
                logger.debug(String.format("[Control connection] error on %s connection, no more host to try", host), exception);
            }
        }
        return errors;
    }

    private Connection tryConnect(Host host, boolean isInitialConnection) throws ConnectionException, ExecutionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
        Connection connection = this.cluster.connectionFactory.open(host);
        String productType = (String)connection.optionsQuery().get();
        if (productType.equals("DATASTAX_APOLLO")) {
            this.isCloud = true;
        }
        if (this.cluster.connectionFactory.protocolVersion == null) {
            this.cluster.connectionFactory.protocolVersion = ProtocolVersion.NEWEST_SUPPORTED;
        }
        try {
            logger.trace("[Control connection] Registering for events");
            List<ProtocolEvent.Type> evs = Arrays.asList(ProtocolEvent.Type.TOPOLOGY_CHANGE, ProtocolEvent.Type.STATUS_CHANGE, ProtocolEvent.Type.SCHEMA_CHANGE);
            connection.write(new Requests.Register(evs));
            this.refreshNodeListAndTokenMap(connection, this.cluster, isInitialConnection, true);
            logger.debug("[Control connection] Refreshing schema");
            ControlConnection.refreshSchema(connection, null, null, null, null, this.cluster);
            return connection;
        }
        catch (BusyConnectionException e) {
            connection.closeAsync().force();
            throw new DriverInternalError("Newly created connection should not be busy");
        }
        catch (InterruptedException e) {
            connection.closeAsync().force();
            throw e;
        }
        catch (ConnectionException e) {
            connection.closeAsync().force();
            throw e;
        }
        catch (ExecutionException e) {
            connection.closeAsync().force();
            throw e;
        }
        catch (RuntimeException e) {
            connection.closeAsync().force();
            throw e;
        }
    }

    public void refreshSchema(SchemaElement targetType, String targetKeyspace, String targetName, List<String> signature) throws InterruptedException {
        logger.debug("[Control connection] Refreshing schema for {}{}", (Object)(targetType == null ? "everything" : targetKeyspace), (Object)(targetType == SchemaElement.KEYSPACE ? "" : "." + targetName + " (" + (Object)((Object)targetType) + ")"));
        try {
            Connection c = this.connectionRef.get();
            if (c == null || c.isClosed()) {
                return;
            }
            ControlConnection.refreshSchema(c, targetType, targetKeyspace, targetName, signature, this.cluster);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing schema ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            if (!this.isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing schema", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
    }

    static void refreshSchema(Connection connection, SchemaElement targetType, String targetKeyspace, String targetName, List<String> targetSignature, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        SchemaParser schemaParser;
        VersionNumber cassandraVersion;
        Host host = cluster.metadata.getHost(connection.endPoint);
        if (host == null || host.getCassandraVersion() == null) {
            cassandraVersion = cluster.protocolVersion().minCassandraVersion();
            logger.warn("Cannot find Cassandra version for host {} to parse the schema, using {} based on protocol version in use. If parsing the schema fails, this could be the cause", (Object)connection.endPoint, (Object)cassandraVersion);
        } else {
            cassandraVersion = host.getCassandraVersion();
        }
        if (host == null) {
            schemaParser = SchemaParser.forVersion(cassandraVersion);
        } else {
            VersionNumber dseVersion = host.getDseVersion();
            SchemaParser schemaParser2 = schemaParser = dseVersion == null ? SchemaParser.forVersion(cassandraVersion) : SchemaParser.forDseVersion(dseVersion);
            if (dseVersion != null && dseVersion.getMajor() == 6 && dseVersion.getMinor() < 8) {
                cassandraVersion = _3_11;
            }
        }
        schemaParser.refresh(cluster.getCluster(), targetType, targetKeyspace, targetName, targetSignature, connection, cassandraVersion);
    }

    void refreshNodeListAndTokenMap() {
        Connection c = this.connectionRef.get();
        if (c == null || c.isClosed()) {
            return;
        }
        try {
            this.refreshNodeListAndTokenMap(c, this.cluster, false, true);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node list and token map ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            if (!this.isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing node list and token map", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node list and token map, skipping it.");
        }
    }

    private static EndPoint endPointForPeerHost(Row peersRow, EndPoint connectedEndPoint, Cluster.Manager cluster) {
        EndPoint endPoint = cluster.configuration.getPolicies().getEndPointFactory().create(peersRow);
        if (connectedEndPoint.equals(endPoint)) {
            logger.debug("System.peers on node {} has a line for itself. This is not normal but is a known problem of some DSE versions. Ignoring the entry.", (Object)connectedEndPoint);
            return null;
        }
        return endPoint;
    }

    private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        boolean isConnectedHost = c.endPoint.equals(host.getEndPoint());
        if (isConnectedHost || host.getBroadcastSocketAddress() != null) {
            String query;
            if (isConnectedHost) {
                query = SELECT_LOCAL;
            } else {
                InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress();
                query = this.isPeersV2 ? "SELECT * FROM system.peers_v2 WHERE peer='" + broadcastAddress.getAddress().getHostAddress() + "' AND peer_port=" + broadcastAddress.getPort() : "SELECT * FROM system.peers WHERE peer='" + broadcastAddress.getAddress().getHostAddress() + "'";
            }
            DefaultResultSetFuture future = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(query));
            c.write(future);
            Row row = ((ResultSet)future.get()).one();
            if (row != null) {
                return row;
            }
            InetSocketAddress address = host.getBroadcastSocketAddress();
            String addressToUse = address.getPort() != 0 ? address.toString() : address.getAddress().toString();
            logger.debug("Could not find peer with broadcast address {}, falling back to a full system.peers scan to fetch info for {} (this can happen if the broadcast address changed)", (Object)addressToUse, (Object)host);
        }
        ListenableFuture<ResultSet> future = this.selectPeersFuture(c);
        for (Row row : (ResultSet)future.get()) {
            UUID rowId = row.getUUID("host_id");
            if (!host.getHostId().equals(rowId)) continue;
            return row;
        }
        return null;
    }

    boolean refreshNodeInfo(Host host) {
        Connection c = this.connectionRef.get();
        if (c == null || c.isClosed()) {
            return true;
        }
        logger.debug("[Control connection] Refreshing node info on {}", (Object)host);
        try {
            Row row = this.fetchNodeInfo(host, c);
            if (row == null) {
                if (c.isDefunct()) {
                    logger.debug("Control connection is down, could not refresh node info");
                    return true;
                }
                logger.warn("No row found for host {} in {}'s peers system table. {} will be ignored.", new Object[]{host.getEndPoint(), c.endPoint, host.getEndPoint()});
                return false;
            }
            if (!c.endPoint.equals(host.getEndPoint()) && !this.isValidPeer(row, true)) {
                return false;
            }
            ControlConnection.updateInfo(host, row, this.cluster, false);
            return true;
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node info ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            if (!this.isShutdown) {
                logger.debug("[Control connection] Unexpected error while refreshing node info", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node info, skipping it.");
        }
        catch (Exception e) {
            logger.debug("[Control connection] Unexpected error while refreshing node info", (Throwable)e);
            this.signalError();
        }
        return true;
    }

    private static void updateInfo(Host host, Row row, Cluster.Manager cluster, boolean isInitialConnection) {
        int broadcastPort;
        if (!row.isNull("data_center") || !row.isNull("rack")) {
            ControlConnection.updateLocationInfo(host, row.getString("data_center"), row.getString("rack"), isInitialConnection, cluster);
        }
        String version = row.getString("release_version");
        host.setVersion(version);
        InetSocketAddress broadcastRpcAddress = null;
        if (row.getColumnDefinitions().contains("native_address")) {
            InetAddress nativeAddress = row.getInet("native_address");
            int nativePort = row.getInt("native_port");
            broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
        } else if (row.getColumnDefinitions().contains("rpc_address")) {
            InetAddress rpcAddress = row.getInet("rpc_address");
            broadcastRpcAddress = new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
        }
        host.setBroadcastRpcAddress(broadcastRpcAddress);
        InetSocketAddress broadcastSocketAddress = null;
        if (row.getColumnDefinitions().contains("peer")) {
            broadcastPort = row.getColumnDefinitions().contains("peer_port") ? row.getInt("peer_port") : 0;
            broadcastSocketAddress = new InetSocketAddress(row.getInet("peer"), broadcastPort);
        } else if (row.getColumnDefinitions().contains("broadcast_address")) {
            broadcastPort = row.getColumnDefinitions().contains("broadcast_port") ? row.getInt("broadcast_port") : 0;
            broadcastSocketAddress = new InetSocketAddress(row.getInet("broadcast_address"), broadcastPort);
        }
        host.setBroadcastSocketAddress(broadcastSocketAddress);
        InetSocketAddress listenAddress = null;
        if (row.getColumnDefinitions().contains("listen_address")) {
            int listenPort = row.getColumnDefinitions().contains("listen_port") ? row.getInt("listen_port") : 0;
            listenAddress = new InetSocketAddress(row.getInet("listen_address"), listenPort);
        }
        host.setListenSocketAddress(listenAddress);
        if (row.getColumnDefinitions().contains("workload")) {
            String dseWorkload = row.getString("workload");
            host.setDseWorkload(dseWorkload);
        }
        if (row.getColumnDefinitions().contains("graph")) {
            boolean isDseGraph = row.getBool("graph");
            host.setDseGraphEnabled(isDseGraph);
        }
        if (row.getColumnDefinitions().contains("dse_version")) {
            String dseVersion = row.getString("dse_version");
            host.setDseVersion(dseVersion);
        }
        host.setHostId(row.getUUID("host_id"));
        host.setSchemaVersion(row.getUUID("schema_version"));
    }

    private static void updateLocationInfo(Host host, String datacenter, String rack, boolean isInitialConnection, Cluster.Manager cluster) {
        if (MoreObjects.equal(host.getDatacenter(), datacenter) && MoreObjects.equal(host.getRack(), rack)) {
            return;
        }
        if (!isInitialConnection) {
            cluster.loadBalancingPolicy().onRemove(host);
        }
        host.setLocationInfo(datacenter, rack);
        if (!isInitialConnection) {
            cluster.loadBalancingPolicy().onAdd(host);
        }
    }

    private ListenableFuture<ResultSet> selectPeersFuture(final Connection connection) {
        if (this.isPeersV2) {
            DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2));
            connection.write(peersV2Future);
            final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
            GuavaCompatibility.INSTANCE.addCallback(peersV2Future, new FutureCallback<ResultSet>(){

                @Override
                public void onSuccess(ResultSet result) {
                    peersFuture.set(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    if (t instanceof InvalidQueryException || t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)")) {
                        ControlConnection.this.isPeersV2 = false;
                        MoreFutures.propagateFuture(peersFuture, ControlConnection.this.selectPeersFuture(connection));
                    } else {
                        peersFuture.setException(t);
                    }
                }
            });
            return peersFuture;
        }
        DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
        connection.write(peersFuture);
        return peersFuture;
    }

    private void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logInvalidPeers) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        Host controlHost;
        logger.debug("[Control connection] Refreshing node list and token map");
        boolean metadataEnabled = cluster.configuration.getQueryOptions().isMetadataEnabled();
        DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
        ListenableFuture<ResultSet> peersFuture = this.selectPeersFuture(connection);
        connection.write(localFuture);
        String partitioner = null;
        Token.Factory factory = null;
        HashMap<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>();
        Row localRow = ((ResultSet)localFuture.get()).one();
        if (localRow == null) {
            throw new IllegalStateException(String.format("system.local is empty on %s, this should not happen", connection.endPoint));
        }
        String clusterName = localRow.getString("cluster_name");
        if (clusterName != null) {
            cluster.metadata.clusterName = clusterName;
        }
        if ((partitioner = localRow.getString("partitioner")) != null) {
            cluster.metadata.partitioner = partitioner;
            factory = Token.getFactory(partitioner);
        }
        Host host = controlHost = isInitialConnection ? cluster.metadata.getContactPoint(connection.endPoint) : cluster.metadata.getHost(connection.endPoint);
        if (controlHost == null) {
            logger.debug("Host in local system table ({}) unknown to us (ok if said host just got removed)", (Object)connection.endPoint);
        } else {
            Set<String> tokensStr;
            ControlConnection.updateInfo(controlHost, localRow, cluster, isInitialConnection);
            if (metadataEnabled && factory != null && !(tokensStr = localRow.getSet("tokens", String.class)).isEmpty()) {
                Set<Token> tokens = ControlConnection.toTokens(factory, tokensStr);
                tokenMap.put(controlHost, tokens);
            }
            if (isInitialConnection) {
                cluster.metadata.addIfAbsent(controlHost);
            }
        }
        ArrayList<EndPoint> foundHosts = new ArrayList<EndPoint>();
        ArrayList<String> dcs = new ArrayList<String>();
        ArrayList<String> racks = new ArrayList<String>();
        ArrayList<String> cassandraVersions = new ArrayList<String>();
        ArrayList<InetSocketAddress> broadcastRpcAddresses = new ArrayList<InetSocketAddress>();
        ArrayList<InetSocketAddress> broadcastAddresses = new ArrayList<InetSocketAddress>();
        ArrayList<InetSocketAddress> listenAddresses = new ArrayList<InetSocketAddress>();
        ArrayList<Set<Token>> allTokens = new ArrayList<Set<Token>>();
        ArrayList<String> dseVersions = new ArrayList<String>();
        ArrayList<Boolean> dseGraphEnabled = new ArrayList<Boolean>();
        ArrayList<String> dseWorkloads = new ArrayList<String>();
        ArrayList<UUID> hostIds = new ArrayList<UUID>();
        ArrayList<UUID> schemaVersions = new ArrayList<UUID>();
        for (Row row : (ResultSet)peersFuture.get()) {
            InetSocketAddress broadcastRpcAddress;
            EndPoint endPoint;
            if (!this.isValidPeer(row, logInvalidPeers) || (endPoint = ControlConnection.endPointForPeerHost(row, connection.endPoint, cluster)) == null) continue;
            foundHosts.add(endPoint);
            dcs.add(row.getString("data_center"));
            racks.add(row.getString("rack"));
            cassandraVersions.add(row.getString("release_version"));
            if (row.getColumnDefinitions().contains("native_address")) {
                InetAddress nativeAddress = row.getInet("native_address");
                int nativePort = row.getInt("native_port");
                broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
            } else {
                InetAddress rpcAddress = row.getInet("rpc_address");
                broadcastRpcAddress = new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
            }
            broadcastRpcAddresses.add(broadcastRpcAddress);
            int broadcastPort = row.getColumnDefinitions().contains("peer_port") ? row.getInt("peer_port") : 0;
            InetSocketAddress broadcastAddress = new InetSocketAddress(row.getInet("peer"), broadcastPort);
            broadcastAddresses.add(broadcastAddress);
            if (metadataEnabled && factory != null) {
                Set<String> tokensStr = row.getSet("tokens", String.class);
                Set<Token> tokens = null;
                if (!tokensStr.isEmpty()) {
                    tokens = ControlConnection.toTokens(factory, tokensStr);
                }
                allTokens.add(tokens);
            }
            if (row.getColumnDefinitions().contains("listen_address") && !row.isNull("listen_address")) {
                int listenPort = row.getColumnDefinitions().contains("listen_port") ? row.getInt("listen_port") : 0;
                InetSocketAddress listenAddress = new InetSocketAddress(row.getInet("listen_address"), listenPort);
                listenAddresses.add(listenAddress);
            } else {
                listenAddresses.add(null);
            }
            String dseWorkload = row.getColumnDefinitions().contains("workload") ? row.getString("workload") : null;
            dseWorkloads.add(dseWorkload);
            Boolean isDseGraph = row.getColumnDefinitions().contains("graph") ? Boolean.valueOf(row.getBool("graph")) : null;
            dseGraphEnabled.add(isDseGraph);
            String dseVersion = row.getColumnDefinitions().contains("dse_version") ? row.getString("dse_version") : null;
            dseVersions.add(dseVersion);
            hostIds.add(row.getUUID("host_id"));
            schemaVersions.add(row.getUUID("schema_version"));
        }
        for (int i = 0; i < foundHosts.size(); ++i) {
            Host peerHost = isInitialConnection ? cluster.metadata.getContactPoint((EndPoint)foundHosts.get(i)) : cluster.metadata.getHost((EndPoint)foundHosts.get(i));
            boolean isNew = false;
            if (peerHost == null) {
                Host newHost = cluster.metadata.newHost((EndPoint)foundHosts.get(i));
                newHost.setHostId((UUID)hostIds.get(i));
                Host previous = cluster.metadata.addIfAbsent(newHost);
                if (previous == null) {
                    peerHost = newHost;
                    isNew = true;
                } else {
                    peerHost = previous;
                    isNew = false;
                }
            }
            if (dcs.get(i) != null || racks.get(i) != null) {
                ControlConnection.updateLocationInfo(peerHost, (String)dcs.get(i), (String)racks.get(i), isInitialConnection, cluster);
            }
            if (cassandraVersions.get(i) != null) {
                peerHost.setVersion((String)cassandraVersions.get(i));
            }
            if (broadcastRpcAddresses.get(i) != null) {
                peerHost.setBroadcastRpcAddress((InetSocketAddress)broadcastRpcAddresses.get(i));
            }
            if (broadcastAddresses.get(i) != null) {
                peerHost.setBroadcastSocketAddress((InetSocketAddress)broadcastAddresses.get(i));
            }
            if (listenAddresses.get(i) != null) {
                peerHost.setListenSocketAddress((InetSocketAddress)listenAddresses.get(i));
            }
            if (dseVersions.get(i) != null) {
                peerHost.setDseVersion((String)dseVersions.get(i));
            }
            if (dseWorkloads.get(i) != null) {
                peerHost.setDseWorkload((String)dseWorkloads.get(i));
            }
            if (dseGraphEnabled.get(i) != null) {
                peerHost.setDseGraphEnabled((Boolean)dseGraphEnabled.get(i));
            }
            peerHost.setHostId((UUID)hostIds.get(i));
            if (schemaVersions.get(i) != null) {
                peerHost.setSchemaVersion((UUID)schemaVersions.get(i));
            }
            if (metadataEnabled && factory != null && allTokens.get(i) != null) {
                tokenMap.put(peerHost, (Set<Token>)allTokens.get(i));
            }
            if (!isNew && isInitialConnection) {
                cluster.metadata.addIfAbsent(peerHost);
            }
            if (!isNew || isInitialConnection) continue;
            cluster.triggerOnAdd(peerHost);
        }
        HashSet foundHostsSet = new HashSet(foundHosts);
        for (Host host2 : cluster.metadata.allHosts()) {
            if (host2.getEndPoint().equals(connection.endPoint) || foundHostsSet.contains(host2.getEndPoint())) continue;
            cluster.removeHost(host2, isInitialConnection);
        }
        if (metadataEnabled && factory != null && !tokenMap.isEmpty()) {
            cluster.metadata.rebuildTokenMap(factory, tokenMap);
        }
    }

    private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr) {
        LinkedHashSet<Token> tokens = new LinkedHashSet<Token>(tokensStr.size());
        for (String tokenStr : tokensStr) {
            tokens.add(factory.fromString(tokenStr));
        }
        return tokens;
    }

    private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
        boolean isValid;
        boolean bl = isValid = peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id");
        isValid = this.isPeersV2 ? (isValid &= peerRow.getColumnDefinitions().contains("native_address") && peerRow.getColumnDefinitions().contains("native_port") && !peerRow.isNull("native_address") && !peerRow.isNull("native_port")) : (isValid &= peerRow.getColumnDefinitions().contains("rpc_address") && !peerRow.isNull("rpc_address"));
        if (EXTENDED_PEER_CHECK) {
            isValid &= peerRow.getColumnDefinitions().contains("data_center") && !peerRow.isNull("data_center") && peerRow.getColumnDefinitions().contains("rack") && !peerRow.isNull("rack") && peerRow.getColumnDefinitions().contains("tokens") && !peerRow.isNull("tokens");
        }
        if (!isValid && logIfInvalid) {
            logger.warn("Found invalid row in system.peers: {}. This is likely a gossip or snitch issue, this host will be ignored.", (Object)this.formatInvalidPeer(peerRow));
        }
        return isValid;
    }

    private String formatInvalidPeer(Row peerRow) {
        StringBuilder sb = new StringBuilder("[peer=" + peerRow.getInet("peer"));
        if (this.isPeersV2) {
            ControlConnection.formatMissingOrNullColumn(peerRow, "native_address", sb);
            ControlConnection.formatMissingOrNullColumn(peerRow, "native_port", sb);
        } else {
            ControlConnection.formatMissingOrNullColumn(peerRow, "rpc_address", sb);
        }
        if (EXTENDED_PEER_CHECK) {
            ControlConnection.formatMissingOrNullColumn(peerRow, "host_id", sb);
            ControlConnection.formatMissingOrNullColumn(peerRow, "data_center", sb);
            ControlConnection.formatMissingOrNullColumn(peerRow, "rack", sb);
            ControlConnection.formatMissingOrNullColumn(peerRow, "tokens", sb);
        }
        sb.append("]");
        return sb.toString();
    }

    private static void formatMissingOrNullColumn(Row peerRow, String columnName, StringBuilder sb) {
        if (!peerRow.getColumnDefinitions().contains(columnName)) {
            sb.append(", missing ").append(columnName);
        } else if (peerRow.isNull(columnName)) {
            sb.append(", ").append(columnName).append("=null");
        }
    }

    static boolean waitForSchemaAgreement(Connection connection, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        long start = System.nanoTime();
        long elapsed = 0L;
        int maxSchemaAgreementWaitSeconds = cluster.configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds();
        while (elapsed < (long)(maxSchemaAgreementWaitSeconds * 1000)) {
            if (ControlConnection.checkSchemaAgreement(connection, cluster)) {
                return true;
            }
            Thread.sleep(200L);
            elapsed = Cluster.timeSince(start, TimeUnit.MILLISECONDS);
        }
        return false;
    }

    private static boolean checkSchemaAgreement(Connection connection, Cluster.Manager cluster) throws InterruptedException, ExecutionException {
        DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_PEERS));
        DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_LOCAL));
        connection.write(peersFuture);
        connection.write(localFuture);
        HashSet<UUID> versions = new HashSet<UUID>();
        Row localRow = ((ResultSet)localFuture.get()).one();
        if (localRow != null && !localRow.isNull("schema_version")) {
            versions.add(localRow.getUUID("schema_version"));
        }
        for (Row row : (ResultSet)peersFuture.get()) {
            Host peer;
            UUID hostId = row.getUUID("host_id");
            if (row.isNull("schema_version") || (peer = cluster.metadata.getHost(hostId)) == null || !peer.isUp()) continue;
            versions.add(row.getUUID("schema_version"));
        }
        logger.debug("Checking for schema agreement: versions are {}", versions);
        return versions.size() <= 1;
    }

    boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionException, InterruptedException, ExecutionException {
        Connection connection = this.connectionRef.get();
        return connection != null && !connection.isClosed() && ControlConnection.checkSchemaAgreement(connection, this.cluster);
    }

    boolean isOpen() {
        Connection c = this.connectionRef.get();
        return c != null && !c.isClosed();
    }

    boolean isCloud() {
        return this.isCloud;
    }

    public void onUp(Host host) {
    }

    public void onAdd(Host host) {
    }

    public void onDown(Host host) {
        this.onHostGone(host);
    }

    public void onRemove(Host host) {
        this.onHostGone(host);
    }

    private void onHostGone(Host host) {
        Connection current = this.connectionRef.get();
        if (current != null && current.endPoint.equals(host.getEndPoint())) {
            logger.debug("[Control connection] {} is down/removed and it was the control host, triggering reconnect", (Object)current.endPoint);
            if (!current.isClosed()) {
                current.closeAsync().force();
            }
            this.backgroundReconnect(0L);
        }
    }

    @Override
    public void onConnectionDefunct(Connection connection) {
        if (connection == this.connectionRef.get()) {
            this.backgroundReconnect(0L);
        }
    }

    static {
        try {
            bindAllAddress = InetAddress.getByAddress(new byte[4]);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        _3_11 = VersionNumber.parse("3.11.0");
    }
}

