/*
 * Decompiled with CFR 0.152.
 */
package org.voltdb.client;

import com.google_voltpatches.common.base.Throwables;
import com.google_voltpatches.common.collect.ImmutableList;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import javax.security.auth.Subject;
import jsr166y.ThreadLocalRandom;
import org.cliffc_voltpatches.high_scale_lib.NonBlockingHashMap;
import org.json_voltpatches.JSONException;
import org.json_voltpatches.JSONObject;
import org.voltcore.network.Connection;
import org.voltcore.network.QueueMonitor;
import org.voltcore.network.VoltNetworkPool;
import org.voltcore.network.VoltProtocolHandler;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.Pair;
import org.voltdb.ClientResponseImpl;
import org.voltdb.VoltTable;
import org.voltdb.client.ClientAffinityStats;
import org.voltdb.client.ClientAuthHashScheme;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientIOStats;
import org.voltdb.client.ClientImpl;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientStats;
import org.voltdb.client.ClientStatsContext;
import org.voltdb.client.ClientStatusListenerExt;
import org.voltdb.client.ConnectionUtil;
import org.voltdb.client.HashinatorLite;
import org.voltdb.client.NoConnectionsException;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.client.ProcedureInvocation;
import org.voltdb.client.RateLimiter;

class Distributer {
    // Delay in milliseconds, overridable through the "RESUBSCRIPTION_DELAY_MS" system property (default 10000).
    static int RESUBSCRIPTION_DELAY_MS = Integer.getInteger("RESUBSCRIPTION_DELAY_MS", 10000);
    // Long.MAX_VALUE is the client handle NodeConnection uses for @Ping requests and their responses.
    static final long PING_HANDLE = Long.MAX_VALUE;
    // Handle (Long.MAX_VALUE - 1) that NodeConnection.handleMessage routes to a TopoUpdateCallback.
    public static final Long ASYNC_TOPO_HANDLE = 0x7FFFFFFFFFFFFFFEL;
    // A timeout of 0 passed to createWork() means "use m_procedureCallTimeoutNanos".
    static final long USE_DEFAULT_TIMEOUT = 0L;
    // Handle source for internally issued calls; starts at -1 and is decremented for every call.
    public final AtomicLong m_sysHandle = new AtomicLong(-1L);
    // Every connection registered through createConnectionWithHashedCredentials().
    private final CopyOnWriteArrayList<NodeConnection> m_connections = new CopyOnWriteArrayList();
    // Status listeners; added and removed under the Distributer monitor.
    private final ArrayList<ClientStatusListenerExt> m_listeners = new ArrayList();
    // Network pool that channels are registered with.
    private final VoltNetworkPool m_network;
    // Round-robin cursor advanced by queue().
    private int m_nextConnection = 0;
    // When true the network pool is sized from the processor count instead of using one thread.
    private final boolean m_useMultipleThreads;
    // Enables routing by partition in queue() and topology subscription on connect.
    private final boolean m_useClientAffinity;
    // Partition id -> connection to the host named in the topology "Leader" column.
    private final Map<Integer, NodeConnection> m_partitionMasters = new HashMap<Integer, NodeConnection>();
    // Partition id -> connections to the hosts named in the topology "Sites" column.
    private final Map<Integer, NodeConnection[]> m_partitionReplicas = new HashMap<Integer, NodeConnection[]>();
    // Host id -> connection, filled in when a connection is created with client affinity enabled.
    private final Map<Integer, NodeConnection> m_hostIdToConnection = new HashMap<Integer, NodeConnection>();
    // Procedure name -> partitioning information parsed by updateProcedurePartitioning().
    private final Map<String, Procedure> m_procedureInfo = new HashMap<String, Procedure>();
    // Null until updateAffinityTopology() has processed a topology result.
    private HashinatorLite m_hashinator = null;
    // Default per-call timeout in nanoseconds.
    private final long m_procedureCallTimeoutNanos;
    // 1,800,000 ms (30 minutes). NOTE(review): not referenced in the visible part of this file.
    private static final long MINIMUM_LONG_RUNNING_SYSTEM_CALL_TIMEOUT_MS = 1800000L;
    // Connection response timeout converted from milliseconds in the constructor.
    private final long m_connectionResponseTimeoutNanos;
    // Hashed partition -> affinity/round-robin counters updated by queue().
    private final Map<Integer, ClientAffinityStats> m_clientAffinityStats = new HashMap<Integer, ClientAffinityStats>();
    // Consulted by NodeConnection.createWork() before a call is sent and notified when a response arrives.
    public final RateLimiter m_rateLimiter = new RateLimiter();
    // Single-threaded scheduler that runs CallExpiration every second and the discrete timeout tasks.
    private final ScheduledExecutorService m_ex = Executors.newSingleThreadScheduledExecutor(CoreUtils.getThreadFactory("VoltDB Client Reaper Thread"));
    // Handle of the periodic CallExpiration task; cancelled in shutdown().
    ScheduledFuture<?> m_timeoutReaperHandle;
    // {timestamp (Long), address (Integer)} of the cluster this client is connected to, or null.
    private Object[] m_clusterInstanceId;
    // Build string reported by the most recently connected server.
    private String m_buildString;
    // Connection chosen by subscribeToNewNode() for the topology subscription.
    private NodeConnection m_subscribedConnection = null;
    // Set to true by subscribeToNewNode(); presumably cleared by the subscribe callback (not visible here).
    private boolean m_subscriptionRequestPending = false;
    // While false, subscribeToNewNode() also requests the procedure catalog.
    private boolean m_fetchedCatalog = false;
    // Passed to ConnectionUtil.getAuthenticatedConnection() when connecting; may be null.
    private final Subject m_subject;

    /**
     * Tells whether {@code procName} is one of the system procedures that
     * {@code createWork()} exempts from the sub-second discrete timeout task.
     *
     * @param procName procedure name, must not be null
     * @return true only for "@UpdateApplicationCatalog" and "@SnapshotSave"
     */
    private static boolean isLongOp(String procName) {
        // Cheap prefix test first: only system procedures start with '@'.
        if (!procName.startsWith("@")) {
            return false;
        }
        return procName.equals("@UpdateApplicationCatalog") || procName.equals("@SnapshotSave");
    }

    /**
     * Waits until no connection has callbacks left to invoke.
     * <p>
     * The connections are polled; between polls the thread parks, starting at
     * 500 microseconds and growing by 500 microseconds per round up to 5000.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    void drain() throws InterruptedException {
        long pauseMicros = 500L;
        while (true) {
            boolean pending = false;
            for (NodeConnection cxn : m_connections) {
                if (cxn.m_callbacksToInvoke.get() > 0) {
                    pending = true;
                    break;
                }
            }
            if (!pending) {
                return;
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(pauseMicros));
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (pauseMicros < 5000L) {
                pauseMicros += 500L;
            }
        }
    }

    /**
     * Creates a distributer with a single network thread, the default
     * procedure timeout from {@link ClientConfig}, a 120000 ms connection
     * response timeout, client affinity disabled and no {@link Subject}.
     */
    Distributer() {
        this(false, ClientConfig.DEFAULT_PROCEDURE_TIMOUT_NANOS, 120000L, false, null);
    }

