/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.NettyServerCnxn;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerCnxnFactory
extends ServerCnxnFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
    /** Netty server bootstrap; created and configured in the constructor. */
    ServerBootstrap bootstrap;
    /** Listening channel returned by the most recent bind (see start() and reconfigure()). */
    Channel parentChannel;
    /** Group of accepted client channels; closed in bulk by shutdown(). */
    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
    /** Connections grouped by remote address; addCnxn() synchronizes on the map while updating it. */
    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<InetAddress, Set<NettyServerCnxn>>();
    /** Address handed to configure()/reconfigure() and used by start() for binding. */
    InetSocketAddress localAddress;
    /** Value exposed through get/setMaxClientCnxnsPerHost(); defaults to 60. */
    int maxClientCnxns = 60;
    /** Single handler instance installed into every pipeline built by the bootstrap. */
    CnxnChannelHandler channelHandler = new CnxnChannelHandler();
    /** Set to true (under the factory monitor) at the end of shutdown(); join() waits on it. */
    boolean killed;

    /**
     * Creates the Netty server bootstrap backed by two cached thread pools,
     * sets the socket options and installs a pipeline factory that adds the
     * shared connection handler (preceded by an SSL handler when the factory
     * is configured as secure).
     */
    NettyServerCnxnFactory() {
        Executor bossExecutor = Executors.newCachedThreadPool();
        Executor workerExecutor = Executors.newCachedThreadPool();
        bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor));

        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.soLinger", -1);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            /** Builds the pipeline for a newly accepted channel. */
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // The SSL handler has to be first so that it sits in front of the cnxn handler.
                if (secure) {
                    initSSL(pipeline);
                }
                pipeline.addLast("servercnxnfactory", channelHandler);
                return pipeline;
            }
        });
    }

    /**
     * Appends an {@link SslHandler} in server mode, requiring client
     * authentication, to the given pipeline.
     *
     * <p>When the system property {@code zookeeper.ssl.authProvider} is unset
     * the context comes from {@link X509Util#createSSLContext()}; otherwise it
     * is built from the key and trust managers of the named authentication
     * provider.
     *
     * @param p pipeline that receives the "ssl" handler
     * @throws X509Exception if the context cannot be created or the named
     *         provider is not registered
     * @throws KeyManagementException if initialising the SSL context fails
     * @throws NoSuchAlgorithmException if the requested protocol is unavailable
     */
    private synchronized void initSSL(ChannelPipeline p) throws X509Exception, KeyManagementException, NoSuchAlgorithmException {
        String authProviderProp = System.getProperty("zookeeper.ssl.authProvider");
        SSLContext sslContext;
        if (authProviderProp == null) {
            sslContext = X509Util.createSSLContext();
        } else {
            sslContext = SSLContext.getInstance("TLSv1");
            // Reuse the value read above so the lookup and the error message
            // always refer to the same provider name.
            X509AuthenticationProvider authProvider = (X509AuthenticationProvider) ProviderRegistry.getProvider(authProviderProp);
            if (authProvider == null) {
                LOG.error("Auth provider not found: {}", authProviderProp);
                throw new X509Exception.SSLContextException("Could not create SSLContext with specified auth provider: " + authProviderProp);
            }
            sslContext.init(
                    new X509KeyManager[]{authProvider.getKeyManager()},
                    new X509TrustManager[]{authProvider.getTrustManager()},
                    null);
        }

        SSLEngine sslEngine = sslContext.createSSLEngine();
        sslEngine.setUseClientMode(false);
        sslEngine.setNeedClientAuth(true);

        p.addLast("ssl", new SslHandler(sslEngine));
        LOG.info("SSL handler added for channel: {}", p.getChannel());
    }

    /**
     * Closes every connection currently tracked by this factory. A failure to
     * close one connection is logged and does not stop the remaining ones
     * from being closed.
     */
    @Override
    public void closeAll() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("closeAll()");
        }

        // Taken before closing so the debug line below reports the starting count.
        final int initialCount = cnxns.size();
        for (ServerCnxn connection : cnxns) {
            try {
                connection.close();
            } catch (Exception closeFailure) {
                LOG.warn("Ignoring exception closing cnxn sessionid 0x" + Long.toHexString(connection.getSessionId()), closeFailure);
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("allChannels size:" + allChannels.size() + " cnxns size:" + initialCount);
        }
    }

    /**
     * Closes the first tracked connection whose session id equals
     * {@code sessionId}.
     *
     * @param sessionId id of the session whose connection should be closed
     * @return {@code true} if a matching connection was found (even when
     *         closing it threw), {@code false} otherwise
     */
    @Override
    public boolean closeSession(long sessionId) {
        if (LOG.isDebugEnabled()) {
            // The "0x" prefix promises hex; format the id accordingly.
            LOG.debug("closeSession sessionid:0x" + Long.toHexString(sessionId));
        }
        for (ServerCnxn cnxn : this.cnxns) {
            if (cnxn.getSessionId() != sessionId) {
                continue;
            }
            try {
                cnxn.close();
            } catch (Exception e) {
                LOG.warn("exception during session close", e);
            }
            return true;
        }
        return false;
    }

    /**
     * Configures SASL login and records the bind address, connection limit
     * and secure flag. Nothing is bound here; binding happens in start().
     *
     * @param addr address to bind to later
     * @param maxClientCnxns value stored as the connection limit
     * @param secure whether pipelines should get an SSL handler
     * @throws IOException propagated from the SASL login configuration
     */
    @Override
    public void configure(InetSocketAddress addr, int maxClientCnxns, boolean secure) throws IOException {
        configureSaslLogin();

        this.localAddress = addr;
        this.maxClientCnxns = maxClientCnxns;
        this.secure = secure;
    }

    /** @return the currently stored connection limit */
    @Override
    public int getMaxClientCnxnsPerHost() {
        return maxClientCnxns;
    }

    /**
     * Replaces the stored connection limit.
     *
     * @param max new limit value
     */
    @Override
    public void setMaxClientCnxnsPerHost(int max) {
        maxClientCnxns = max;
    }

    /** @return the port of the configured local address */
    @Override
    public int getLocalPort() {
        final InetSocketAddress configured = localAddress;
        return configured.getPort();
    }

    /**
     * Blocks the calling thread until shutdown() has marked this factory as
     * killed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public void join() throws InterruptedException {
        synchronized (this) {
            // Loop guards against spurious wake-ups.
            while (!killed) {
                wait();
            }
        }
    }

    /**
     * Shuts the factory down: stops the login (if any), closes the listening
     * channel, all connections and all tracked channels, releases the
     * bootstrap's resources, shuts down the ZooKeeper server (if set) and
     * finally wakes up every thread blocked in join().
     */
    @Override
    public void shutdown() {
        LOG.info("shutdown called " + localAddress);

        if (login != null) {
            login.shutdown();
        }

        // Network teardown only applies once a listening channel exists.
        if (parentChannel != null) {
            parentChannel.close().awaitUninterruptibly();
            closeAll();
            allChannels.close().awaitUninterruptibly();
            bootstrap.releaseExternalResources();
        }

        if (zkServer != null) {
            zkServer.shutdown();
        }

        synchronized (this) {
            killed = true;
            notifyAll();
        }
    }

    /** Binds the bootstrap to the configured local address and keeps the resulting channel. */
    @Override
    public void start() {
        LOG.info("binding to port " + localAddress);
        parentChannel = bootstrap.bind(localAddress);
    }

    /**
     * Rebinds the server to {@code addr} and closes the previous listening
     * channel.
     *
     * <p>The old channel is closed whether or not the new bind succeeds. A
     * bind failure propagates to the caller.
     * NOTE(review): the decompiler reported removing a try/catch here, so the
     * original may have logged and swallowed bind failures; confirm against
     * the upstream source.
     *
     * @param addr new address to listen on
     */
    @Override
    public void reconfigure(InetSocketAddress addr) {
        // Netty 3's Channel is not AutoCloseable, so use an explicit finally.
        Channel oldChannel = this.parentChannel;
        try {
            LOG.info("binding to port {}", addr);
            this.parentChannel = this.bootstrap.bind(addr);
            this.localAddress = addr;
        } finally {
            // Nothing to close if the factory was never started.
            if (oldChannel != null) {
                oldChannel.close();
            }
        }
    }

    /**
     * Starts listening, attaches the given server to this factory and,
     * when requested, loads the server's data and starts it.
     *
     * @param zks server to attach
     * @param startServer whether to call startdata() and startup() on {@code zks}
     * @throws IOException propagated from the server start-up calls
     * @throws InterruptedException propagated from the server start-up calls
     */
    @Override
    public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
        start();
        setZooKeeperServer(zks);
        if (!startServer) {
            return;
        }
        zks.startdata();
        zks.startup();
    }

    /** @return the factory's own connection collection (not a copy) */
    @Override
    public Iterable<ServerCnxn> getConnections() {
        return cnxns;
    }

    /** @return the address most recently passed to configure() or reconfigure() */
    @Override
    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    /**
     * Registers a connection with the factory: adds it to the connection
     * collection and to the per-address set in {@code ipMap}.
     *
     * @param cnxn connection to register; its channel's remote address is
     *        used as the map key
     */
    private void addCnxn(NettyServerCnxn cnxn) {
        cnxns.add(cnxn);
        synchronized (ipMap) {
            InetSocketAddress remoteSocket = (InetSocketAddress) cnxn.channel.getRemoteAddress();
            InetAddress remoteHost = remoteSocket.getAddress();
            Set<NettyServerCnxn> perHost = ipMap.get(remoteHost);
            if (perHost == null) {
                // First connection from this address: create and publish its set.
                perHost = new HashSet<NettyServerCnxn>();
                ipMap.put(remoteHost, perHost);
            }
            perHost.add(cnxn);
        }
    }

    /** Calls resetStats() on every tracked connection. */
    @Override
    public void resetAllConnectionStats() {
        for (ServerCnxn connection : cnxns) {
            connection.resetStats();
        }
    }

    /**
     * Collects the connection-info map of every tracked connection.
     *
     * @param brief passed through to each connection's getConnectionInfo()
     * @return a freshly built set holding one info map per connection
     */
    @Override
    public Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief) {
        Set<Map<String, Object>> collected = new HashSet<Map<String, Object>>();
        for (ServerCnxn connection : cnxns) {
            Map<String, Object> details = connection.getConnectionInfo(brief);
            collected.add(details);
        }
        return collected;
    }

    @ChannelHandler.Sharable
    class CnxnChannelHandler
    extends SimpleChannelHandler {
        /** Creates the handler; it declares no fields of its own to initialise. */
        CnxnChannelHandler() {
            super();
        }

        /** Removes a closed channel from the factory's channel group. */
        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel closed " + e);
            }
            allChannels.remove(ctx.getChannel());
        }

        /**
         * Creates the connection object for a newly connected channel and
         * attaches it to the handler context. For plain connections the
         * channel and connection are registered immediately; for secure ones
         * registration is deferred to the listener on the SSL handshake.
         */
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel connected " + e);
            }

            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), zkServer, NettyServerCnxnFactory.this);
            ctx.setAttachment(cnxn);

            if (!secure) {
                allChannels.add(ctx.getChannel());
                addCnxn(cnxn);
                return;
            }

            // Secure path: start the handshake and let the verifier decide
            // whether the connection is admitted.
            SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
            sslHandler.handshake().addListener(new CertificateVerifier(sslHandler, cnxn));
        }

        /** Closes the connection attached to the context, if any, when its channel disconnects. */
        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel disconnected " + e);
            }

            NettyServerCnxn attached = (NettyServerCnxn) ctx.getAttachment();
            if (attached == null) {
                return;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel disconnect caused close " + e);
            }
            attached.close();
        }

        /** Logs the exception and closes the connection attached to the context, if any. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            LOG.warn("Exception caught " + e, e.getCause());

            NettyServerCnxn attached = (NettyServerCnxn) ctx.getAttachment();
            if (attached == null) {
                return;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing " + attached);
            }
            attached.close();
        }

        /**
         * Entry point for inbound events: hands the event to processMessage()
         * while holding the monitor of the connection attached to the context.
         * Any exception is logged and rethrown.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("message received called " + e.getMessage());
            }
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("New message " + e.toString() + " from " + ctx.getChannel());
                }
                final NettyServerCnxn attached = (NettyServerCnxn) ctx.getAttachment();
                // One event at a time per connection.
                synchronized (attached) {
                    processMessage(e, attached);
                }
            } catch (Exception failure) {
                LOG.error("Unexpected exception in receive", failure);
                throw failure;
            }
        }

        /**
         * Feeds an inbound event to the connection, buffering bytes that
         * cannot be handed over yet in {@code cnxn.queuedBuffer}.
         *
         * <p>Two kinds of event are handled: a
         * {@code NettyServerCnxn.ResumeMessageEvent}, which drains whatever is
         * queued and re-enables reads on the channel, and any other event,
         * whose message is treated as a {@link ChannelBuffer} of new bytes.
         *
         * @param e the event delivered to messageReceived()
         * @param cnxn the connection attached to the channel; the caller holds
         *        its monitor
         */
        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: " + cnxn.queuedBuffer);
            }
            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
                // Resume request: process anything that was queued earlier.
                LOG.debug("Received ResumeMessageEvent");
                if (cnxn.queuedBuffer != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("processing queue " + Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump(cnxn.queuedBuffer));
                    }
                    cnxn.receiveMessage(cnxn.queuedBuffer);
                    // Drop the queue once nothing readable is left in it.
                    if (!cnxn.queuedBuffer.readable()) {
                        LOG.debug("Processed queue - no bytes remaining");
                        cnxn.queuedBuffer = null;
                    } else {
                        LOG.debug("Processed queue - bytes remaining");
                    }
                } else {
                    LOG.debug("queue empty");
                }
                // Allow the channel to deliver reads again.
                cnxn.channel.setReadable(true);
            } else {
                // Regular event: the payload is the newly received bytes.
                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(Long.toHexString(cnxn.sessionId) + " buf 0x" + ChannelBuffers.hexDump(buf));
                }
                if (cnxn.throttled) {
                    // Throttled: do not process now, just append to the queue
                    // (creating it on first use).
                    LOG.debug("Received message while throttled");
                    if (cnxn.queuedBuffer == null) {
                        LOG.debug("allocating queue");
                        cnxn.queuedBuffer = ChannelBuffers.dynamicBuffer(buf.readableBytes());
                    }
                    cnxn.queuedBuffer.writeBytes(buf);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump(cnxn.queuedBuffer));
                    }
                } else {
                    LOG.debug("not throttled");
                    if (cnxn.queuedBuffer != null) {
                        // Earlier bytes are still pending: append the new bytes
                        // behind them and process the combined queue.
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump(cnxn.queuedBuffer));
                        }
                        cnxn.queuedBuffer.writeBytes(buf);
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump(cnxn.queuedBuffer));
                        }
                        cnxn.receiveMessage(cnxn.queuedBuffer);
                        if (!cnxn.queuedBuffer.readable()) {
                            LOG.debug("Processed queue - no bytes remaining");
                            cnxn.queuedBuffer = null;
                        } else {
                            LOG.debug("Processed queue - bytes remaining");
                        }
                    } else {
                        // Nothing pending: process the incoming buffer directly.
                        cnxn.receiveMessage(buf);
                        if (buf.readable()) {
                            // receiveMessage left bytes unread; copy them into a
                            // new queue so they are kept for a later event.
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Before copy " + buf);
                            }
                            cnxn.queuedBuffer = ChannelBuffers.dynamicBuffer(buf.readableBytes());
                            cnxn.queuedBuffer.writeBytes(buf);
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Copy is " + cnxn.queuedBuffer);
                                LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump(cnxn.queuedBuffer));
                            }
                        }
                    }
                }
            }
        }

        /** Trace-logs write completion; no other action is taken. */
        @Override
        public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
            if (!LOG.isTraceEnabled()) {
                return;
            }
            LOG.trace("write complete " + e);
        }

        private final class CertificateVerifier
        implements ChannelFutureListener {
            private final SslHandler sslHandler;
            private final NettyServerCnxn cnxn;

            CertificateVerifier(SslHandler sslHandler, NettyServerCnxn cnxn) {
                this.sslHandler = sslHandler;
                this.cnxn = cnxn;
            }

            @Override
            public void operationComplete(ChannelFuture future) throws SSLPeerUnverifiedException {
                if (future.isSuccess()) {
                    LOG.debug("Successful handshake with session 0x{}", (Object)Long.toHexString(this.cnxn.sessionId));
                    SSLEngine eng = this.sslHandler.getEngine();
                    SSLSession session = eng.getSession();
                    this.cnxn.setClientCertificateChain(session.getPeerCertificates());
                    String authProviderProp = System.getProperty("zookeeper.ssl.authProvider", "x509");
                    X509AuthenticationProvider authProvider = (X509AuthenticationProvider)ProviderRegistry.getProvider(authProviderProp);
                    if (authProvider == null) {
                        LOG.error("Auth provider not found: {}", (Object)authProviderProp);
                        this.cnxn.close();
                        return;
                    }
                    if (KeeperException.Code.OK != authProvider.handleAuthentication(this.cnxn, null)) {
                        LOG.error("Authentication failed for session 0x{}", (Object)Long.toHexString(this.cnxn.sessionId));
                        this.cnxn.close();
                        return;
                    }
                    NettyServerCnxnFactory.this.allChannels.add(future.getChannel());
                    NettyServerCnxnFactory.this.addCnxn(this.cnxn);
                } else {
                    LOG.error("Unsuccessful handshake with session 0x{}", (Object)Long.toHexString(this.cnxn.sessionId));
                    this.cnxn.close();
                }
            }
        }
    }
}

