/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.ClientCnxnSocket;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCnxnSocketNetty
extends ClientCnxnSocket {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
    // Netty channel factory backed by two cached thread pools; handed to the
    // ClientBootstrap in connect() and released in close().
    ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    // Currently established channel: assigned by the connect listener on success and
    // reset to null by cleanup(). isConnected() reports whether it is non-null.
    Channel channel;
    // Latch re-created by every connect(); counted down by the connect listener on
    // success and by onClosing(). doTransport() waits on it before doing any work.
    CountDownLatch firstConnect;
    // Future of the connect attempt started by connect(); cancelled and nulled by cleanup().
    ChannelFuture connectFuture;
    // Held by both the connect-completion listener and cleanup(), so the two never interleave.
    Lock connectLock = new ReentrantLock();
    // Set to true by the channel handler on disconnect/exception and back to false when a
    // connect succeeds; doTransport() throws EndOfStreamException while it is true.
    AtomicBoolean disconnected = new AtomicBoolean();
    // True while doTransport() should wait on waitSasl instead of polling the outgoing queue;
    // set from sendThread.tunnelAuthInProgress() at connect time, cleared by saslCompleted().
    AtomicBoolean needSasl = new AtomicBoolean();
    // Starts with zero permits; released by saslCompleted() and by wakeupCnxn() (when
    // needSasl is set), acquired with a timeout in doTransport().
    Semaphore waitSasl = new Semaphore(0);

    /**
     * Creates a Netty-backed client socket for the given configuration.
     *
     * @param clientConfig configuration stored on this instance before
     *                     {@code initProperties()} is invoked
     * @throws IOException propagated from {@code initProperties()}
     */
    ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
        // Store the configuration first; initProperties() presumably reads it.
        this.clientConfig = clientConfig;
        initProperties();
    }

    /**
     * Reports whether this socket currently holds a channel.
     *
     * @return {@code true} if the {@code channel} field is non-null
     */
    @Override
    boolean isConnected() {
        final Channel current = this.channel;
        return null != current;
    }

    /**
     * Starts an asynchronous connection attempt to {@code addr}.
     *
     * <p>A fresh {@code firstConnect} latch is created, a {@link ClientBootstrap} is
     * configured on {@code channelFactory} (pipeline from {@code ZKClientPipelineFactory},
     * {@code soLinger=-1}, {@code tcpNoDelay=true}) and the resulting future is stored in
     * {@code connectFuture}. Completion is handled by a listener that runs under
     * {@code connectLock}:
     * <ul>
     *   <li>failed attempt: logged, nothing else happens;</li>
     *   <li>attempt succeeded although {@code cleanup()} already discarded it
     *       ({@code connectFuture == null}): the new channel is closed so the socket
     *       is not leaked;</li>
     *   <li>otherwise: the channel is published, read state is reset, the connection is
     *       primed through {@code sendThread}, SASL waiting is armed if required, and
     *       the sender is woken up.</li>
     * </ul>
     *
     * @param addr remote address to connect to
     * @throws IOException declared by the overridden method; not thrown by the code in this method
     */
    @Override
    void connect(InetSocketAddress addr) throws IOException {
        this.firstConnect = new CountDownLatch(1);
        ClientBootstrap bootstrap = new ClientBootstrap(this.channelFactory);
        bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
        bootstrap.setOption("soLinger", -1);
        bootstrap.setOption("tcpNoDelay", true);
        this.connectFuture = bootstrap.connect(addr);
        this.connectFuture.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                // cleanup() takes the same lock, so the channel can never be
                // published after cleanup() has run.
                ClientCnxnSocketNetty.this.connectLock.lock();
                try {
                    if (!channelFuture.isSuccess()) {
                        // info(String, Throwable) does not substitute placeholders,
                        // so the message carries none.
                        LOG.info("future isn't success, cause:", channelFuture.getCause());
                        return;
                    }
                    if (ClientCnxnSocketNetty.this.connectFuture == null) {
                        // The attempt was discarded by cleanup() but completed anyway.
                        // Close the channel; nobody else holds a reference to it.
                        LOG.info("connect attempt cancelled");
                        channelFuture.getChannel().close();
                        return;
                    }
                    ClientCnxnSocketNetty.this.channel = channelFuture.getChannel();
                    ClientCnxnSocketNetty.this.disconnected.set(false);
                    ClientCnxnSocketNetty.this.initialized = false;
                    // Next bytes received are expected to be a length prefix.
                    ClientCnxnSocketNetty.this.lenBuffer.clear();
                    ClientCnxnSocketNetty.this.incomingBuffer = ClientCnxnSocketNetty.this.lenBuffer;
                    ClientCnxnSocketNetty.this.sendThread.primeConnection();
                    ClientCnxnSocketNetty.this.updateNow();
                    ClientCnxnSocketNetty.this.updateLastSendAndHeard();
                    if (ClientCnxnSocketNetty.this.sendThread.tunnelAuthInProgress()) {
                        // Discard stale permits, make doTransport() wait on the semaphore,
                        // and send the head of the outgoing queue right away.
                        ClientCnxnSocketNetty.this.waitSasl.drainPermits();
                        ClientCnxnSocketNetty.this.needSasl.set(true);
                        ClientCnxnSocketNetty.this.sendPrimePacket();
                    } else {
                        ClientCnxnSocketNetty.this.needSasl.set(false);
                    }
                    ClientCnxnSocketNetty.this.wakeupCnxn();
                    ClientCnxnSocketNetty.this.firstConnect.countDown();
                    LOG.info("channel is connected: {}", (Object)channelFuture.getChannel());
                }
                finally {
                    ClientCnxnSocketNetty.this.connectLock.unlock();
                }
            }
        });
    }

    /**
     * Tears down the current connection state.
     *
     * <p>Under {@code connectLock}, any pending connect future is cancelled and forgotten,
     * and any open channel is closed (waiting for the close to finish) and forgotten.
     * Afterwards every wake-up marker is removed from the outgoing queue.
     */
    @Override
    void cleanup() {
        this.connectLock.lock();
        try {
            final ChannelFuture pendingConnect = this.connectFuture;
            if (pendingConnect != null) {
                pendingConnect.cancel();
                this.connectFuture = null;
            }
            final Channel openChannel = this.channel;
            if (openChannel != null) {
                openChannel.close().awaitUninterruptibly();
                this.channel = null;
            }
        } finally {
            this.connectLock.unlock();
        }

        // Drop wake-up markers (identity comparison); real packets stay queued.
        final ClientCnxn.Packet wakeupMarker = WakeupPacket.getInstance();
        for (Iterator<?> it = this.outgoingQueue.iterator(); it.hasNext();) {
            ClientCnxn.Packet queued = (ClientCnxn.Packet)it.next();
            if (queued == wakeupMarker) {
                it.remove();
            }
        }
    }

    /**
     * Releases the external resources held by {@code channelFactory}.
     */
    @Override
    void close() {
        final ChannelFactory factory = this.channelFactory;
        factory.releaseExternalResources();
    }

    /**
     * Signals that SASL negotiation is over: clears {@code needSasl} and hands one
     * permit to {@code waitSasl}.
     */
    @Override
    void saslCompleted() {
        // Clear the flag before releasing the permit, same order as before.
        needSasl.set(false);
        waitSasl.release();
    }

    /**
     * Callback hook; this implementation takes no action.
     */
    @Override
    void connectionPrimed() {
        // Nothing to do here.
    }

    /**
     * Callback hook; this implementation takes no action.
     */
    @Override
    void packetAdded() {
        // Nothing to do here.
    }

    /**
     * Unblocks anything waiting in {@code doTransport()} because the connection is
     * being closed: counts down the {@code firstConnect} latch (if one exists),
     * wakes the sender, and logs the event.
     *
     * <p>{@code firstConnect} is only assigned by {@code connect()}, so it is still
     * {@code null} when this method runs before any connect attempt; that case is
     * tolerated instead of failing with a {@link NullPointerException}.
     */
    @Override
    void onClosing() {
        final CountDownLatch latch = this.firstConnect;
        if (latch != null) {
            latch.countDown();
        }
        this.wakeupCnxn();
        LOG.info("channel is told closing");
    }

    /**
     * Wakes up a caller blocked in {@code doTransport()}: releases a {@code waitSasl}
     * permit when {@code needSasl} is set, and always enqueues the wake-up marker packet.
     */
    private void wakeupCnxn() {
        final boolean saslPending = this.needSasl.get();
        if (saslPending) {
            this.waitSasl.release();
        }
        final ClientCnxn.Packet wakeupMarker = WakeupPacket.getInstance();
        this.outgoingQueue.add(wakeupMarker);
    }

    /**
     * Performs one round of outbound work.
     *
     * <p>Steps, each of which may end the round early:
     * <ol>
     *   <li>wait up to {@code waitTimeOut} ms for the {@code firstConnect} latch;</li>
     *   <li>if {@code needSasl} is set, wait up to {@code waitTimeOut} ms for a
     *       {@code waitSasl} permit; otherwise poll the outgoing queue for up to
     *       {@code waitTimeOut} ms;</li>
     *   <li>if the client state is no longer alive, put the polled packet back and return;</li>
     *   <li>if the channel was marked disconnected, put the packet back and throw;</li>
     *   <li>otherwise write the polled packet (and whatever else is queued).</li>
     * </ol>
     * {@code updateNow()} is always invoked on exit.
     *
     * @param waitTimeOut  maximum wait, in milliseconds, applied to each blocking step
     * @param pendingQueue queue handed to {@code doWrite} for packets awaiting a reply
     * @param cnxn         connection handed to {@code doWrite}
     * @throws IOException          specifically {@code ClientCnxn.EndOfStreamException}
     *                              when the channel has been marked disconnected
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    void doTransport(int waitTimeOut, List<ClientCnxn.Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
        try {
            final boolean connectSignalled = this.firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!connectSignalled) {
                return;
            }

            ClientCnxn.Packet next = null;
            if (this.needSasl.get()) {
                // While SASL is pending nothing is taken from the queue; just wait for a permit.
                if (!this.waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                next = (ClientCnxn.Packet)this.outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
                if (next == null) {
                    return;
                }
            }

            if (!this.sendThread.getZkState().isAlive()) {
                this.addBack(next);
                return;
            }

            if (this.disconnected.get()) {
                this.addBack(next);
                throw new ClientCnxn.EndOfStreamException("channel for sessionid 0x" + Long.toHexString(this.sessionId) + " is lost");
            }

            // next is null when this round was woken through the SASL semaphore.
            if (next != null) {
                this.doWrite(pendingQueue, next, cnxn);
            }
        } finally {
            this.updateNow();
        }
    }

    /**
     * Puts a previously polled packet back at the front of the outgoing queue.
     * {@code null} and the wake-up marker are ignored.
     *
     * @param head packet to re-queue; may be {@code null}
     */
    private void addBack(ClientCnxn.Packet head) {
        if (head == null) {
            return;
        }
        if (head == WakeupPacket.getInstance()) {
            return;
        }
        this.outgoingQueue.addFirst(head);
    }

    /**
     * Serializes a packet and writes it to the current channel, updating the
     * last-send timestamp and the sent-packet counter first.
     *
     * @param p packet to send
     */
    private void sendPkt(ClientCnxn.Packet p) {
        p.createBB();
        updateLastSend();
        this.sentCount++;
        final ChannelBuffer payload = ChannelBuffers.wrappedBuffer(p.bb);
        this.channel.write(payload);
    }

    /**
     * Removes the head of the outgoing queue and sends it immediately.
     * Called from the connect listener right after {@code primeConnection()};
     * the head is presumably the packet queued by that call.
     */
    private void sendPrimePacket() {
        final ClientCnxn.Packet primePacket = (ClientCnxn.Packet)this.outgoingQueue.remove();
        this.sendPkt(primePacket);
    }

    /**
     * Sends {@code p} and then keeps draining the outgoing queue until it is empty.
     *
     * <p>Wake-up markers are skipped. A packet that has a request header whose type is
     * neither 11 nor 100 gets a fresh xid from {@code cnxn} and is appended to
     * {@code pendingQueue} (under that list's monitor) before it is sent.
     * NOTE(review): 11 and 100 look like the ping and auth op codes — confirm against ZooDefs.
     *
     * @param pendingQueue list that collects packets which received an xid here
     * @param p            first packet to process
     * @param cnxn         source of xids
     */
    private void doWrite(List<ClientCnxn.Packet> pendingQueue, ClientCnxn.Packet p, ClientCnxn cnxn) {
        this.updateNow();
        final ClientCnxn.Packet wakeupMarker = WakeupPacket.getInstance();
        for (ClientCnxn.Packet current = p; ; current = (ClientCnxn.Packet)this.outgoingQueue.remove()) {
            if (current != wakeupMarker) {
                final boolean needsXid = current.requestHeader != null
                        && current.requestHeader.getType() != 11
                        && current.requestHeader.getType() != 100;
                if (needsXid) {
                    current.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(current);
                    }
                }
                this.sendPkt(current);
            }
            if (this.outgoingQueue.isEmpty()) {
                break;
            }
        }
    }

    /**
     * Sends a single packet over the current channel.
     *
     * @param p packet to send
     * @throws IOException if no channel is currently set
     */
    @Override
    void sendPacket(ClientCnxn.Packet p) throws IOException {
        if (this.channel != null) {
            this.sendPkt(p);
            return;
        }
        throw new IOException("channel has been closed");
    }

    /**
     * Returns the remote address of the current channel.
     *
     * @return the channel's remote address, or {@code null} when no channel is set
     */
    @Override
    SocketAddress getRemoteSocketAddress() {
        // Read the field once so the null check and the call see the same reference.
        final Channel snapshot = this.channel;
        if (snapshot == null) {
            return null;
        }
        return snapshot.getRemoteAddress();
    }

    /**
     * Returns the local address of the current channel.
     *
     * @return the channel's local address, or {@code null} when no channel is set
     */
    @Override
    SocketAddress getLocalSocketAddress() {
        // Read the field once so the null check and the call see the same reference.
        final Channel snapshot = this.channel;
        if (snapshot == null) {
            return null;
        }
        return snapshot.getLocalAddress();
    }

    /**
     * Disconnects the current channel, if any, and waits for the disconnect to finish.
     *
     * @throws IOException declared by the overridden method; not thrown by the code in this method
     */
    @Override
    void testableCloseSocket() throws IOException {
        final Channel snapshot = this.channel;
        if (snapshot == null) {
            return;
        }
        final ChannelFuture disconnectFuture = snapshot.disconnect();
        disconnectFuture.awaitUninterruptibly();
    }

    /**
     * Upstream handler installed as the last element of each client pipeline.
     * It feeds received bytes into the enclosing socket's read buffers and reports
     * disconnects and exceptions back to it.
     */
    private class ZKClientHandler
    extends SimpleChannelUpstreamHandler {
        // Ensures the disconnect handling in cleanup() runs at most once per handler.
        AtomicBoolean channelClosed = new AtomicBoolean(false);

        private ZKClientHandler() {
        }

        /** Logs the disconnect and runs the one-time disconnect handling. */
        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            LOG.info("channel is disconnected: {}", (Object)ctx.getChannel());
            cleanup();
        }

        /**
         * Marks the enclosing socket as disconnected and notifies it via
         * {@code onClosing()}; only the first invocation has any effect.
         */
        private void cleanup() {
            final boolean firstInvocation = this.channelClosed.compareAndSet(false, true);
            if (firstInvocation) {
                ClientCnxnSocketNetty.this.disconnected.set(true);
                ClientCnxnSocketNetty.this.onClosing();
            }
        }

        /**
         * Copies the received bytes into {@code incomingBuffer}. Each time that buffer
         * fills up it is flipped and handled as a length prefix, the connect result, or
         * a regular response, depending on the current state. The sender is woken up
         * once the whole message has been consumed.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            final ClientCnxnSocketNetty outer = ClientCnxnSocketNetty.this;
            outer.updateNow();
            final ChannelBuffer received = (ChannelBuffer)e.getMessage();
            while (received.readable()) {
                // Shrink the limit so readBytes() takes only what is available.
                if (outer.incomingBuffer.remaining() > received.readableBytes()) {
                    outer.incomingBuffer.limit(outer.incomingBuffer.position() + received.readableBytes());
                }
                received.readBytes(outer.incomingBuffer);
                outer.incomingBuffer.limit(outer.incomingBuffer.capacity());

                if (!outer.incomingBuffer.hasRemaining()) {
                    outer.incomingBuffer.flip();
                    if (outer.incomingBuffer == outer.lenBuffer) {
                        // Length prefix complete.
                        ++outer.recvCount;
                        outer.readLength();
                    } else if (!outer.initialized) {
                        // First full frame after connecting.
                        outer.readConnectResult();
                        outer.lenBuffer.clear();
                        outer.incomingBuffer = outer.lenBuffer;
                        outer.initialized = true;
                        outer.updateLastHeard();
                    } else {
                        // Regular response frame.
                        outer.sendThread.readResponse(outer.incomingBuffer);
                        outer.lenBuffer.clear();
                        outer.incomingBuffer = outer.lenBuffer;
                        outer.updateLastHeard();
                    }
                }
            }
            outer.wakeupCnxn();
        }

        /** Logs the exception event and runs the one-time disconnect handling. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            LOG.warn("Exception caught: {}", (Object)e, (Object)e.getCause());
            cleanup();
        }
    }

    /**
     * Builds the channel pipeline for a client connection: an optional SSL handler
     * (when the {@code zookeeper.client.secure} setting is true) followed by a new
     * {@code ZKClientHandler}.
     */
    private class ZKClientPipelineFactory
    implements ChannelPipelineFactory {
        // Created on first use by initSSL() and reused by this factory afterwards.
        private SSLContext sslContext = null;
        private SSLEngine sslEngine = null;

        private ZKClientPipelineFactory() {
        }

        /**
         * @return a new pipeline ending in a {@code ZKClientHandler}, preceded by an
         *         SSL handler when secure mode is enabled
         * @throws Exception if SSL initialization fails
         */
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            final ChannelPipeline result = Channels.pipeline();
            final boolean secure = ClientCnxnSocketNetty.this.clientConfig.getBoolean("zookeeper.client.secure");
            if (secure) {
                initSSL(result);
            }
            result.addLast("handler", new ZKClientHandler());
            return result;
        }

        /**
         * Creates the SSL context and a client-mode engine if either is still missing,
         * then appends an {@link SslHandler} wrapping the engine to {@code pipeline}.
         *
         * @param pipeline pipeline that receives the {@code "ssl"} handler
         * @throws X509Exception.SSLContextException if the SSL context cannot be created
         */
        private synchronized void initSSL(ChannelPipeline pipeline) throws X509Exception.SSLContextException {
            final boolean ready = this.sslContext != null && this.sslEngine != null;
            if (!ready) {
                this.sslContext = X509Util.createSSLContext(ClientCnxnSocketNetty.this.clientConfig);
                this.sslEngine = this.sslContext.createSSLEngine();
                this.sslEngine.setUseClientMode(true);
            }
            final SslHandler sslHandler = new SslHandler(this.sslEngine);
            pipeline.addLast("ssl", sslHandler);
            LOG.info("SSL handler added for channel: {}", (Object)pipeline.getChannel());
        }
    }

    /**
     * Holder of the single shared marker packet that is placed on the outgoing queue
     * to wake up the sender. The marker is built with all-null arguments and is
     * recognized elsewhere in this file by reference identity.
     */
    private static class WakeupPacket {
        private static final ClientCnxn.Packet INSTANCE;

        static {
            INSTANCE = new ClientCnxn.Packet(null, null, null, null, null);
        }

        protected WakeupPacket() {
        }

        /**
         * @return the shared marker packet; always the same object
         */
        public static ClientCnxn.Packet getInstance() {
            return INSTANCE;
        }
    }
}