    Distributer(boolean useMultipleThreads, long procedureCallTimeoutNanos, long connectionResponseTimeoutMS, boolean useClientAffinity, Subject subject) {
        this.m_useMultipleThreads = useMultipleThreads;
        this.m_network = new VoltNetworkPool(this.m_useMultipleThreads ? Math.max(1, CoreUtils.availableProcessors() / 4) : 1, 1, null, "Client");
        this.m_network.start();
        this.m_procedureCallTimeoutNanos = procedureCallTimeoutNanos;
        this.m_connectionResponseTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(connectionResponseTimeoutMS);
        this.m_useClientAffinity = useClientAffinity;
        this.m_timeoutReaperHandle = this.m_ex.scheduleAtFixedRate(new CallExpiration(), 1L, 1L, TimeUnit.SECONDS);
        this.m_subject = subject;
    }

    /**
     * Hashes {@code password} with {@code scheme} and opens a connection with
     * the resulting credentials.
     *
     * @param host server host name or address
     * @param program user name presented to the server
     * @param password clear-text password
     * @param port server port
     * @param scheme hash scheme applied to the password
     * @throws UnknownHostException declared for callers; raised by the connection code
     * @throws IOException if the connection cannot be established or registered
     */
    void createConnection(String host, String program, String password, int port, ClientAuthHashScheme scheme) throws UnknownHostException, IOException {
        createConnectionWithHashedCredentials(host, program, ConnectionUtil.getHashedPassword(scheme, password), port, scheme);
    }

    /**
     * Opens an authenticated connection to a server, registers it with the
     * network pool and records it in this distributer.
     * <p>
     * The first connection (or the first after all connections were lost)
     * defines the cluster instance id; later connections must report the same
     * id or they are unregistered and rejected. With client affinity enabled
     * the connection is also indexed by host id and, if no connection is
     * currently subscribed to topology updates, a subscription is started.
     *
     * @param host server host name or address
     * @param program user name presented to the server
     * @param hashedPassword password already hashed with {@code scheme}
     * @param port server port
     * @param scheme hash scheme that produced {@code hashedPassword}
     * @throws UnknownHostException declared for callers; raised by the connection code
     * @throws IOException if connecting fails or the server belongs to a different cluster instance
     */
    void createConnectionWithHashedCredentials(String host, String program, byte[] hashedPassword, int port, ClientAuthHashScheme scheme) throws UnknownHostException, IOException {
        // Result layout: [0] SocketChannel, [1] long[] instance id, [2] build string.
        Object[] socketChannelAndInstanceIdAndBuildString = ConnectionUtil.getAuthenticatedConnection(host, program, hashedPassword, port, this.m_subject, scheme);
        SocketChannel aChannel = (SocketChannel)socketChannelAndInstanceIdAndBuildString[0];
        // Instance id layout used below: [0] host id, [2] cluster timestamp, [3] leader address.
        long[] instanceIdWhichIsTimestampAndLeaderIp = (long[])socketChannelAndInstanceIdAndBuildString[1];
        int hostId = (int)instanceIdWhichIsTimestampAndLeaderIp[0];
        NodeConnection cxn = new NodeConnection(instanceIdWhichIsTimestampAndLeaderIp);
        Connection c = this.m_network.registerChannel(aChannel, cxn);
        cxn.m_connection = c;
        synchronized (this) {
            // With no live connections the remembered cluster identity is stale.
            if (this.m_connections.size() == 0) {
                this.m_clusterInstanceId = null;
            }
            if (this.m_clusterInstanceId == null) {
                long timestamp = instanceIdWhichIsTimestampAndLeaderIp[2];
                int addr = (int)instanceIdWhichIsTimestampAndLeaderIp[3];
                this.m_clusterInstanceId = new Object[]{timestamp, addr};
            } else if ((Long)this.m_clusterInstanceId[0] != instanceIdWhichIsTimestampAndLeaderIp[2] || ((Integer)this.m_clusterInstanceId[1]).longValue() != instanceIdWhichIsTimestampAndLeaderIp[3]) {
                // Refuse to mix connections to different cluster instances.
                c.unregister();
                throw new IOException("Cluster instance id mismatch. Current is " + this.m_clusterInstanceId[0] + "," + this.m_clusterInstanceId[1] + " and server's was " + instanceIdWhichIsTimestampAndLeaderIp[2] + "," + instanceIdWhichIsTimestampAndLeaderIp[3]);
            }
            this.m_buildString = (String)socketChannelAndInstanceIdAndBuildString[2];
            this.m_connections.add(cxn);
        }
        if (this.m_useClientAffinity) {
            synchronized (this) {
                this.m_hostIdToConnection.put(hostId, cxn);
            }
            if (this.m_subscribedConnection == null) {
                this.subscribeToNewNode();
            }
        }
    }

