/*
 * Decompiled with CFR 0.152.
 */
package org.voltdb.client;

import com.google_voltpatches.common.collect.ImmutableMap;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.ssl.SSLConfiguration;
import org.voltdb.ClientResponseImpl;
import org.voltdb.VoltTable;
import org.voltdb.client.AllPartitionProcedureCallback;
import org.voltdb.client.Client;
import org.voltdb.client.ClientAuthScheme;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientResponseWithPartitionKey;
import org.voltdb.client.ClientStats;
import org.voltdb.client.ClientStatsContext;
import org.voltdb.client.ClientStatusListenerExt;
import org.voltdb.client.ClientUtils;
import org.voltdb.client.ConnectionUtil;
import org.voltdb.client.Distributer;
import org.voltdb.client.NoConnectionsException;
import org.voltdb.client.NullCallback;
import org.voltdb.client.ProcCallException;
import org.voltdb.client.ProcedureArgumentCacher;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.client.ProcedureInvocation;
import org.voltdb.client.ReconnectStatusListener;
import org.voltdb.client.VoltBulkLoader.BulkLoaderFailureCallBack;
import org.voltdb.client.VoltBulkLoader.BulkLoaderState;
import org.voltdb.client.VoltBulkLoader.BulkLoaderSuccessCallback;
import org.voltdb.client.VoltBulkLoader.VoltBulkLoader;
import org.voltdb.common.Constants;
import org.voltdb.utils.Encoder;

