/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.shaded.org.apache.zookeeper.server.quorum;

import io.mantisrx.shaded.org.apache.zookeeper.server.ZooKeeperThread;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QuorumCnxManager {
    /** Logger used throughout this class and its inner thread classes. */
    private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
    /** Same value the constructor uses as the capacity of {@link #recvQueue}. */
    static final int RECV_CAPACITY = 100;
    /** Same value used as the capacity of every per-peer queue stored in {@link #queueSendMap}. */
    static final int SEND_CAPACITY = 1;
    /** Same value RecvWorker uses as the upper bound on the length prefix of an incoming message. */
    static final int PACKETMAXSIZE = 524288;
    /** Same value handleConnection uses as the upper bound on the trailing handshake bytes it accepts. */
    public static final int maxBuffer = 2048;
    /** Supplies a substitute id (then decrements) whenever a peer connects announcing sid Long.MAX_VALUE. */
    private AtomicLong observerCounter = new AtomicLong(-1L);
    /** Timeout passed to Socket.connect; overridden by the "zookeeper.cnxTimeout" system property when set. */
    private int cnxTO = 5000;
    /** Id of this server; written first on outgoing connections and compared with peer ids to pick which connection survives. */
    final long mySid;
    /** Value applied via Socket.setSoTimeout by setSockOpts. */
    final int socketTimeout;
    /** Known servers keyed by sid; consulted for election addresses and hostnames. */
    final Map<Long, QuorumPeer.QuorumServer> view;
    /** Keep-alive flag applied to every socket; read once from the "zookeeper.tcpKeepAlive" system property. */
    final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
    /** When true the Listener binds the election port on the wildcard address instead of the configured election address. */
    final boolean listenOnAllIPs;
    /** Runs connection handshakes asynchronously; only created when quorumSaslAuthEnabled is true, otherwise left null. */
    private ThreadPoolExecutor connectionExecutor;
    /** Sids for which an asynchronous outgoing connection request is queued or running. */
    private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet());
    /** Authenticates incoming connections (see handleConnection). */
    private QuorumAuthServer authServer;
    /** Authenticates outgoing connections (see startConnection). */
    private QuorumAuthLearner authLearner;
    /** Selects the asynchronous (executor-based) handshake paths when true. */
    private boolean quorumSaslAuthEnabled;
    /** Incremented for every task handed to connectionExecutor; reset to 0 by halt(). */
    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
    /** Active SendWorker per peer sid; a non-null entry is what connectedToPeer treats as "connected". */
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    /** Outgoing message queue per peer sid. */
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    /** Most recent message a SendWorker took from the queue for each peer; re-sent when a new SendWorker starts with an empty queue. */
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    /** Messages received from peers (and messages this server addressed to itself). */
    public final ArrayBlockingQueue<Message> recvQueue;
    /** Guards the remove-then-add sequence in addToRecvQueue. */
    private final Object recvQLock = new Object();
    /** Set by halt(); checked by the worker and listener loops. */
    volatile boolean shutdown = false;
    /** Thread that accepts incoming election connections. */
    public final Listener listener;
    /** Incremented when a SendWorker/RecvWorker run() starts and decremented in their finish(). */
    private AtomicInteger threadCnt = new AtomicInteger(0);

    /**
     * Convenience constructor: forwards every argument to the full constructor and supplies a
     * fresh, empty sender-worker map.
     */
    public QuorumCnxManager(long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
        this(
                mySid,
                view,
                authServer,
                authLearner,
                socketTimeout,
                listenOnAllIPs,
                quorumCnxnThreadsSize,
                quorumSaslAuthEnabled,
                new ConcurrentHashMap<Long, SendWorker>());
    }

    /**
     * Full constructor.
     *
     * <p>Stores the supplied configuration, allocates the receive queue and the per-peer maps,
     * applies the optional {@code zookeeper.cnxTimeout} system-property override to the connect
     * timeout, sets up authentication (and, when SASL is enabled, the connection executor), and
     * creates the listener thread object. The listener is created here but not started.
     *
     * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but is not an integer
     */
    public QuorumCnxManager(long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled, ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
        // Caller-supplied configuration.
        this.mySid = mySid;
        this.view = view;
        this.socketTimeout = socketTimeout;
        this.listenOnAllIPs = listenOnAllIPs;
        this.senderWorkerMap = senderWorkerMap;

        // Internally owned queues and maps.
        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

        // Optional override of the default connect timeout.
        String timeoutOverride = System.getProperty("zookeeper.cnxTimeout");
        if (timeoutOverride != null) {
            this.cnxTO = Integer.parseInt(timeoutOverride);
        }

        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled);
        this.listener = new Listener();
    }

    /**
     * Records the authentication collaborators and, only when quorum SASL auth is enabled,
     * builds the executor used for asynchronous connection handshakes.
     *
     * <p>The executor has 3 core threads, at most {@code quorumCnxnThreadsSize} threads, a 60
     * second idle timeout that also applies to core threads, and a hand-off (synchronous) queue.
     * Its threads are named {@code QuorumConnectionThread-[myid=<mySid>]-<n>} with n starting at 1.
     */
    private void initializeAuth(final long mySid, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
        this.authServer = authServer;
        this.authLearner = authLearner;
        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;

        if (!quorumSaslAuthEnabled) {
            LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
            return;
        }

        final AtomicInteger nextThreadIndex = new AtomicInteger(1);
        SecurityManager securityManager = System.getSecurityManager();
        final ThreadGroup threadGroup;
        if (securityManager != null) {
            threadGroup = securityManager.getThreadGroup();
        } else {
            threadGroup = Thread.currentThread().getThreadGroup();
        }

        ThreadFactory namedThreadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                String threadName = "QuorumConnectionThread-[myid=" + mySid + "]-" + nextThreadIndex.getAndIncrement();
                return new Thread(threadGroup, task, threadName);
            }
        };

        this.connectionExecutor = new ThreadPoolExecutor(
                3,
                quorumCnxnThreadsSize,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                namedThreadFactory);
        this.connectionExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * Opens a socket to the election address of {@code sid} (looked up in the voting view derived
     * from {@code view}) and runs the synchronous connection handshake on it.
     *
     * @param sid id of the server to connect to
     * @throws Exception if configuring or connecting the socket fails, or if {@code sid} has no
     *     entry in the voting view (surfaces as a NullPointerException)
     */
    public void testInitiateConnection(long sid) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening channel to server " + sid);
        }
        Socket sock = new Socket();
        setSockOpts(sock);
        InetSocketAddress electionAddr = QuorumPeer.viewToVotingView(this.view).get(sid).electionAddr;
        sock.connect(electionAddr, this.cnxTO);
        initiateConnection(sock, sid);
    }

    /**
     * Runs the outgoing connection handshake on {@code sock} in the calling thread.
     *
     * <p>An {@link IOException} escaping the handshake is logged and the socket is closed; it is
     * not propagated to the caller.
     *
     * @param sock connected socket to the peer
     * @param sid id of the peer the socket is connected to
     */
    public void initiateConnection(Socket sock, Long sid) {
        try {
            startConnection(sock, sid);
        }
        catch (IOException e) {
            // All format arguments go in ONE array, with the exception last. Passing the array and
            // the exception as two separate arguments makes SLF4J substitute the array for the
            // first placeholder and the exception for the second, and drops the stack trace.
            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection", new Object[]{sid, sock.getRemoteSocketAddress(), e});
            closeSocket(sock);
        }
    }

    /**
     * Schedules the outgoing connection handshake for {@code sid} on the connection executor.
     *
     * <p>If a request for the same sid is already in progress the socket is closed and nothing is
     * scheduled. Any failure while submitting the task is logged, the in-progress marker is
     * cleared and the socket is closed.
     *
     * @param sock connected socket to the peer
     * @param sid id of the peer the socket is connected to
     */
    public void initiateConnectionAsync(Socket sock, Long sid) {
        boolean newlyRegistered = this.inprogressConnections.add(sid);
        if (newlyRegistered) {
            try {
                this.connectionExecutor.execute(new QuorumConnectionReqThread(sock, sid));
                this.connectionThreadCnt.incrementAndGet();
            }
            catch (Throwable submitFailure) {
                // Roll back the marker so a later attempt for this sid is not rejected.
                this.inprogressConnections.remove(sid);
                LOG.error("Exception while submitting quorum connection request", submitFailure);
                closeSocket(sock);
            }
        } else {
            LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
            closeSocket(sock);
        }
    }

    /**
     * Performs the outgoing side of the connection handshake.
     *
     * <p>Writes this server's id to the peer, authenticates through {@code authLearner}, and then
     * keeps the connection only when the peer's id is not greater than this server's id. When the
     * connection is kept, a SendWorker/RecvWorker pair is started for it and replaces any previous
     * SendWorker registered for the same peer.
     *
     * @param sock connected socket to the peer
     * @param sid id of the peer
     * @return {@code true} if worker threads were started for this socket; {@code false} if the
     *     socket was closed instead
     * @throws IOException if authentication fails with an I/O error
     */
    private boolean startConnection(Socket sock, Long sid) throws IOException {
        DataInputStream din;
        try {
            DataOutputStream dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(this.mySid);
            dout.flush();
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        }
        catch (IOException challengeFailure) {
            LOG.warn("Ignoring exception reading or writing challenge: ", challengeFailure);
            closeSocket(sock);
            return false;
        }

        this.authLearner.authenticate(sock, this.view.get(sid).hostname);

        // The peer has the larger id: give up this connection.
        if (sid > this.mySid) {
            LOG.info("Have smaller server identifier, so dropping the connection: (" + sid + ", " + this.mySid + ")");
            closeSocket(sock);
            return false;
        }

        SendWorker sender = new SendWorker(sock, sid);
        RecvWorker receiver = new RecvWorker(sock, din, sid, sender);
        sender.setRecv(receiver);

        // Retire the worker of any earlier connection to the same peer.
        SendWorker previous = this.senderWorkerMap.get(sid);
        if (previous != null) {
            previous.finish();
        }
        this.senderWorkerMap.put(sid, sender);
        this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sender.start();
        receiver.start();
        return true;
    }

    /**
     * Handles an incoming connection in the calling thread.
     *
     * <p>Wraps the socket's input stream and delegates to {@code handleConnection}. An
     * {@link IOException} is logged together with its stack trace and the socket is closed; the
     * exception is not propagated.
     *
     * @param sock socket accepted from a peer
     */
    public void receiveConnection(Socket sock) {
        try {
            DataInputStream din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            handleConnection(sock, din);
        }
        catch (IOException e) {
            // The trailing Throwable is logged as the exception, not used as a format argument.
            LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress(), e);
            closeSocket(sock);
        }
    }

    /**
     * Schedules handling of an incoming connection on the connection executor.
     *
     * <p>Any failure while submitting the task (including a missing executor) is logged together
     * with its stack trace and the socket is closed.
     *
     * @param sock socket accepted from a peer
     */
    public void receiveConnectionAsync(Socket sock) {
        try {
            this.connectionExecutor.execute(new QuorumConnectionReceiverThread(sock));
            this.connectionThreadCnt.incrementAndGet();
        }
        catch (Throwable e) {
            // The trailing Throwable is logged as the exception, not used as a format argument.
            LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress(), e);
            closeSocket(sock);
        }
    }

    /**
     * Performs the incoming side of the connection handshake.
     *
     * <p>Reads the peer's id from {@code din}, authenticates through {@code authServer}, and then
     * either keeps the connection (peer id not smaller than this server's id) by starting a
     * SendWorker/RecvWorker pair, or closes it and opens a connection in the other direction.
     *
     * @param sock socket accepted from the peer
     * @param din input stream wrapping {@code sock}
     * @throws IOException if authentication fails with an I/O error
     */
    private void handleConnection(Socket sock, DataInputStream din) throws IOException {
        long announcedSid;
        try {
            announcedSid = din.readLong();
            if (announcedSid < 0L) {
                // A negative first value is not the sid itself: the sid follows, then a
                // length-prefixed remainder that is read and discarded.
                announcedSid = din.readLong();
                int remainingLength = din.readInt();
                if (remainingLength < 0 || remainingLength > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", remainingLength);
                    closeSocket(sock);
                    return;
                }
                byte[] discarded = new byte[remainingLength];
                // NOTE(review): read() may return fewer bytes than requested; a short read is
                // only logged here and processing continues.
                int bytesRead = din.read(discarded);
                if (bytesRead != remainingLength) {
                    LOG.error("Read only " + bytesRead + " bytes out of " + remainingLength + " sent by server " + announcedSid);
                }
            }
            if (announcedSid == Long.MAX_VALUE) {
                // Replace the placeholder id with the next value from the observer counter.
                announcedSid = this.observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer: " + announcedSid);
            }
        }
        catch (IOException challengeFailure) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + challengeFailure);
            return;
        }

        Long sid = announcedSid;
        LOG.debug("Authenticating learner server.id: {}", sid);
        this.authServer.authenticate(sock, din);

        if (sid >= this.mySid) {
            // Keep this connection: start workers, retiring any earlier SendWorker for the peer.
            SendWorker sender = new SendWorker(sock, sid);
            RecvWorker receiver = new RecvWorker(sock, din, sid, sender);
            sender.setRecv(receiver);
            SendWorker previous = this.senderWorkerMap.get(sid);
            if (previous != null) {
                previous.finish();
            }
            this.senderWorkerMap.put(sid, sender);
            this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            sender.start();
            receiver.start();
            return;
        }

        // The peer has the smaller id: stop any existing worker for it, drop this socket and
        // connect to the peer from this side instead.
        SendWorker existing = this.senderWorkerMap.get(sid);
        if (existing != null) {
            existing.finish();
        }
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);
        connectOne(sid);
    }

    /**
     * Queues a message for delivery to server {@code sid}.
     *
     * <p>A message addressed to this server itself is rewound and placed directly on the receive
     * queue. Otherwise it is added to the peer's send queue (created on first use) and a
     * connection to the peer is attempted.
     *
     * @param sid destination server id
     * @param b message to send
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (sid.longValue() == this.mySid) {
            // Loop back: no network involved.
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
            return;
        }

        ArrayBlockingQueue<ByteBuffer> candidate = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> registered = this.queueSendMap.putIfAbsent(sid, candidate);
        // putIfAbsent returns the queue already in the map, or null if ours was just installed.
        ArrayBlockingQueue<ByteBuffer> target = registered != null ? registered : candidate;
        addToSendQueue(target, b);
        connectOne(sid);
    }

    /**
     * Tries to establish a connection to server {@code sid} if none exists yet.
     *
     * <p>Does nothing beyond logging when a SendWorker is already registered for the peer or when
     * the id is not part of {@code view}. Otherwise a socket is opened to the peer's election
     * address and the handshake is run, asynchronously when quorum SASL auth is enabled. If
     * opening the socket fails, the socket is closed so that failed attempts do not leak file
     * descriptors, and the peer's socket addresses are recreated.
     *
     * @param sid id of the server to connect to
     * @throws UnresolvedAddressException rethrown after logging and cleanup
     */
    public synchronized void connectOne(long sid) {
        if (connectedToPeer(sid)) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        if (!this.view.containsKey(sid)) {
            LOG.warn("Invalid server id: " + sid);
            return;
        }

        InetSocketAddress electionAddr = this.view.get(sid).electionAddr;
        Socket sock = null;
        try {
            LOG.debug("Opening channel to server " + sid);
            sock = new Socket();
            setSockOpts(sock);
            sock.connect(this.view.get(sid).electionAddr, this.cnxTO);
            LOG.debug("Connected to server " + sid);
            if (this.quorumSaslAuthEnabled) {
                initiateConnectionAsync(sock, sid);
            } else {
                initiateConnection(sock, sid);
            }
        }
        catch (UnresolvedAddressException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            // Release the half-opened socket; closing an already closed socket is harmless.
            if (sock != null) {
                closeSocket(sock);
            }
            if (this.view.containsKey(sid)) {
                this.view.get(sid).recreateSocketAddresses();
            }
            throw e;
        }
        catch (IOException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            // Release the half-opened socket; closing an already closed socket is harmless.
            if (sock != null) {
                closeSocket(sock);
            }
            if (this.view.containsKey(sid)) {
                this.view.get(sid).recreateSocketAddresses();
            }
        }
    }

    /**
     * Attempts a connection to every server that currently has a send queue.
     */
    public void connectAll() {
        for (Enumeration<Long> peerIds = this.queueSendMap.keys(); peerIds.hasMoreElements(); ) {
            long sid = peerIds.nextElement();
            connectOne(sid);
        }
    }

    /**
     * Reports whether at least one per-peer send queue is currently empty.
     *
     * @return {@code true} as soon as an empty send queue is found; {@code false} if every queue
     *     still holds a message or there are no queues at all
     */
    boolean haveDelivered() {
        boolean foundEmptyQueue = false;
        for (ArrayBlockingQueue<ByteBuffer> sendQueue : this.queueSendMap.values()) {
            LOG.debug("Queue size: " + sendQueue.size());
            if (sendQueue.isEmpty()) {
                foundEmptyQueue = true;
                break;
            }
        }
        return foundEmptyQueue;
    }

    /**
     * Shuts the connection manager down: raises the shutdown flag, closes the listener's server
     * socket, finishes all send workers, shuts down the connection executor if one exists, and
     * clears the in-progress bookkeeping and the connection thread counter.
     */
    public void halt() {
        this.shutdown = true;

        LOG.debug("Halting listener");
        this.listener.halt();

        softHalt();

        ThreadPoolExecutor executor = this.connectionExecutor;
        if (executor != null) {
            executor.shutdown();
        }

        this.inprogressConnections.clear();
        // Same effect as resetConnectionThreadCount().
        this.connectionThreadCnt.set(0);
    }

    /**
     * Finishes every registered send worker without touching the listener or the shutdown flag.
     */
    public void softHalt() {
        for (SendWorker worker : this.senderWorkerMap.values()) {
            LOG.debug("Halting sender: " + worker);
            worker.finish();
        }
    }

    /**
     * Applies this manager's socket options: TCP_NODELAY on, keep-alive according to
     * {@code tcpKeepAlive}, and a read timeout of {@code socketTimeout}.
     *
     * @param sock socket to configure
     * @throws SocketException if the socket rejects one of the options
     */
    private void setSockOpts(Socket sock) throws SocketException {
        final boolean keepAlive = this.tcpKeepAlive;
        final int readTimeout = this.socketTimeout;
        sock.setTcpNoDelay(true);
        sock.setKeepAlive(keepAlive);
        sock.setSoTimeout(readTimeout);
    }

    /**
     * Closes {@code sock}; a failure to close is logged and otherwise ignored.
     *
     * @param sock socket to close
     */
    private void closeSocket(Socket sock) {
        try {
            sock.close();
        }
        catch (IOException closeFailure) {
            LOG.error("Exception while closing", closeFailure);
        }
    }

    /**
     * @return current value of the worker thread counter, widened to {@code long}
     */
    public long getThreadCount() {
        int workerThreads = this.threadCnt.get();
        return workerThreads;
    }

    /**
     * @return number of tasks handed to the connection executor since the counter was last
     *     reset, widened to {@code long}
     */
    public long getConnectionThreadCount() {
        int submittedTasks = this.connectionThreadCnt.get();
        return submittedTasks;
    }

    /**
     * Sets the connection thread counter back to zero.
     */
    private void resetConnectionThreadCount() {
        final int zero = 0;
        this.connectionThreadCnt.set(zero);
    }

    /**
     * Inserts {@code buffer} into {@code queue}, first evicting the head element when the queue
     * is full so that the newest message wins.
     *
     * <p>Losing a race with a concurrent consumer (queue found empty on eviction) or producer
     * (queue found full on insertion) is logged and otherwise ignored.
     *
     * @param queue destination send queue
     * @param buffer message to enqueue
     */
    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
        boolean full = queue.remainingCapacity() == 0;
        if (full) {
            try {
                queue.remove();
            }
            catch (NoSuchElementException emptied) {
                LOG.debug("Trying to remove from an empty Queue. Ignoring exception " + emptied);
            }
        }

        try {
            queue.add(buffer);
        }
        catch (IllegalStateException refilled) {
            LOG.error("Unable to insert an element in the queue " + refilled);
        }
    }

    /**
     * @param queue send queue to inspect
     * @return {@code true} if {@code queue} currently holds no message
     */
    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
        boolean empty = queue.isEmpty();
        return empty;
    }

    /**
     * Takes the head of {@code queue}, waiting up to the given timeout for one to appear.
     *
     * @return the message, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
        ByteBuffer head = queue.poll(timeout, unit);
        return head;
    }

    /**
     * Inserts {@code msg} into the receive queue, first evicting the head element when the queue
     * is full.
     *
     * <p>The evict-then-insert sequence runs under {@code recvQLock}. A failed eviction or
     * insertion is logged and otherwise ignored.
     *
     * @param msg message to enqueue
     */
    public void addToRecvQueue(Message msg) {
        synchronized (this.recvQLock) {
            boolean full = this.recvQueue.remainingCapacity() == 0;
            if (full) {
                try {
                    this.recvQueue.remove();
                }
                catch (NoSuchElementException emptied) {
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + emptied);
                }
            }

            try {
                this.recvQueue.add(msg);
            }
            catch (IllegalStateException refilled) {
                LOG.error("Unable to insert element in the recvQueue " + refilled);
            }
        }
    }

    /**
     * Takes the head of the receive queue, waiting up to the given timeout for one to appear.
     *
     * @return the message, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
        Message head = this.recvQueue.poll(timeout, unit);
        return head;
    }

    /**
     * @param peerSid id of the peer to check
     * @return {@code true} if a SendWorker is currently registered for {@code peerSid}
     */
    public boolean connectedToPeer(long peerSid) {
        // ConcurrentHashMap never stores null values, so key presence equals a non-null lookup.
        return this.senderWorkerMap.containsKey(peerSid);
    }

    class RecvWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        volatile boolean running;
        final DataInputStream din;
        final SendWorker sw;

        RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
            super("RecvWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.sw = sw;
            this.din = din;
            try {
                sock.setSoTimeout(0);
            }
            catch (IOException e2) {
                LOG.error("Error while accessing socket for " + sid, e2);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
        }

        synchronized boolean finish() {
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            this.interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    int length = this.din.readInt();
                    if (length <= 0 || length > 524288) {
                        throw new IOException("Received packet with invalid packet: " + length);
                    }
                    byte[] msgArray = new byte[length];
                    this.din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    QuorumCnxManager.this.addToRecvQueue(new Message(message.duplicate(), this.sid));
                }
            }
            catch (Exception e2) {
                LOG.warn("Connection broken for id " + this.sid + ", my id = " + QuorumCnxManager.this.mySid + ", error = ", e2);
            }
            finally {
                LOG.warn("Interrupting SendWorker");
                this.sw.finish();
                if (this.sock != null) {
                    QuorumCnxManager.this.closeSocket(this.sock);
                }
            }
        }
    }

    class SendWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        RecvWorker recvWorker;
        volatile boolean running;
        DataOutputStream dout;

        SendWorker(Socket sock, Long sid) {
            super("SendWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(sock.getOutputStream());
            }
            catch (IOException e2) {
                LOG.error("Unable to access socket output stream", e2);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
            LOG.debug("Address of remote peer: " + this.sid);
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        synchronized boolean finish() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Calling finish for " + this.sid);
            }
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Removing entry from senderWorkerMap sid=" + this.sid);
            }
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            }
            catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", be);
                return;
            }
            this.dout.writeInt(b.capacity());
            this.dout.write(b.array());
            this.dout.flush();
        }

        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                ByteBuffer b;
                ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if ((bq == null || QuorumCnxManager.this.isSendQueueEmpty(bq)) && (b = QuorumCnxManager.this.lastMessageSent.get(this.sid)) != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + this.sid);
                    this.send(b);
                }
            }
            catch (IOException e2) {
                LOG.error("Failed to send last message. Shutting down thread.", e2);
                this.finish();
            }
            block6: while (true) {
                try {
                    while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                            if (bq == null) {
                                LOG.error("No queue of incoming messages for server " + this.sid);
                                break block6;
                            }
                            b = QuorumCnxManager.this.pollSendQueue(bq, 1000L, TimeUnit.MILLISECONDS);
                            if (b == null) continue block6;
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, b);
                            this.send(b);
                            continue block6;
                        }
                        catch (InterruptedException e3) {
                            LOG.warn("Interrupted while waiting for message on queue", e3);
                        }
                    }
                    break;
                }
                catch (Exception e4) {
                    LOG.warn("Exception when using channel: for id " + this.sid + " my id = " + QuorumCnxManager.this.mySid + " error = " + e4);
                    break;
                }
            }
            this.finish();
            LOG.warn("Send worker leaving thread");
        }
    }

    public class Listener
    extends ZooKeeperThread {
        volatile ServerSocket ss;

        public Listener() {
            super("ListenerThread");
            this.ss = null;
        }

        @Override
        public void run() {
            int numRetries = 0;
            while (!QuorumCnxManager.this.shutdown && numRetries < 3) {
                try {
                    InetSocketAddress addr;
                    this.ss = new ServerSocket();
                    this.ss.setReuseAddress(true);
                    if (QuorumCnxManager.this.listenOnAllIPs) {
                        int port = QuorumCnxManager.this.view.get((Object)Long.valueOf((long)QuorumCnxManager.this.mySid)).electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = QuorumCnxManager.this.view.get((Object)Long.valueOf((long)QuorumCnxManager.this.mySid)).electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    this.setName(QuorumCnxManager.this.view.get((Object)Long.valueOf((long)QuorumCnxManager.this.mySid)).electionAddr.toString());
                    this.ss.bind(addr);
                    while (!QuorumCnxManager.this.shutdown) {
                        Socket client = this.ss.accept();
                        QuorumCnxManager.this.setSockOpts(client);
                        LOG.info("Received connection request " + client.getRemoteSocketAddress());
                        if (QuorumCnxManager.this.quorumSaslAuthEnabled) {
                            QuorumCnxManager.this.receiveConnectionAsync(client);
                        } else {
                            QuorumCnxManager.this.receiveConnection(client);
                        }
                        numRetries = 0;
                    }
                }
                catch (IOException e2) {
                    LOG.error("Exception while listening", e2);
                    ++numRetries;
                    try {
                        this.ss.close();
                        Thread.sleep(1000L);
                    }
                    catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.error("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: " + QuorumCnxManager.this.view.get((Object)Long.valueOf((long)QuorumCnxManager.this.mySid)).electionAddr);
            }
        }

        void halt() {
            try {
                LOG.debug("Trying to close listener: " + this.ss);
                if (this.ss != null) {
                    LOG.debug("Closing listener: " + QuorumCnxManager.this.mySid);
                    this.ss.close();
                }
            }
            catch (IOException e2) {
                LOG.warn("Exception when shutting down listener: " + e2);
            }
        }
    }

    /**
     * Task submitted to the connection executor that handles one incoming connection by calling
     * {@code receiveConnection} on the accepted socket.
     */
    private class QuorumConnectionReceiverThread extends ZooKeeperThread {
        private final Socket sock;

        QuorumConnectionReceiverThread(Socket sock) {
            super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
            this.sock = sock;
        }

        @Override
        public void run() {
            Socket accepted = this.sock;
            QuorumCnxManager.this.receiveConnection(accepted);
        }
    }

    /**
     * Task submitted to the connection executor that runs the outgoing handshake for one peer
     * and, whatever the outcome, clears that peer's in-progress marker afterwards.
     */
    private class QuorumConnectionReqThread extends ZooKeeperThread {
        final Socket sock;
        final Long sid;

        QuorumConnectionReqThread(Socket sock, Long sid) {
            super("QuorumConnectionReqThread-" + sid);
            this.sid = sid;
            this.sock = sock;
        }

        @Override
        public void run() {
            final QuorumCnxManager manager = QuorumCnxManager.this;
            try {
                manager.initiateConnection(this.sock, this.sid);
            }
            finally {
                // Always allow a later connection attempt to this peer.
                manager.inprogressConnections.remove(this.sid);
            }
        }
    }

    /**
     * A message paired with a server id: the sender's id for messages read from a peer, or this
     * server's own id for messages it addressed to itself.
     */
    public static class Message {
        /** Message payload. */
        ByteBuffer buffer;
        /** Server id associated with the payload. */
        long sid;

        /**
         * @param buffer message payload
         * @param sid server id associated with the payload
         */
        Message(ByteBuffer buffer, long sid) {
            this.sid = sid;
            this.buffer = buffer;
        }
    }
}