    /**
     * Picks a random live connection and uses it to subscribe to topology
     * updates, fetch the current topology and, if it has not been fetched
     * yet, the procedure catalog.
     * <p>
     * Does nothing when there are no connections (the subscribed connection is
     * still cleared). Failures while issuing the requests are printed and
     * otherwise ignored.
     */
    private void subscribeToNewNode() {
        NodeConnection cxn = null;
        synchronized (this) {
            this.m_subscribedConnection = null;
            if (this.m_connections.isEmpty()) {
                return;
            }
            cxn = this.m_connections.get(ThreadLocalRandom.current().nextInt(this.m_connections.size()));
            this.m_subscriptionRequestPending = true;
            this.m_subscribedConnection = cxn;
        }
        try {
            // Internal calls draw their handles from m_sysHandle, which counts down from -1.
            ProcedureInvocation spi = new ProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@Subscribe", "TOPOLOGY");
            // Serialize once and hand that buffer over.
            ByteBuffer buf = this.serializeSPI(spi);
            cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), buf, new SubscribeCallback(), true, USE_DEFAULT_TIMEOUT);
            spi = new ProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@Statistics", "TOPO", 0);
            cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), this.serializeSPI(spi), new TopoUpdateCallback(), true, USE_DEFAULT_TIMEOUT);
            if (!this.m_fetchedCatalog) {
                spi = new ProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@SystemCatalog", "PROCEDURES");
                cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), this.serializeSPI(spi), new ProcUpdateCallback(), true, USE_DEFAULT_TIMEOUT);
            }
        }
        catch (Exception e) {
            // Best effort: without a subscription calls are still routed round-robin.
            e.printStackTrace();
        }
    }

    /**
     * Chooses a connection for {@code invocation} and hands the serialized
     * call to it.
     * <p>
     * With client affinity enabled and a hashinator available, a known
     * procedure is routed to a replica (read-only single-partition) or to the
     * partition master (everything else). If that yields no usable connection
     * the connections are tried round-robin. When only backpressured
     * connections are available and {@code ignoreBackpressure} is false, the
     * call is not sent and the listeners are notified of backpressure.
     *
     * @param invocation the call to send, must not be null
     * @param cb callback for the response, must not be null
     * @param ignoreBackpressure send even if the chosen connection has backpressure
     * @param nowNanos time the call was issued, from {@link System#nanoTime()}
     * @param timeoutNanos per-call timeout, or 0 for the default
     * @return true if the call was handed to a connection, false on backpressure
     * @throws NoConnectionsException if there are no connections at all
     */
    boolean queue(ProcedureInvocation invocation, ProcedureCallback cb, boolean ignoreBackpressure, long nowNanos, long timeoutNanos) throws NoConnectionsException {
        assert (invocation != null);
        assert (cb != null);
        NodeConnection cxn = null;
        boolean backpressure = true;
        synchronized (this) {
            int totalConnections = this.m_connections.size();
            if (totalConnections == 0) {
                throw new NoConnectionsException("No connections.");
            }
            if (this.m_useClientAffinity && this.m_hashinator != null) {
                ClientAffinityStats stats;
                Procedure procedureInfo = this.m_procedureInfo.get(invocation.getProcName());
                // -1 collects the statistics for procedures that are not in the catalog.
                Integer hashedPartition = -1;
                if (procedureInfo != null) {
                    // NOTE(review): 16383 is presumably the multi-partition initiator's partition id - confirm.
                    hashedPartition = 16383;
                    if (!procedureInfo.multiPart && procedureInfo.partitionParameter < invocation.getPassedParamCount()) {
                        hashedPartition = this.m_hashinator.getHashedPartitionForParameter(procedureInfo.partitionParameterType, invocation.getPartitionParamValue(procedureInfo.partitionParameter));
                    }
                    if (!procedureInfo.multiPart && procedureInfo.readOnly) {
                        // Single-partition reads may go to any replica; start with a random one.
                        NodeConnection[] partitionReplicas = this.m_partitionReplicas.get(hashedPartition);
                        if (partitionReplicas != null && partitionReplicas.length > 0) {
                            cxn = partitionReplicas[ThreadLocalRandom.current().nextInt(partitionReplicas.length)];
                            if (cxn.hadBackPressure()) {
                                // Prefer the first connected replica without backpressure.
                                for (NodeConnection nc : partitionReplicas) {
                                    if (nc.hadBackPressure() || !nc.m_isConnected) continue;
                                    cxn = nc;
                                    break;
                                }
                            }
                            if (!cxn.hadBackPressure() || ignoreBackpressure) {
                                backpressure = false;
                            }
                        }
                    } else {
                        // Writes and multi-partition calls go to the partition master.
                        cxn = this.m_partitionMasters.get(hashedPartition);
                        if (cxn != null && !cxn.hadBackPressure() || ignoreBackpressure) {
                            backpressure = false;
                        }
                    }
                }
                if (cxn != null && !cxn.m_isConnected) {
                    // Affinity picked a dead connection: fall back to round-robin and
                    // forget the backpressure verdict that was reached for it, so the
                    // round-robin pass decides on its own.
                    cxn = null;
                    backpressure = true;
                }
                if ((stats = this.m_clientAffinityStats.get(hashedPartition)) == null) {
                    stats = new ClientAffinityStats(hashedPartition, 0L, 0L, 0L, 0L);
                    this.m_clientAffinityStats.put(hashedPartition, stats);
                }
                if (cxn != null) {
                    if (procedureInfo != null && procedureInfo.readOnly) {
                        stats.addAffinityRead();
                    } else {
                        stats.addAffinityWrite();
                    }
                } else if (procedureInfo != null && procedureInfo.readOnly) {
                    stats.addRrRead();
                } else {
                    stats.addRrWrite();
                }
            }
            if (cxn == null) {
                // Round-robin: take the first connection that is acceptable.
                for (int i = 0; i < totalConnections; ++i) {
                    if ((cxn = this.m_connections.get(Math.abs(++this.m_nextConnection % totalConnections))).hadBackPressure() && !ignoreBackpressure) continue;
                    backpressure = false;
                    break;
                }
            }
            if (backpressure) {
                cxn = null;
                for (ClientStatusListenerExt s : this.m_listeners) {
                    s.backpressure(true);
                }
            }
        }
        if (cxn != null) {
            // Serialization and the hand-off happen outside the lock.
            ByteBuffer buf = null;
            try {
                buf = this.serializeSPI(invocation);
            }
            catch (Exception e) {
                Throwables.propagate(e);
            }
            cxn.createWork(nowNanos, invocation.getHandle(), invocation.getProcName(), buf, cb, ignoreBackpressure, timeoutNanos);
        }
        return !backpressure;
    }

    /**
     * Stops the timeout reaper and its executor, then shuts the network pool down.
     *
     * @throws InterruptedException if interrupted while waiting for the executor
     */
    final void shutdown() throws InterruptedException {
        // cancel(false): a reaper run that is already in progress is allowed to finish.
        this.m_timeoutReaperHandle.cancel(false);
        this.m_ex.shutdown();
        // Wait at most one second for the executor before tearing the network down.
        this.m_ex.awaitTermination(1L, TimeUnit.SECONDS);
        this.m_network.shutdown();
    }

    /**
     * Reports an exception thrown by a procedure callback to every listener
     * that is not a {@code ClientImpl.CSL}. If no listener accepts it without
     * throwing, the stack trace of {@code t} is printed.
     *
     * @param cb the callback that threw
     * @param r the response that was being delivered
     * @param t what the callback threw
     */
    void uncaughtException(ProcedureCallback cb, ClientResponse r, Throwable t) {
        boolean delivered = false;
        for (ClientStatusListenerExt listener : m_listeners) {
            if (!(listener instanceof ClientImpl.CSL)) {
                try {
                    listener.uncaughtException(cb, r, t);
                    delivered = true;
                }
                catch (Exception listenerFailure) {
                    // A misbehaving listener must not keep the others from being told.
                    listenerFailure.printStackTrace();
                }
            }
        }
        if (!delivered) {
            t.printStackTrace();
        }
    }

    /**
     * Registers {@code listener} unless an equal listener is already registered.
     *
     * @param listener the listener to add
     */
    synchronized void addClientStatusListener(ClientStatusListenerExt listener) {
        if (m_listeners.contains(listener)) {
            return;
        }
        m_listeners.add(listener);
    }

    /**
     * Unregisters {@code listener}.
     *
     * @param listener the listener to remove
     * @return true if the listener was registered
     */
    synchronized boolean removeClientStatusListener(ClientStatusListenerExt listener) {
        return this.m_listeners.remove(listener);
    }

    /**
     * Creates a statistics context seeded with the current procedure, I/O and
     * affinity statistics snapshots.
     *
     * @return a new context bound to this distributer
     */
    ClientStatsContext createStatsContext() {
        return new ClientStatsContext(this, this.getStatsSnapshot(), this.getIOStatsSnapshot(), this.getAffinityStatsSnapshot());
    }

    /**
     * Copies the per-procedure statistics of every connection.
     *
     * @return connection id -> (procedure name -> cloned statistics), both
     *         levels sorted by key
     */
    Map<Long, Map<String, ClientStats>> getStatsSnapshot() {
        Map<Long, Map<String, ClientStats>> retval = new TreeMap<Long, Map<String, ClientStats>>();
        for (NodeConnection conn : this.m_connections) {
            Map<String, ClientStats> connMap = new TreeMap<String, ClientStats>();
            for (Map.Entry<String, ClientStats> e : conn.m_stats.entrySet()) {
                // Clone so later updates on the connection do not leak into the snapshot.
                connMap.put(e.getKey(), (ClientStats)e.getValue().clone());
            }
            retval.put(conn.connectionId(), connMap);
        }
        return retval;
    }

    /**
     * Builds a per-connection snapshot of the bytes read and written.
     *
     * @return connection id -> I/O statistics, sorted by connection id, or
     *         {@code null} if the network pool could not provide its statistics
     */
    Map<Long, ClientIOStats> getIOStatsSnapshot() {
        Map<Long, Pair<String, long[]>> rawStats;
        try {
            rawStats = m_network.getIOStats(false, ImmutableList.<VoltNetworkPool.IOStatsIntf>of());
        }
        catch (Exception e) {
            return null;
        }
        TreeMap<Long, ClientIOStats> snapshot = new TreeMap<Long, ClientIOStats>();
        for (NodeConnection conn : m_connections) {
            Pair<String, long[]> entry = rawStats.get(conn.connectionId());
            if (entry != null) {
                // Index 0 is used as the read counter and index 2 as the write counter.
                long[] counters = entry.getSecond();
                snapshot.put(conn.connectionId(), new ClientIOStats(conn.connectionId(), counters[0], counters[2]));
            }
        }
        return snapshot;
    }

    /**
     * Copies the client affinity statistics while holding the distributer lock.
     *
     * @return hashed partition -> cloned affinity statistics
     */
    Map<Integer, ClientAffinityStats> getAffinityStatsSnapshot() {
        HashMap<Integer, ClientAffinityStats> copy = new HashMap<Integer, ClientAffinityStats>();
        synchronized (this) {
            for (Integer partition : m_clientAffinityStats.keySet()) {
                ClientAffinityStats live = m_clientAffinityStats.get(partition);
                copy.put(partition, (ClientAffinityStats)live.clone());
            }
        }
        return copy;
    }

    /**
     * @return the remembered cluster instance id as {timestamp, address}, or
     *         {@code null} if none is recorded; the internal array is returned, not a copy
     */
    public synchronized Object[] getInstanceId() {
        return this.m_clusterInstanceId;
    }

    /**
     * Forgets the remembered cluster instance id, so the next connection
     * defines it again.
     */
    public synchronized void resetInstanceId() {
        this.m_clusterInstanceId = null;
    }

    /**
     * @return the build string reported by the most recently connected server,
     *         or {@code null} before the first connection
     */
    public String getBuildString() {
        return this.m_buildString;
    }

    /**
     * @return the thread ids reported by the network pool
     */
    public List<Long> getThreadIds() {
        return this.m_network.getThreadIds();
    }

    /**
     * Lists the socket addresses of all current connections.
     *
     * @return an unmodifiable list with one address per connection
     */
    public List<InetSocketAddress> getConnectedHostList() {
        ArrayList<InetSocketAddress> hosts = new ArrayList<InetSocketAddress>();
        Iterator<NodeConnection> it = m_connections.iterator();
        while (it.hasNext()) {
            hosts.add(it.next().getSocketAddress());
        }
        return Collections.unmodifiableList(hosts);
    }

    /**
     * Rebuilds the hashinator and the partition-to-connection maps from a
     * topology result.
     * <p>
     * {@code tables[0]} lists the partitions with their "Sites" and "Leader"
     * columns. When a second table is present it supplies the hashinator type
     * and configuration; otherwise the hashinator is built from the row count.
     * Hosts without a known connection are skipped.
     *
     * @param tables the topology result tables, at least one element
     */
    private void updateAffinityTopology(VoltTable[] tables) {
        VoltTable topology = tables[0];
        if (tables.length == 1) {
            m_hashinator = new HashinatorLite(topology.getRowCount() - 1);
        } else {
            VoltTable hashConfig = tables[1];
            if (!hashConfig.advanceRow()) {
                System.err.println("Topology description received from Volt was incomplete performance will be lower because transactions can't be routed at this client");
                return;
            }
            HashinatorLite.HashinatorLiteType type = HashinatorLite.HashinatorLiteType.valueOf(hashConfig.getString("HASHTYPE"));
            m_hashinator = new HashinatorLite(type, hashConfig.getVarbinary("HASHCONFIG"), false);
        }

        m_partitionMasters.clear();
        m_partitionReplicas.clear();
        while (topology.advanceRow()) {
            Integer partition = (int)topology.getLong("Partition");

            // Replicas: every site whose host we hold a connection to.
            ArrayList<NodeConnection> replicas = new ArrayList<NodeConnection>();
            for (String site : topology.getString("Sites").split(",")) {
                String hostPart = site.trim().split(":")[0];
                Integer hostId = Integer.valueOf(hostPart);
                if (m_hostIdToConnection.containsKey(hostId)) {
                    replicas.add(m_hostIdToConnection.get(hostId));
                }
            }
            m_partitionReplicas.put(partition, replicas.toArray(new NodeConnection[0]));

            // Master: the leader's host, if connected.
            Integer leaderHostId = Integer.valueOf(topology.getString("Leader").split(":")[0]);
            if (m_hostIdToConnection.containsKey(leaderHostId)) {
                m_partitionMasters.put(partition, m_hostIdToConnection.get(leaderHostId));
            }
        }
    }

    /**
     * Replaces the procedure partitioning table with the contents of a
     * procedure catalog result.
     * <p>
     * Column 2 holds the procedure name and column 6 a JSON document with
     * "readOnly", "singlePartition" and, for single-partition procedures,
     * "partitionParameter" and "partitionParameterType". Rows whose JSON
     * cannot be read are reported and skipped.
     *
     * @param vt the catalog result table
     */
    private void updateProcedurePartitioning(VoltTable vt) {
        m_procedureInfo.clear();
        while (vt.advanceRow()) {
            try {
                String json = vt.getString(6);
                String name = vt.getString(2);
                JSONObject attributes = new JSONObject(json);
                boolean readOnly = attributes.getBoolean("readOnly");
                Procedure info;
                if (attributes.getBoolean("singlePartition")) {
                    int partitionParameter = attributes.getInt("partitionParameter");
                    int partitionParameterType = attributes.getInt("partitionParameterType");
                    info = new Procedure(false, readOnly, partitionParameter, partitionParameterType);
                } else {
                    info = new Procedure(true, readOnly, -1, -1);
                }
                m_procedureInfo.put(name, info);
            }
            catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @return true once a hashinator has been built from a topology result
     */
    public boolean isHashinatorInitialized() {
        return this.m_hashinator != null;
    }

    /**
     * Hashes a partitioning value to its partition.
     *
     * @param typeValue type code of {@code value}
     * @param value the partitioning value
     * @return the partition, or -1 if no hashinator is available yet
     */
    public long getPartitionForParameter(byte typeValue, Object value) {
        return m_hashinator == null ? -1L : m_hashinator.getHashedPartitionForParameter(typeValue, value);
    }

    /**
     * @return the configuration type of the current hashinator, or
     *         {@code LEGACY} if no hashinator is available yet
     */
    public HashinatorLite.HashinatorLiteType getHashinatorType() {
        return m_hashinator == null ? HashinatorLite.HashinatorLiteType.LEGACY : m_hashinator.getConfigurationType();
    }

    /**
     * Serializes an invocation into a buffer that starts with a 4-byte length
     * prefix holding the size of the payload that follows.
     *
     * @param pi the invocation to serialize
     * @return a flipped buffer ready to be written
     * @throws IOException if the invocation cannot be flattened
     */
    private ByteBuffer serializeSPI(ProcedureInvocation pi) throws IOException {
        int payloadSize = pi.getSerializedSize();
        ByteBuffer framed = ByteBuffer.allocate(payloadSize + 4);
        framed.putInt(payloadSize);
        pi.flattenToBuffer(framed);
        framed.flip();
        return framed;
    }

    /**
     * @return the default per-call timeout in nanoseconds
     */
    long getProcedureTimeoutNanos() {
        return this.m_procedureCallTimeoutNanos;
    }

    class NodeConnection
    extends VoltProtocolHandler
    implements QueueMonitor {
        // Number of callbacks still to be invoked on this connection; drain() waits for it to reach zero.
        private final AtomicInteger m_callbacksToInvoke = new AtomicInteger(0);
        // Client handle -> bookkeeping for calls that are awaiting a response.
        private final ConcurrentMap<Long, CallbackBookeeping> m_callbacks = new ConcurrentHashMap<Long, CallbackBookeeping>();
        // Procedure name -> statistics for calls made over this connection.
        private final NonBlockingHashMap<String, ClientStats> m_stats = new NonBlockingHashMap();
        // Network connection; assigned by Distributer right after the channel is registered.
        private Connection m_connection;
        // Cleared in stopping(); calls created afterwards fail immediately with a connection-lost response.
        private volatile boolean m_isConnected = true;
        // System.nanoTime() of the most recent message received on this connection.
        volatile long m_lastResponseTimeNanos = System.nanoTime();
        // True between sendPing() and the arrival of the ping response.
        boolean m_outstandingPing = false;
        // NOTE(review): presumably reported to listeners on disconnect; not used in the visible part of the file.
        ClientStatusListenerExt.DisconnectCause m_closeCause = ClientStatusListenerExt.DisconnectCause.CONNECTION_CLOSED;
        // NOTE(review): m_queuedBytes and m_maxQueuedBytes (262144 = 256 KiB) are presumably used by the
        // QueueMonitor callback, which is outside the visible part of the file.
        private int m_queuedBytes = 0;
        private final int m_maxQueuedBytes = 262144;

        /**
         * Creates a handler for one server connection.
         *
         * @param ids the instance id array reported by the server; not used by this constructor
         */
        public NodeConnection(long[] ids) {
        }

        /**
         * Registers the callback for a call and writes the serialized call to
         * the connection.
         * <p>
         * The rate limiter is consulted first and may block. If it times out,
         * the callback is completed with a timeout response and nothing is
         * sent. Calls with a timeout below one second (other than long-running
         * system procedures) get a dedicated timeout task; all other calls
         * rely on the periodic reaper. If the connection is already closed the
         * callback is completed immediately with a connection-lost response.
         *
         * @param nowNanos time the call was issued, from {@link System#nanoTime()}
         * @param handle client handle of the call
         * @param name procedure name
         * @param c serialized call, ready to be written
         * @param callback callback for the response, must not be null
         * @param ignoreBackpressure passed through to the rate limiter
         * @param timeoutNanos per-call timeout, or 0 for the distributer's default
         */
        public void createWork(long nowNanos, long handle, String name, ByteBuffer c, ProcedureCallback callback, boolean ignoreBackpressure, long timeoutNanos) {
            assert (callback != null);
            timeoutNanos = timeoutNanos == Distributer.USE_DEFAULT_TIMEOUT ? Distributer.this.m_procedureCallTimeoutNanos : timeoutNanos;
            // Absolute deadline, measured from the moment the caller issued the call.
            long timeoutTime = nowNanos + timeoutNanos;
            long afterRateLimitNanos = 0L;
            try {
                afterRateLimitNanos = Distributer.this.m_rateLimiter.sendTxnWithOptionalBlockAndReturnCurrentTime(nowNanos, timeoutNanos, ignoreBackpressure);
            }
            catch (TimeoutException e) {
                long deltaNanos = Math.max(1L, System.nanoTime() - nowNanos);
                // invokeCallbackWithTimeout() always decrements m_callbacksToInvoke. This call
                // has not been counted yet, so count it first to keep the counter balanced;
                // otherwise it would go negative and drain() could return too early.
                this.m_callbacksToInvoke.incrementAndGet();
                this.invokeCallbackWithTimeout(name, callback, deltaNanos, afterRateLimitNanos, timeoutNanos, handle, ignoreBackpressure);
                return;
            }
            assert (!this.m_callbacks.containsKey(handle));
            int callbacksToInvoke = this.m_callbacksToInvoke.incrementAndGet();
            assert (callbacksToInvoke >= 0);
            this.m_callbacks.put(handle, new CallbackBookeeping(nowNanos, callback, name, timeoutNanos, ignoreBackpressure));
            // Time left after whatever the rate limiter consumed.
            long timeoutRemaining = timeoutTime - afterRateLimitNanos;
            if (timeoutNanos < TimeUnit.SECONDS.toNanos(1L) && !Distributer.isLongOp(name)) {
                // The periodic reaper only runs once per second; short timeouts need their own task.
                this.submitDiscreteTimeoutTask(handle, Math.max(0L, timeoutRemaining));
            }
            if (!this.m_isConnected) {
                // Whoever removes the bookkeeping entry owns the callback; if it is already
                // gone, somebody else is completing it.
                if (this.m_callbacks.remove(handle) == null) {
                    return;
                }
                // NOTE(review): status -4 is presumably the connection-lost status code - confirm.
                ClientResponseImpl r = new ClientResponseImpl(-4, new VoltTable[0], "Connection to database host (" + this.m_connection.getHostnameAndIPAndPort() + ") was lost before a response was received");
                try {
                    callback.clientCallback(r);
                }
                catch (Exception e) {
                    Distributer.this.uncaughtException(callback, r, e);
                }
                int remainingToInvoke = this.m_callbacksToInvoke.decrementAndGet();
                assert (remainingToInvoke >= 0);
                Distributer.this.m_rateLimiter.transactionResponseReceived(nowNanos, -1, ignoreBackpressure);
                return;
            }
            this.m_connection.writeStream().enqueue(c);
        }

        /**
         * Schedules a one-shot task that times out the call with the given
         * handle, unless it has completed by then.
         *
         * @param handle client handle of the call
         * @param timeoutNanos delay before the task runs, in nanoseconds
         */
        void submitDiscreteTimeoutTask(final long handle, long timeoutNanos) {
            Runnable expireCall = new Runnable(){

                @Override
                public void run() {
                    NodeConnection.this.handleTimedoutCallback(handle, System.nanoTime());
                }
            };
            Distributer.this.m_ex.schedule(expireCall, timeoutNanos, TimeUnit.NANOSECONDS);
        }

        /**
         * Completes the call with the given handle with a timeout response,
         * if it is still outstanding.
         *
         * @param handle client handle of the call
         * @param nowNanos current time, from {@link System#nanoTime()}
         */
        void handleTimedoutCallback(long handle, long nowNanos) {
            CallbackBookeeping expired = m_callbacks.remove(handle);
            if (expired != null) {
                long elapsedNanos = Math.max(1L, nowNanos - expired.timestampNanos);
                invokeCallbackWithTimeout(expired.name, expired.callback, elapsedNanos, nowNanos, expired.procedureTimeoutNanos, handle, expired.ignoreBackpressure);
            }
        }

        /**
         * Delivers a synthetic timeout response to {@code callback}, then
         * updates the outstanding-callback counter, the rate limiter and the
         * statistics.
         *
         * @param procName procedure name, used for the statistics
         * @param callback callback to complete
         * @param deltaNanos time since the call was issued, in nanoseconds
         * @param nowNanos time reported to the rate limiter
         * @param timeoutNanos the timeout that expired, used in the message
         * @param handle client handle of the call
         * @param ignoreBackpressure passed through to the rate limiter
         */
        void invokeCallbackWithTimeout(String procName, ProcedureCallback callback, long deltaNanos, long nowNanos, long timeoutNanos, long handle, boolean ignoreBackpressure) {
            String message = String.format("No response received in the allotted time (set to %d ms).", TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
            ClientResponseImpl timeoutResponse = new ClientResponseImpl(-6, -128, "", new VoltTable[0], message);
            timeoutResponse.setClientHandle(handle);
            timeoutResponse.setClientRoundtrip(deltaNanos);
            timeoutResponse.setClusterRoundtrip((int)TimeUnit.NANOSECONDS.toMillis(deltaNanos));

            try {
                callback.clientCallback(timeoutResponse);
            }
            catch (Throwable callbackFailure) {
                Distributer.this.uncaughtException(callback, timeoutResponse, callbackFailure);
            }

            int stillOutstanding = m_callbacksToInvoke.decrementAndGet();
            assert (stillOutstanding >= 0);
            Distributer.this.m_rateLimiter.transactionResponseReceived(nowNanos, -1, ignoreBackpressure);
            updateStatsForTimeout(procName, timeoutResponse.getClientRoundtripNanos(), timeoutResponse.getClusterRoundtrip());
        }

        /**
         * Sends an "@Ping" call with the reserved ping handle and marks a ping
         * as outstanding.
         *
         * @throws RuntimeException wrapping the {@link IOException} if the
         *         invocation cannot be serialized
         */
        void sendPing() {
            ProcedureInvocation ping = new ProcedureInvocation(Distributer.PING_HANDLE, "@Ping", new Object[0]);
            ByteBuffer framed;
            try {
                framed = Distributer.this.serializeSPI(ping);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            m_connection.writeStream().enqueue(framed);
            m_outstandingPing = true;
        }

        /**
         * Queues a task on the connection that records a timed-out call in the
         * statistics.
         *
         * @param procName procedure name
         * @param roundTripNanos client round trip in nanoseconds
         * @param clusterRoundTrip cluster round trip as stored in the response
         */
        private void updateStatsForTimeout(final String procName, final long roundTripNanos, final int clusterRoundTrip) {
            Runnable recordTimeout = new Runnable(){

                @Override
                public void run() {
                    NodeConnection.this.updateStats(procName, roundTripNanos, clusterRoundTrip, false, false, true);
                }
            };
            m_connection.queueTask(recordTimeout);
        }

        /**
         * Records the outcome of one call in the statistics of its procedure,
         * creating the statistics entry on first use.
         *
         * @param procName procedure name
         * @param roundTripNanos client round trip in nanoseconds
         * @param clusterRoundTrip cluster round trip as stored in the response
         * @param abort whether the call was aborted
         * @param failure whether the call failed
         * @param timeout whether the call timed out
         */
        private void updateStats(String procName, long roundTripNanos, int clusterRoundTrip, boolean abort, boolean failure, boolean timeout) {
            ClientStats entry = m_stats.get(procName);
            if (entry == null) {
                ClientStats fresh = new ClientStats();
                fresh.m_connectionId = connectionId();
                fresh.m_hostname = m_connection.getHostnameOrIP();
                fresh.m_port = m_connection.getRemotePort();
                fresh.m_procName = procName;
                fresh.m_startTS = System.currentTimeMillis();
                fresh.m_endTS = Long.MIN_VALUE;
                m_stats.put(procName, fresh);
                entry = fresh;
            }
            entry.update(roundTripNanos, clusterRoundTrip, abort, failure, timeout);
        }

        /**
         * Handles one message from the server.
         * <p>
         * Ping responses clear the outstanding-ping flag. Messages carrying
         * the asynchronous topology handle go to a fresh
         * {@code TopoUpdateCallback}. Every other message completes the
         * callback registered for its handle. If none is registered (for
         * example because the call already timed out) and the handle is not
         * negative, the listeners are told about the late response.
         *
         * @param buf the serialized response
         * @param c the connection the message arrived on
         */
        @Override
        public void handleMessage(ByteBuffer buf, Connection c) {
            long receivedNanos = System.nanoTime();
            ClientResponseImpl response = new ClientResponseImpl();
            try {
                response.initFromBuffer(buf);
            }
            catch (IOException parseFailure) {
                // NOTE(review): processing continues with whatever was parsed.
                parseFailure.printStackTrace();
            }
            m_lastResponseTimeNanos = receivedNanos;

            long handle = response.getClientHandle();
            if (handle == Long.MAX_VALUE) {
                // Ping response.
                m_outstandingPing = false;
                return;
            }
            if (handle == ASYNC_TOPO_HANDLE) {
                // Unsolicited topology update.
                TopoUpdateCallback topoCallback = new TopoUpdateCallback();
                try {
                    topoCallback.clientCallback(response);
                }
                catch (Exception e) {
                    Distributer.this.uncaughtException(topoCallback, response, e);
                }
                return;
            }

            CallbackBookeeping bookkeeping = m_callbacks.remove(response.getClientHandle());
            if (bookkeeping == null) {
                // Nobody is waiting any more; negative handles belong to internal calls.
                if (handle >= 0L) {
                    for (ClientStatusListenerExt listener : Distributer.this.m_listeners) {
                        listener.lateProcedureResponse(response, m_connection.getHostnameOrIP(), m_connection.getRemotePort());
                    }
                }
                return;
            }

            long elapsedNanos = Math.max(1L, receivedNanos - bookkeeping.timestampNanos);
            ProcedureCallback userCallback = bookkeeping.callback;
            assert (userCallback != null);

            // -1 and -2 count as aborts, 1 as success, everything else as an error.
            byte status = response.getStatus();
            boolean abort = status == -1 || status == -2;
            boolean error = !abort && status != 1;

            int clusterRoundTrip = response.getClusterRoundtrip();
            Distributer.this.m_rateLimiter.transactionResponseReceived(receivedNanos, clusterRoundTrip, bookkeeping.ignoreBackpressure);
            updateStats(bookkeeping.name, elapsedNanos, clusterRoundTrip, abort, error, false);
            response.setClientRoundtrip(elapsedNanos);
            assert (response.getHash() == null);
            try {
                userCallback.clientCallback(response);
            }
            catch (Exception e) {
                Distributer.this.uncaughtException(userCallback, response, e);
            }
            int stillOutstanding = m_callbacksToInvoke.decrementAndGet();
            assert (stillOutstanding >= 0);
        }

        /**
         * Reports the maximum read value for this handler.
         *
         * @return always {@code Integer.MAX_VALUE}
         */
        @Override
        public int getMaxRead() {
            return Integer.MAX_VALUE;
        }

        /**
         * Asks the write stream of the underlying connection whether it had back pressure.
         *
         * @return whatever {@code hadBackPressure()} on the connection's write stream reports
         */
        public boolean hadBackPressure() {
            return m_connection.writeStream().hadBackPressure();
        }

        /**
         * Cleans up the distributer's bookkeeping for this connection once it is stopping.
         * <p>
         * While holding the {@code Distributer} monitor, this connection is removed from the
         * partition-master map, the host-id map, every partition-replica array and the
         * connection list. Status listeners are then notified through {@code connectionLost},
         * and, if client affinity is enabled, this was the subscribed connection, no
         * subscription request is pending and the executor is not shut down, a call to
         * {@code subscribeToNewNode()} is scheduled after a random delay below
         * {@code RESUBSCRIPTION_DELAY_MS} milliseconds.
         * <p>
         * After the monitor is released, every callback still registered on this connection
         * is invoked with a status {@code -4} response saying the connection was lost.
         *
         * @param c the connection being stopped; it is only forwarded to {@code super.stopping}
         */
        @Override
        public void stopping(Connection c) {
            super.stopping(c);
            this.m_isConnected = false;
            synchronized (Distributer.this) {
                removeSelfFromValues(Distributer.this.m_partitionMasters);
                removeSelfFromValues(Distributer.this.m_hostIdToConnection);
                removeSelfFromReplicas(Distributer.this.m_partitionReplicas);
                Distributer.this.m_connections.remove(this);
                for (ClientStatusListenerExt listener : Distributer.this.m_listeners) {
                    listener.connectionLost(this.m_connection.getHostnameOrIP(), this.m_connection.getRemotePort(), Distributer.this.m_connections.size(), this.m_closeCause);
                }
                if (Distributer.this.m_useClientAffinity && Distributer.this.m_subscribedConnection == this && !Distributer.this.m_subscriptionRequestPending && !Distributer.this.m_ex.isShutdown()) {
                    // Random delay before resubscribing through another node.
                    Distributer.this.m_ex.schedule(new Runnable(){

                        @Override
                        public void run() {
                            Distributer.this.subscribeToNewNode();
                        }
                    }, (long)new Random().nextInt(RESUBSCRIPTION_DELAY_MS), TimeUnit.MILLISECONDS);
                }
            }
            ClientResponseImpl r = new ClientResponseImpl(-4, new VoltTable[0], "Connection to database host (" + this.m_connection.getHostnameAndIPAndPort() + ") was lost before a response was received");
            for (Map.Entry<?, ?> e : this.m_callbacks.entrySet()) {
                // Only the caller whose remove() succeeds may invoke the callback.
                if (this.m_callbacks.remove(e.getKey()) == null) continue;
                CallbackBookeeping callBk = (CallbackBookeeping)e.getValue();
                try {
                    callBk.callback.clientCallback(r);
                }
                catch (Exception ex) {
                    Distributer.this.uncaughtException(callBk.callback, r, ex);
                }
                int remainingToInvoke = this.m_callbacksToInvoke.decrementAndGet();
                assert (remainingToInvoke >= 0);
                Distributer.this.m_rateLimiter.transactionResponseReceived(System.nanoTime(), -1, callBk.ignoreBackpressure);
            }
        }

        /**
         * Removes every mapping of {@code byKey} whose value is this connection (identity
         * comparison). {@link #stopping} calls this while holding the {@code Distributer} monitor.
         */
        private void removeSelfFromValues(Map<?, ?> byKey) {
            // Removing through the values() iterator also removes the corresponding mapping.
            Iterator<?> it = byKey.values().iterator();
            while (it.hasNext()) {
                if (it.next() == this) {
                    it.remove();
                }
            }
        }

        /**
         * Rewrites every replica array that references this connection so that it holds only
         * the other connections; an entry whose array would become empty is removed instead.
         * {@link #stopping} calls this while holding the {@code Distributer} monitor.
         */
        private <K> void removeSelfFromReplicas(Map<K, NodeConnection[]> replicasByKey) {
            Iterator<Map.Entry<K, NodeConnection[]>> it = replicasByKey.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<K, NodeConnection[]> entry = it.next();
                NodeConnection[] current = entry.getValue();
                int survivorCount = 0;
                for (NodeConnection nc : current) {
                    if (nc != this) {
                        ++survivorCount;
                    }
                }
                if (survivorCount == current.length) {
                    // This connection is not part of the array; leave the entry untouched.
                    continue;
                }
                if (survivorCount == 0) {
                    // Nothing left for this key: drop the entry, but keep processing the
                    // remaining entries (the previous code stopped the whole pass here).
                    it.remove();
                    continue;
                }
                NodeConnection[] survivors = new NodeConnection[survivorCount];
                int next = 0;
                for (NodeConnection nc : current) {
                    if (nc != this) {
                        survivors[next++] = nc;
                    }
                }
                entry.setValue(survivors);
            }
        }

        /**
         * Builds the task returned for the off-back-pressure event.
         *
         * @return a runnable that, while holding the {@code Distributer} monitor, calls
         *         {@code backpressure(false)} on every registered status listener
         */
        @Override
        public Runnable offBackPressure() {
            Runnable notifyListeners = new Runnable(){

                @Override
                public void run() {
                    synchronized (Distributer.this) {
                        for (ClientStatusListenerExt listener : Distributer.this.m_listeners) {
                            listener.backpressure(false);
                        }
                    }
                }
            };
            return notifyListeners;
        }

        /**
         * Supplies no task for the on-back-pressure event.
         *
         * @return always {@code null}
         */
        @Override
        public Runnable onBackPressure() {
            return null;
        }

        /**
         * Exposes this connection itself as the monitor for its write stream.
         *
         * @return this object, which serves as the {@link QueueMonitor}
         */
        @Override
        public QueueMonitor writestreamMonitor() {
            return this;
        }

        /**
         * Adds {@code bytes} to the running count of queued bytes and compares the new
         * total against a fixed 256 KiB limit.
         *
         * @param bytes amount added to the queued-byte counter
         * @return {@code true} when the updated counter exceeds 262144 bytes
         */
        @Override
        public boolean queue(int bytes) {
            final int limitBytes = 256 * 1024;
            m_queuedBytes += bytes;
            return m_queuedBytes > limitBytes;
        }

        /**
         * Returns the remote socket address of the underlying connection.
         *
         * @return the value of {@code getRemoteSocketAddress()} on the wrapped connection
         */
        public InetSocketAddress getSocketAddress() {
            return m_connection.getRemoteSocketAddress();
        }
    }

    /**
     * Per-invocation record kept for a procedure call that is awaiting its response:
     * the callback to invoke, the procedure name, the time the call was recorded, its
     * timeout, and whether it ignores back pressure.
     */
    class CallbackBookeeping {
        /** Timestamp of the call, in nanoseconds. */
        long timestampNanos;
        /** Timeout of the call, in nanoseconds. */
        final long procedureTimeoutNanos;
        /** Callback to invoke for this call; asserted non-null at construction. */
        ProcedureCallback callback;
        /** Name recorded for the call. */
        String name;
        /** Whether the call was flagged to ignore back pressure. */
        boolean ignoreBackpressure;

        /**
         * Stores the supplied values unchanged.
         *
         * @param timestampNanos     timestamp of the call, in nanoseconds
         * @param callback           callback for the call; must not be {@code null}
         * @param name               name recorded for the call
         * @param timeoutNanos       timeout of the call, in nanoseconds
         * @param ignoreBackpressure whether the call ignores back pressure
         */
        public CallbackBookeeping(long timestampNanos, ProcedureCallback callback, String name, long timeoutNanos, boolean ignoreBackpressure) {
            assert (callback != null);
            this.callback = callback;
            this.name = name;
            this.timestampNanos = timestampNanos;
            this.procedureTimeoutNanos = timeoutNanos;
            this.ignoreBackpressure = ignoreBackpressure;
        }
    }

    /**
     * Periodic task that checks every connection for an unanswered ping and for procedure
     * calls that have exceeded their timeout.
     */
    class CallExpiration
    implements Runnable {
        CallExpiration() {
        }

        /**
         * Performs one expiration pass over a snapshot of the current connections.
         * <p>
         * For each connection:
         * <ul>
         * <li>if a ping is outstanding and the time since the last response exceeds the
         *     connection response timeout, the close cause is set to {@code TIMEOUT} and
         *     the connection is unregistered;</li>
         * <li>if no ping is outstanding and more than a third of that timeout has elapsed,
         *     a ping is sent;</li>
         * <li>every registered callback older than its procedure timeout is handed to
         *     {@code handleTimedoutCallback}, except that calls for which
         *     {@code isLongOp} is true are left alone until 1,800,000 ms have elapsed.</li>
         * </ul>
         * Any {@link Throwable} raised during the pass is printed and not rethrown.
         */
        @Override
        public void run() {
            try {
                // Copy under the Distributer monitor so the pass works on a stable snapshot.
                List<NodeConnection> connections;
                synchronized (Distributer.this) {
                    connections = new ArrayList<NodeConnection>(Distributer.this.m_connections);
                }
                long nowNanos = System.nanoTime();
                // Age below which a long-running operation is never expired.
                long longOpTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(1800000L);
                for (NodeConnection c : connections) {
                    long sinceLastResponse = Math.max(1L, nowNanos - c.m_lastResponseTimeNanos);
                    if (c.m_outstandingPing && sinceLastResponse > Distributer.this.m_connectionResponseTimeoutNanos) {
                        c.m_closeCause = ClientStatusListenerExt.DisconnectCause.TIMEOUT;
                        c.m_connection.unregister();
                    }
                    if (!c.m_outstandingPing && sinceLastResponse > Distributer.this.m_connectionResponseTimeoutNanos / 3L) {
                        c.sendPing();
                    }
                    for (Map.Entry<?, ?> e : c.m_callbacks.entrySet()) {
                        long handle = (Long)e.getKey();
                        CallbackBookeeping cb = (CallbackBookeeping)e.getValue();
                        long deltaNanos = Math.max(1L, nowNanos - cb.timestampNanos);
                        if (deltaNanos <= cb.procedureTimeoutNanos) {
                            continue;
                        }
                        if (Distributer.isLongOp(cb.name) && deltaNanos < longOpTimeoutNanos) {
                            continue;
                        }
                        c.handleTimedoutCallback(handle, nowNanos);
                    }
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /**
     * Callback that feeds a procedure-information response into
     * {@code updateProcedurePartitioning}.
     */
    class ProcUpdateCallback
    implements ProcedureCallback {
        ProcUpdateCallback() {
        }

        /**
         * Ignores responses whose status is not {@code 1}. Otherwise, while holding the
         * {@code Distributer} monitor, passes the table to
         * {@code updateProcedurePartitioning} when the response carries exactly one result
         * table, and then sets {@code m_fetchedCatalog} to {@code true} in either case.
         * Exceptions raised by that work are printed and not propagated.
         *
         * @param clientResponse the response to process
         */
        @Override
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            if (clientResponse.getStatus() == 1) {
                try {
                    synchronized (Distributer.this) {
                        VoltTable[] tables = clientResponse.getResults();
                        boolean hasSingleTable = tables != null && tables.length == 1;
                        if (hasSingleTable) {
                            Distributer.this.updateProcedurePartitioning(tables[0]);
                        }
                        Distributer.this.m_fetchedCatalog = true;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Callback for a subscription request; decides whether to retry the subscription or
     * to clear the pending-request flag.
     */
    class SubscribeCallback
    implements ProcedureCallback {
        SubscribeCallback() {
        }

        /**
         * Handles the subscription response as follows:
         * <ul>
         * <li>status string containing "@Subscribe was not found": clear the pending flag;</li>
         * <li>status {@code -4}: call {@code subscribeToNewNode()} immediately if any
         *     connection remains, otherwise do nothing;</li>
         * <li>any other status except {@code 1}, while the executor is not shut down:
         *     schedule {@code subscribeToNewNode()} two minutes later;</li>
         * <li>everything else: clear the pending flag.</li>
         * </ul>
         *
         * @param response the response to the subscription request
         */
        @Override
        public void clientCallback(ClientResponse response) throws Exception {
            String statusText = response.getStatusString();
            if (statusText != null && statusText.contains("@Subscribe was not found")) {
                clearPendingRequest();
                return;
            }
            byte status = response.getStatus();
            if (status == -4) {
                if (!Distributer.this.m_connections.isEmpty()) {
                    Distributer.this.subscribeToNewNode();
                }
                return;
            }
            boolean retryLater = status != 1 && !Distributer.this.m_ex.isShutdown();
            if (!retryLater) {
                clearPendingRequest();
                return;
            }
            Runnable retry = new Runnable(){

                @Override
                public void run() {
                    try {
                        Distributer.this.subscribeToNewNode();
                    }
                    catch (Throwable t) {
                        t.printStackTrace();
                        Throwables.propagate(t);
                    }
                }
            };
            Distributer.this.m_ex.schedule(retry, 2L, TimeUnit.MINUTES);
        }

        /** Resets {@code m_subscriptionRequestPending} while holding the {@code Distributer} monitor. */
        private void clearPendingRequest() {
            synchronized (Distributer.this) {
                Distributer.this.m_subscriptionRequestPending = false;
            }
        }
    }

    /**
     * Callback that feeds a topology response into {@code updateAffinityTopology}.
     */
    class TopoUpdateCallback
    implements ProcedureCallback {
        TopoUpdateCallback() {
        }

        /**
         * Ignores responses whose status is not {@code 1}. Otherwise, while holding the
         * {@code Distributer} monitor, passes the result tables to
         * {@code updateAffinityTopology} when the response carries more than one table.
         * Exceptions raised by that work are printed and not propagated.
         *
         * @param clientResponse the response to process
         */
        @Override
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            if (clientResponse.getStatus() == 1) {
                try {
                    synchronized (Distributer.this) {
                        VoltTable[] tables = clientResponse.getResults();
                        boolean hasSeveralTables = tables != null && tables.length > 1;
                        if (hasSeveralTables) {
                            Distributer.this.updateAffinityTopology(tables);
                        }
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Immutable description of a procedure: whether it is multi-partition, whether it is
     * read-only, and its partition parameter and partition parameter type.
     */
    private static final class Procedure {
        /** Value stored for both partition fields when the procedure is multi-partition. */
        static final int PARAMETER_NONE = -1;
        private final boolean multiPart;
        private final boolean readOnly;
        private final int partitionParameter;
        private final int partitionParameterType;

        /**
         * Stores the flags; for a multi-partition procedure the two partition arguments
         * are discarded and {@link #PARAMETER_NONE} is stored instead.
         *
         * @param multiPart              whether the procedure is multi-partition
         * @param readOnly               whether the procedure is read-only
         * @param partitionParameter     partition parameter, used only when {@code multiPart} is false
         * @param partitionParameterType partition parameter type, used only when {@code multiPart} is false
         */
        private Procedure(boolean multiPart, boolean readOnly, int partitionParameter, int partitionParameterType) {
            this.multiPart = multiPart;
            this.readOnly = readOnly;
            if (multiPart) {
                this.partitionParameter = PARAMETER_NONE;
                this.partitionParameterType = PARAMETER_NONE;
            } else {
                this.partitionParameter = partitionParameter;
                this.partitionParameterType = partitionParameterType;
            }
        }
    }
}