public final class ClientImpl
implements Client {
    static long PARTITION_KEYS_INFO_REFRESH_FREQUENCY = 1000L;
    private final AtomicLong m_handle = new AtomicLong(0L);
    private boolean m_credentialsSet = false;
    private final ReentrantLock m_credentialComparisonLock = new ReentrantLock();
    private String m_createConnectionUsername = null;
    private byte[] m_hashedPassword = null;
    private int m_passwordHashCode = 0;
    final InternalClientStatusListener m_listener = new InternalClientStatusListener();
    ClientStatusListenerExt m_clientStatusListener = null;
    private ScheduledExecutorService m_ex = null;
    private final String m_username;
    private final byte[] m_passwordHash;
    private final ClientAuthScheme m_hashScheme;
    private final SslContext m_sslContext;
    private final CopyOnWriteArrayList<Long> m_blessedThreadIds = new CopyOnWriteArrayList();
    private BulkLoaderState m_vblGlobals = new BulkLoaderState(this);
    private static final ProcedureCallback NULL_CALLBACK = new NullCallback();
    private volatile boolean m_isShutdown = false;
    static final Logger LOG = Logger.getLogger(ClientImpl.class.getName());
    private final Distributer m_distributer;
    private final Object m_backpressureLock = new Object();
    private boolean m_backpressure = false;
    private boolean m_blockingQueue = true;
    private final ReconnectStatusListener m_reconnectStatusListener;

    ClientImpl(ClientConfig config) {
        if (config.m_topologyChangeAware && !config.m_useClientAffinity) {
            throw new IllegalArgumentException("The client affinity must be enabled to enable topology awareness.");
        }
        this.m_sslContext = config.m_enableSSL ? SSLConfiguration.createClientSslContext(config.m_sslConfig) : null;
        this.m_distributer = new Distributer(config.m_heavyweight, config.m_procedureCallTimeoutNanos, config.m_connectionResponseTimeoutMS, config.m_useClientAffinity, config.m_sendReadsToReplicasBytDefaultIfCAEnabled, config.m_subject, this.m_sslContext);
        this.m_distributer.addClientStatusListener(this.m_listener);
        String username = config.m_username;
        if (config.m_subject != null) {
            username = ClientConfig.getUserNameFromSubject(config.m_subject);
        }
        this.m_username = username;
        this.m_distributer.setTopologyChangeAware(config.m_topologyChangeAware);
        if (config.m_topologyChangeAware) {
            this.m_ex = Executors.newSingleThreadScheduledExecutor(CoreUtils.getThreadFactory("Topoaware thread"));
        }
        if (config.m_reconnectOnConnectionLoss) {
            this.m_reconnectStatusListener = new ReconnectStatusListener(this, config.m_initialConnectionRetryIntervalMS, config.m_maxConnectionRetryIntervalMS);
            this.m_distributer.addClientStatusListener(this.m_reconnectStatusListener);
        } else {
            this.m_reconnectStatusListener = null;
        }
        this.m_hashScheme = config.m_hashScheme;
        this.m_passwordHash = config.m_cleartext ? ConnectionUtil.getHashedPassword(this.m_hashScheme, config.m_password) : Encoder.hexDecode(config.m_password);
        if (config.m_listener != null) {
            this.m_distributer.addClientStatusListener(config.m_listener);
            this.m_clientStatusListener = config.m_listener;
        }
        assert (config.m_maxOutstandingTxns > 0);
        this.m_blessedThreadIds.addAll(this.m_distributer.getThreadIds());
        if (config.m_autoTune) {
            this.m_distributer.m_rateLimiter.enableAutoTuning(config.m_autoTuneTargetInternalLatency);
        } else {
            this.m_distributer.m_rateLimiter.setLimits(config.m_maxTransactionsPerSecond, config.m_maxOutstandingTxns);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean verifyCredentialsAreAlwaysTheSame(String username, byte[] hashedPassword) {
        assert (username != null);
        this.m_credentialComparisonLock.lock();
        try {
            if (!this.m_credentialsSet) {
                this.m_credentialsSet = true;
                this.m_createConnectionUsername = username;
                if (hashedPassword != null) {
                    this.m_hashedPassword = Arrays.copyOf(hashedPassword, hashedPassword.length);
                    this.m_passwordHashCode = Arrays.hashCode(hashedPassword);
                }
                boolean bl = true;
                return bl;
            }
            if (!this.m_createConnectionUsername.equals(username)) {
                boolean bl = false;
                return bl;
            }
            if (hashedPassword == null) {
                boolean bl = this.m_hashedPassword == null;
                return bl;
            }
            for (int i = 0; i < hashedPassword.length; ++i) {
                if (hashedPassword[i] == this.m_hashedPassword[i]) continue;
                boolean bl = false;
                return bl;
            }
            boolean bl = true;
            return bl;
        }
        finally {
            this.m_credentialComparisonLock.unlock();
        }
    }

    public String getUsername() {
        return this.m_createConnectionUsername;
    }

    public int getPasswordHashCode() {
        return this.m_passwordHashCode;
    }

    public SslContext getSSLContext() {
        return this.m_sslContext;
    }

    public void createConnectionWithHashedCredentials(String host, int port, String program, byte[] hashedPassword) throws IOException {
        byte[] subPassword;
        if (this.m_isShutdown) {
            throw new IOException("Client instance is shutdown");
        }
        String subProgram = program == null ? "" : program;
        byte[] byArray = subPassword = hashedPassword == null ? ConnectionUtil.getHashedPassword(this.m_hashScheme, "") : hashedPassword;
        if (!this.verifyCredentialsAreAlwaysTheSame(subProgram, subPassword)) {
            throw new IOException("New connection authorization credentials do not match previous credentials for client.");
        }
        this.m_distributer.createConnectionWithHashedCredentials(host, subProgram, subPassword, port, this.m_hashScheme);
    }

    @Override
    public final ClientResponse callProcedure(String procName, Object ... parameters) throws IOException, NoConnectionsException, ProcCallException {
        return this.callProcedureWithClientTimeoutImpl(-1, procName, 0L, TimeUnit.SECONDS, parameters);
    }

    @Override
    public ClientResponse callProcedureWithTimeout(int batchTimeout, String procName, Object ... parameters) throws IOException, NoConnectionsException, ProcCallException {
        return this.callProcedureWithClientTimeout(batchTimeout, procName, 0L, TimeUnit.SECONDS, parameters);
    }

    public ClientResponse callProcedureWithClientTimeout(int batchTimeout, String procName, long clientTimeout, TimeUnit unit, Object ... parameters) throws IOException, ProcCallException {
        return this.callProcedureWithClientTimeoutImpl(batchTimeout, procName, clientTimeout, unit, parameters);
    }

    protected ClientResponse callProcedureWithClientTimeoutImpl(int batchTimeout, String procName, long clientTimeout, TimeUnit unit, Object ... parameters) throws IOException, NoConnectionsException, ProcCallException {
        long handle = this.m_handle.getAndIncrement();
        ProcedureInvocation invocation = new ProcedureInvocation(handle, batchTimeout, -1, procName, parameters);
        long nanos = unit.toNanos(clientTimeout);
        return this.internalSyncCallProcedure(nanos, invocation);
    }

    @Override
    public final boolean callProcedure(ProcedureCallback callback, String procName, Object ... parameters) throws IOException {
        return this.callProcedureWithClientTimeout(callback, -1, procName, 0L, TimeUnit.NANOSECONDS, parameters);
    }

    @Override
    public final boolean callProcedureWithTimeout(ProcedureCallback callback, int batchTimeout, String procName, Object ... parameters) throws IOException {
        return this.callProcedureWithClientTimeout(callback, batchTimeout, -1, procName, 0L, TimeUnit.NANOSECONDS, parameters);
    }

    public boolean callProcedureWithClientTimeout(ProcedureCallback callback, int batchTimeout, String procName, long clientTimeout, TimeUnit clientTimeoutUnit, Object ... parameters) throws IOException {
        return this.callProcedureWithClientTimeout(callback, batchTimeout, -1, procName, clientTimeout, clientTimeoutUnit, parameters);
    }

    public boolean callProcedureWithClientTimeout(ProcedureCallback callback, int batchTimeout, int partitionDestination, String procName, long clientTimeout, TimeUnit clientTimeoutUnit, Object ... parameters) throws IOException {
        if (callback instanceof ProcedureArgumentCacher) {
            ((ProcedureArgumentCacher)((Object)callback)).setArgs(parameters);
        }
        long handle = this.m_handle.getAndIncrement();
        ProcedureInvocation invocation = new ProcedureInvocation(handle, batchTimeout, partitionDestination, procName, parameters);
        if (this.m_isShutdown) {
            return false;
        }
        if (callback == null) {
            callback = NULL_CALLBACK;
        }
        return this.internalAsyncCallProcedure(callback, clientTimeoutUnit.toNanos(clientTimeout), invocation);
    }

    @Override
    @Deprecated
    public int calculateInvocationSerializedSize(String procName, Object ... parameters) {
        ProcedureInvocation invocation = new ProcedureInvocation(0L, procName, parameters);
        return invocation.getSerializedSize();
    }

    @Override
    @Deprecated
    public final boolean callProcedure(ProcedureCallback callback, int expectedSerializedSize, String procName, Object ... parameters) throws IOException {
        return this.callProcedure(callback, procName, parameters);
    }

    private final ClientResponse internalSyncCallProcedure(long clientTimeoutNanos, ProcedureInvocation invocation) throws ProcCallException, IOException {
        if (this.m_isShutdown) {
            throw new NoConnectionsException("Client instance is shutdown");
        }
        if (this.m_blessedThreadIds.contains(Thread.currentThread().getId())) {
            throw new IOException("Can't invoke a procedure synchronously from with the client callback thread  without deadlocking the client library");
        }
        SyncCallbackLight cb = new SyncCallbackLight();
        boolean success = this.internalAsyncCallProcedure(cb, clientTimeoutNanos, invocation);
        if (!success) {
            ClientResponseImpl r = new ClientResponseImpl(-2, -128, "", new VoltTable[0], String.format("Unable to queue client request.", new Object[0]));
            throw new ProcCallException(r, "Unable to queue client request.", null);
        }
        try {
            cb.waitForResponse();
        }
        catch (InterruptedException e) {
            throw new InterruptedIOException("Interrupted while waiting for response");
        }
        if (cb.getResponse().getStatus() != 1) {
            throw new ProcCallException(cb.getResponse(), cb.getResponse().getStatusString(), null);
        }
        return cb.getResponse();
    }

    private final boolean internalAsyncCallProcedure(ProcedureCallback callback, long clientTimeoutNanos, ProcedureInvocation invocation) throws IOException {
        assert (!this.m_isShutdown);
        assert (callback != null);
        long nowNanos = System.nanoTime();
        boolean isBlessed = this.m_blessedThreadIds.contains(Thread.currentThread().getId());
        while (!this.m_distributer.queue(invocation, callback, isBlessed, nowNanos, clientTimeoutNanos)) {
            if (!this.m_blockingQueue) {
                return false;
            }
            long delta = Math.max(1L, System.nanoTime() - nowNanos);
            long timeout = clientTimeoutNanos == 0L ? this.m_distributer.getProcedureTimeoutNanos() : clientTimeoutNanos;
            try {
                if (!this.backpressureBarrier(nowNanos, timeout - delta)) continue;
                ClientResponseImpl response = new ClientResponseImpl(-6, -128, "", new VoltTable[0], String.format("No response received in the allotted time (set to %d ms).", TimeUnit.NANOSECONDS.toMillis(clientTimeoutNanos)));
                try {
                    callback.clientCallback(response);
                }
                catch (Throwable thrown) {
                    this.m_distributer.uncaughtException(callback, response, thrown);
                }
            }
            catch (InterruptedException e) {
                throw new InterruptedIOException("Interrupted while invoking procedure asynchronously");
            }
        }
        return true;
    }

    private Object[] getUpdateCatalogParams(File catalogPath, File deploymentPath) throws IOException {
        Object[] params = new Object[]{catalogPath != null ? (Object)ClientUtils.fileToBytes(catalogPath) : null, deploymentPath != null ? new String(ClientUtils.fileToBytes(deploymentPath), Constants.UTF8ENCODING) : null};
        return params;
    }

    @Override
    public ClientResponse updateApplicationCatalog(File catalogPath, File deploymentPath) throws IOException, ProcCallException {
        Object[] params = this.getUpdateCatalogParams(catalogPath, deploymentPath);
        return this.callProcedure("@UpdateApplicationCatalog", params);
    }

    @Override
    public boolean updateApplicationCatalog(ProcedureCallback callback, File catalogPath, File deploymentPath) throws IOException {
        Object[] params = this.getUpdateCatalogParams(catalogPath, deploymentPath);
        return this.callProcedure(callback, "@UpdateApplicationCatalog", params);
    }

    @Override
    public ClientResponse updateClasses(File jarPath, String classesToDelete) throws IOException, ProcCallException {
        byte[] jarbytes = null;
        if (jarPath != null) {
            jarbytes = ClientUtils.fileToBytes(jarPath);
        }
        return this.callProcedure("@UpdateClasses", jarbytes, classesToDelete);
    }

    @Override
    public boolean updateClasses(ProcedureCallback callback, File jarPath, String classesToDelete) throws IOException {
        byte[] jarbytes = null;
        if (jarPath != null) {
            jarbytes = ClientUtils.fileToBytes(jarPath);
        }
        return this.callProcedure(callback, "@UpdateClasses", jarbytes, classesToDelete);
    }

    @Override
    public void drain() throws InterruptedException {
        if (this.m_isShutdown) {
            return;
        }
        if (this.m_blessedThreadIds.contains(Thread.currentThread().getId())) {
            throw new RuntimeException("Can't invoke backpressureBarrier from within the client callback thread  without deadlocking the client library");
        }
        this.m_distributer.drain();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws InterruptedException {
        if (this.m_blessedThreadIds.contains(Thread.currentThread().getId())) {
            throw new RuntimeException("Can't invoke backpressureBarrier from within the client callback thread  without deadlocking the client library");
        }
        this.m_isShutdown = true;
        Object object = this.m_backpressureLock;
        synchronized (object) {
            this.m_backpressureLock.notifyAll();
        }
        if (this.m_reconnectStatusListener != null) {
            this.m_distributer.removeClientStatusListener(this.m_reconnectStatusListener);
            this.m_reconnectStatusListener.close();
        }
        if (this.m_ex != null) {
            if (CoreUtils.isJunitTest()) {
                this.m_ex.shutdownNow();
            } else {
                this.m_ex.shutdown();
                this.m_ex.awaitTermination(365L, TimeUnit.DAYS);
            }
        }
        this.m_distributer.shutdown();
        ClientFactory.decreaseClientNum();
    }

    @Override
    public void backpressureBarrier() throws InterruptedException {
        this.backpressureBarrier(0L, 0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean backpressureBarrier(long start, long timeoutNanos) throws InterruptedException {
        if (this.m_isShutdown) {
            return false;
        }
        if (this.m_blessedThreadIds.contains(Thread.currentThread().getId())) {
            throw new RuntimeException("Can't invoke backpressureBarrier from within the client callback thread  without deadlocking the client library");
        }
        if (this.m_backpressure) {
            Object object = this.m_backpressureLock;
            synchronized (object) {
                if (this.m_backpressure) {
                    while (this.m_backpressure && !this.m_isShutdown) {
                        if (start != 0L) {
                            if (timeoutNanos <= 0L) {
                                return true;
                            }
                            this.m_backpressureLock.wait(timeoutNanos / TimeUnit.MILLISECONDS.toNanos(1L), (int)(timeoutNanos % TimeUnit.MILLISECONDS.toNanos(1L)));
                            if (!this.m_backpressure) break;
                            long nowNanos = System.nanoTime();
                            long deltaNanos = Math.max(1L, nowNanos - start);
                            if (deltaNanos >= timeoutNanos) {
                                return true;
                            }
                            timeoutNanos -= deltaNanos;
                            continue;
                        }
                        this.m_backpressureLock.wait();
                    }
                }
            }
        }
        return false;
    }

    @Override
    public void configureBlocking(boolean blocking) {
        this.m_blockingQueue = blocking;
    }

    @Override
    public ClientStatsContext createStatsContext() {
        return this.m_distributer.createStatsContext();
    }

    @Override
    public Object[] getInstanceId() {
        return this.m_distributer.getInstanceId();
    }

    public void resetInstanceId() {
        this.m_distributer.resetInstanceId();
    }

    @Override
    public String getBuildString() {
        return this.m_distributer.getBuildString();
    }

    @Override
    public boolean blocking() {
        return this.m_blockingQueue;
    }

    private static String getHostnameFromHostnameColonPort(String server) {
        String[] parts = (server = server.trim()).split(":");
        if (parts.length == 1) {
            return server;
        }
        assert (parts.length == 2);
        return parts[0].trim();
    }

    private static int getPortFromHostnameColonPort(String server, int defaultPort) {
        String[] parts = server.split(":");
        if (parts.length == 1) {
            return defaultPort;
        }
        assert (parts.length == 2);
        return Integer.parseInt(parts[1]);
    }

    @Override
    public void createConnection(String host) throws IOException {
        if (this.m_username == null) {
            throw new IllegalStateException("Attempted to use createConnection(String host) with a client that wasn't constructed with a username and password specified");
        }
        int port = ClientImpl.getPortFromHostnameColonPort(host, 21212);
        host = ClientImpl.getHostnameFromHostnameColonPort(host);
        this.createConnectionWithHashedCredentials(host, port, this.m_username, this.m_passwordHash);
    }

    @Override
    public void createConnection(String host, int port) throws IOException {
        if (this.m_username == null) {
            throw new IllegalStateException("Attempted to use createConnection(String host) with a client that wasn't constructed with a username and password specified");
        }
        this.createConnectionWithHashedCredentials(host, port, this.m_username, this.m_passwordHash);
    }

    @Override
    public List<InetSocketAddress> getConnectedHostList() {
        return this.m_distributer.getConnectedHostList();
    }

    @Override
    public int[] getThroughputAndOutstandingTxnLimits() {
        return this.m_distributer.m_rateLimiter.getLimits();
    }

    @Override
    public void writeSummaryCSV(ClientStats stats, String path) throws IOException {
        this.writeSummaryCSV(null, stats, path);
    }

    @Override
    public void writeSummaryCSV(String statsRowName, ClientStats stats, String path) throws IOException {
        if (path == null || path.length() == 0) {
            return;
        }
        FileWriter fw = new FileWriter(path, true);
        if (statsRowName != null && !statsRowName.isEmpty()) {
            fw.append(statsRowName).append(",");
        }
        fw.append(String.format("%d,%d,%d,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%.2f,%d,%d,%d\n", stats.getStartTimestamp(), stats.getDuration(), stats.getInvocationsCompleted(), stats.kPercentileLatencyAsDouble(0.0), stats.kPercentileLatencyAsDouble(1.0), stats.kPercentileLatencyAsDouble(0.95), stats.kPercentileLatencyAsDouble(0.99), stats.kPercentileLatencyAsDouble(0.999), stats.kPercentileLatencyAsDouble(0.9999), stats.kPercentileLatencyAsDouble(0.99999), stats.getInvocationErrors(), stats.getInvocationAborts(), stats.getInvocationTimeouts()));
        fw.close();
    }

    public boolean isHashinatorInitialized() {
        return this.m_distributer.isHashinatorInitialized();
    }

    public long getPartitionForParameter(byte typeValue, Object value) {
        return this.m_distributer.getPartitionForParameter(typeValue, value);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, boolean upsertMode, BulkLoaderFailureCallBack failureCallback) throws Exception {
        BulkLoaderState bulkLoaderState = this.m_vblGlobals;
        synchronized (bulkLoaderState) {
            return new VoltBulkLoader(this.m_vblGlobals, tableName, maxBatchSize, upsertMode, failureCallback, null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, BulkLoaderFailureCallBack failureCallback) throws Exception {
        BulkLoaderState bulkLoaderState = this.m_vblGlobals;
        synchronized (bulkLoaderState) {
            return new VoltBulkLoader(this.m_vblGlobals, tableName, maxBatchSize, failureCallback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, boolean upsertMode, BulkLoaderFailureCallBack failureCallback, BulkLoaderSuccessCallback successCallback) throws Exception {
        BulkLoaderState bulkLoaderState = this.m_vblGlobals;
        synchronized (bulkLoaderState) {
            return new VoltBulkLoader(this.m_vblGlobals, tableName, maxBatchSize, upsertMode, failureCallback, successCallback);
        }
    }

    @Override
    public boolean isAutoReconnectEnabled() {
        return this.m_reconnectStatusListener != null;
    }

    @Override
    public ClientResponseWithPartitionKey[] callAllPartitionProcedure(String procedureName, Object ... params) throws IOException, ProcCallException {
        CountDownLatch latch = new CountDownLatch(1);
        SyncAllPartitionProcedureCallback callBack = new SyncAllPartitionProcedureCallback(latch);
        this.callAllPartitionProcedure(callBack, procedureName, params);
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            throw new InterruptedIOException("Interrupted while waiting for response");
        }
        return callBack.getResponse();
    }

    @Override
    public boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback, String procedureName, Object ... params) throws IOException, ProcCallException {
        if (callback == null) {
            throw new IllegalArgumentException("AllPartitionProcedureCallback can not be null");
        }
        Object[] args = new Object[params.length + 1];
        System.arraycopy(params, 0, args, 1, params.length);
        ImmutableMap<Integer, Integer> partitionMap = this.m_distributer.getPartitionKeys();
        int partitionCount = partitionMap.size();
        AtomicInteger counter = new AtomicInteger(partitionCount);
        assert (partitionCount > 0);
        ClientResponseWithPartitionKey[] responses = new ClientResponseWithPartitionKey[partitionCount];
        for (Map.Entry entry : partitionMap.entrySet()) {
            args[0] = entry.getValue();
            OnePartitionProcedureCallback cb = new OnePartitionProcedureCallback(counter, args[0], --partitionCount, responses, callback);
            try {
                if (this.callProcedureWithClientTimeout((ProcedureCallback)cb, -1, (Integer)entry.getKey(), procedureName, 0L, TimeUnit.NANOSECONDS, args)) continue;
                ClientResponseImpl r = new ClientResponseImpl(-2, new VoltTable[0], "The procedure is not queued for execution.");
                throw new ProcCallException(r, null, null);
            }
            catch (Exception ex) {
                try {
                    cb.exceptionCallback(ex);
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
            }
        }
        return true;
    }

    private class SyncAllPartitionProcedureCallback
    implements AllPartitionProcedureCallback {
        ClientResponseWithPartitionKey[] m_responses;
        final CountDownLatch m_latch;

        SyncAllPartitionProcedureCallback(CountDownLatch latch) {
            this.m_latch = latch;
        }

        @Override
        public void clientCallback(ClientResponseWithPartitionKey[] clientResponse) throws Exception {
            this.m_responses = clientResponse;
            this.m_latch.countDown();
        }

        public ClientResponseWithPartitionKey[] getResponse() {
            return this.m_responses;
        }
    }

    class OnePartitionProcedureCallback
    implements ProcedureCallback {
        final ClientResponseWithPartitionKey[] m_responses;
        final int m_index;
        final Object m_partitionKey;
        final AtomicInteger m_partitionCounter;
        final AllPartitionProcedureCallback m_cb;

        public OnePartitionProcedureCallback(AtomicInteger counter, Object partitionKey, int index, ClientResponseWithPartitionKey[] responses, AllPartitionProcedureCallback cb) {
            this.m_partitionCounter = counter;
            this.m_partitionKey = partitionKey;
            this.m_index = index;
            this.m_responses = responses;
            this.m_cb = cb;
        }

        @Override
        public void clientCallback(ClientResponse response) throws Exception {
            this.m_responses[this.m_index] = new ClientResponseWithPartitionKey(this.m_partitionKey, response);
            if (this.m_partitionCounter.decrementAndGet() == 0) {
                this.m_cb.clientCallback(this.m_responses);
            }
        }

        public void exceptionCallback(Exception e) throws Exception {
            if (e instanceof ProcCallException) {
                ProcCallException pe = (ProcCallException)e;
                this.m_responses[this.m_index] = new ClientResponseWithPartitionKey(this.m_partitionKey, pe.getClientResponse());
            } else {
                byte status = -2;
                if (e instanceof NoConnectionsException) {
                    status = -4;
                }
                ClientResponseImpl r = new ClientResponseImpl(status, new VoltTable[0], e.getMessage());
                this.m_responses[this.m_index] = new ClientResponseWithPartitionKey(this.m_partitionKey, r);
            }
            if (this.m_partitionCounter.decrementAndGet() == 0) {
                this.m_cb.clientCallback(this.m_responses);
            }
        }
    }

    private final class SyncCallbackLight
    implements ProcedureCallback {
        private final Semaphore m_lock = new Semaphore(1);
        private ClientResponse m_response = null;

        public SyncCallbackLight() {
            this.m_lock.acquireUninterruptibly();
        }

        @Override
        public void clientCallback(ClientResponse clientResponse) {
            this.m_response = clientResponse;
            this.m_lock.release();
        }

        public ClientResponse getResponse() {
            return this.m_response;
        }

        public void waitForResponse() throws InterruptedException {
            this.m_lock.acquire();
            this.m_lock.release();
        }
    }

    class CreateConnectionTask
    implements Runnable {
        final InternalClientStatusListener listener;
        final AtomicInteger connectionTaskCount;

        public CreateConnectionTask(InternalClientStatusListener listener, AtomicInteger connectionTaskCount) {
            this.listener = listener;
            this.connectionTaskCount = connectionTaskCount;
            connectionTaskCount.incrementAndGet();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int failCount = 0;
            try {
                ClientResponse resp = ClientImpl.this.callProcedure("@SystemInformation", "OVERVIEW");
                if (resp.getStatus() == 1) {
                    Map<Integer, HostConfig> hosts = this.listener.buildUnconnectedHostConfigMap(resp.getResults()[0]);
                    for (Map.Entry<Integer, HostConfig> entry : hosts.entrySet()) {
                        HostConfig config = entry.getValue();
                        try {
                            ClientImpl.this.createConnection(config.m_ipAddress, config.getPort(this.listener.m_useAdminPort));
                            this.listener.nofifyClientConnectionCreation(config, ClientStatusListenerExt.AutoConnectionStatus.SUCCESS);
                        }
                        catch (Exception e) {
                            this.listener.nofifyClientConnectionCreation(config, ClientStatusListenerExt.AutoConnectionStatus.UNABLE_TO_CONNECT);
                            ++failCount;
                        }
                    }
                } else {
                    this.listener.nofifyClientConnectionCreation(null, ClientStatusListenerExt.AutoConnectionStatus.UNABLE_TO_QUERY_TOPOLOGY);
                    ++failCount;
                }
            }
            catch (Exception e) {
                this.listener.nofifyClientConnectionCreation(null, ClientStatusListenerExt.AutoConnectionStatus.UNABLE_TO_QUERY_TOPOLOGY);
                ++failCount;
            }
            finally {
                this.connectionTaskCount.decrementAndGet();
                this.listener.retryConnectionCreationIfNeeded(failCount);
            }
        }
    }

    class InternalClientStatusListener
    extends ClientStatusListenerExt {
        boolean m_useAdminPort = false;
        boolean m_adminPortChecked = false;
        AtomicInteger connectionTaskCount = new AtomicInteger(0);

        InternalClientStatusListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void backpressure(boolean status) {
            Object object = ClientImpl.this.m_backpressureLock;
            synchronized (object) {
                if (status) {
                    ClientImpl.this.m_backpressure = true;
                } else {
                    ClientImpl.this.m_backpressure = false;
                    ClientImpl.this.m_backpressureLock.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void connectionLost(String hostname, int port, int connectionsLeft, ClientStatusListenerExt.DisconnectCause cause) {
            if (connectionsLeft == 0) {
                Object object = ClientImpl.this.m_backpressureLock;
                synchronized (object) {
                    ClientImpl.this.m_backpressure = false;
                    ClientImpl.this.m_backpressureLock.notifyAll();
                }
            }
        }

        Map<Integer, HostConfig> buildUnconnectedHostConfigMap(VoltTable vt) {
            HashMap<Integer, HostConfig> unconnectedMap = new HashMap<Integer, HostConfig>();
            HashMap<Integer, HostConfig> connectedMap = new HashMap<Integer, HostConfig>();
            while (vt.advanceRow()) {
                Integer hid = (int)vt.getLong("HOST_ID");
                HostConfig config = null;
                if (!ClientImpl.this.m_distributer.isHostConnected(hid)) {
                    config = (HostConfig)unconnectedMap.get(hid);
                    if (config == null) {
                        config = new HostConfig();
                        unconnectedMap.put(hid, config);
                    }
                } else if (!this.m_adminPortChecked && (config = (HostConfig)connectedMap.get(hid)) == null) {
                    config = new HostConfig();
                    connectedMap.put(hid, config);
                }
                if (config == null) continue;
                config.setValue(vt.getString("KEY"), vt.getString("VALUE"));
            }
            if (!this.m_adminPortChecked) {
                Map<String, Integer> connectedIpPortPairs = ClientImpl.this.m_distributer.getConnectedHostIPAndPort();
                int admintPortCount = 0;
                for (HostConfig config : connectedMap.values()) {
                    Integer connectedPort = connectedIpPortPairs.get(config.m_ipAddress);
                    if (connectedPort == null || config.m_adminPort != connectedPort) continue;
                    ++admintPortCount;
                }
                this.m_useAdminPort = admintPortCount == connectedMap.values().size();
            }
            this.m_adminPortChecked = true;
            return unconnectedMap;
        }

        void nofifyClientConnectionCreation(HostConfig host, ClientStatusListenerExt.AutoConnectionStatus status) {
            if (ClientImpl.this.m_clientStatusListener != null) {
                ClientImpl.this.m_clientStatusListener.connectionCreated(host != null ? host.m_hostName : "", host != null ? host.m_clientPort : -1, status);
            }
        }

        void retryConnectionCreationIfNeeded(int failCount) {
            if (failCount == 0) {
                try {
                    ClientImpl.this.m_distributer.setCreateConnectionsUponTopologyChangeComplete();
                }
                catch (Exception e) {
                    this.nofifyClientConnectionCreation(null, ClientStatusListenerExt.AutoConnectionStatus.UNABLE_TO_CONNECT);
                }
            } else if (this.connectionTaskCount.get() < 2) {
                ClientImpl.this.m_ex.schedule(new CreateConnectionTask(this, this.connectionTaskCount), 10L, TimeUnit.SECONDS);
            }
        }

        public void createConnectionsUponTopologyChange() {
            ClientImpl.this.m_ex.execute(new CreateConnectionTask(this, this.connectionTaskCount));
        }
    }

    class HostConfig {
        String m_ipAddress;
        String m_hostName;
        int m_clientPort;
        int m_adminPort;

        HostConfig() {
        }

        void setValue(String param, String value) {
            if ("IPADDRESS".equalsIgnoreCase(param)) {
                this.m_ipAddress = value;
            } else if ("HOSTNAME".equalsIgnoreCase(param)) {
                this.m_hostName = value;
            } else if ("CLIENTPORT".equalsIgnoreCase(param)) {
                this.m_clientPort = Integer.parseInt(value);
            } else if ("ADMINPORT".equalsIgnoreCase(param)) {
                this.m_adminPort = Integer.parseInt(value);
            }
        }

        int getPort(boolean isAdmin) {
            return isAdmin ? this.m_adminPort : this.m_clientPort;
        }
    }
}

