/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.ovsdb.lib.impl;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.opendaylight.aaa.cert.api.ICertificateManager;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
import org.opendaylight.ovsdb.lib.impl.ChannelConnectionHandler;
import org.opendaylight.ovsdb.lib.impl.NettyBootstrapFactory;
import org.opendaylight.ovsdb.lib.impl.OvsdbClientImpl;
import org.opendaylight.ovsdb.lib.impl.StalePassiveConnectionService;
import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Component(service={OvsdbConnection.class}, configurationPid={"org.opendaylight.ovsdb.library"})
@Designate(ocd=Configuration.class)
public class OvsdbConnectionService
implements AutoCloseable,
OvsdbConnection {
    private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
    private static final int IDLE_READER_TIMEOUT = 30;
    private static final int READ_TIMEOUT = 180;
    private static final int RETRY_PERIOD = 100;
    private static final String DEFAULT_LISTENER_IP = "0.0.0.0";
    private static final int DEFAULT_LISTENER_PORT = 6640;
    private static final int DEFAULT_RPC_TASK_TIMEOUT = 1000;
    private static final int DEFAULT_JSON_RPC_DECODER_MAX_FRAME_LENGTH = 100000;
    private static final StringEncoder UTF8_ENCODER = new StringEncoder(StandardCharsets.UTF_8);
    private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(10, new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnServ-%d").build());
    private static final ExecutorService CONNECTION_NOTIFIER_SERVICE = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("OVSDBConnNotifSer-%d").build());
    private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE = new StalePassiveConnectionService(client -> {
        OvsdbConnectionService.notifyListenerForPassiveConnection(client);
        return null;
    });
    private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
    private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<OvsdbClient, Channel>();
    private final NettyBootstrapFactory bootstrapFactory;
    private final ICertificateManager certManagerSrv;
    private final boolean useSSL;
    private final int jsonRpcDecoderMaxFrameLength;
    private final AtomicBoolean singletonCreated = new AtomicBoolean(false);
    private volatile Channel serverChannel;
    private final String listenerIp;
    private final int listenerPort;

    @Inject
    public OvsdbConnectionService(NettyBootstrapFactory bootstrapFactory, ICertificateManager certManagerSrv) {
        this(bootstrapFactory, certManagerSrv, DEFAULT_LISTENER_IP, 6640, 1000, false, 100000);
    }

    @Activate
    public OvsdbConnectionService(@Reference NettyBootstrapFactory bootstrapFactory, @Reference(target="(type=default-certificate-manager)") ICertificateManager certManagerSrv, Configuration configuration) {
        this(bootstrapFactory, certManagerSrv, configuration.ovsdb$_$listener$_$ip(), configuration.ovsdb$_$listener$_$port(), configuration.ovsdb$_$rpc$_$task$_$timeout(), configuration.use$_$ssl(), configuration.json$_$rpc$_$decoder$_$max$_$frame$_$length());
    }

    public OvsdbConnectionService(NettyBootstrapFactory bootstrapFactory, ICertificateManager certManagerSrv, String listenerIp, int listenerPort, int ovsdbRpcTaskTimeout, boolean useSSL, int jsonRpcDecoderMaxFrameLength) {
        this.bootstrapFactory = Objects.requireNonNull(bootstrapFactory);
        this.certManagerSrv = Objects.requireNonNull(certManagerSrv);
        this.listenerIp = Objects.requireNonNull(listenerIp);
        this.listenerPort = listenerPort;
        this.useSSL = useSSL;
        this.jsonRpcDecoderMaxFrameLength = jsonRpcDecoderMaxFrameLength;
        JsonRpcEndpoint.setReaperInterval(ovsdbRpcTaskTimeout);
        LOG.info("OVSDB IP for listening connection is set to : {}", (Object)listenerIp);
        LOG.info("OVSDB port for listening connection is set to : {}", (Object)listenerPort);
        LOG.info("Json Rpc Decoder Max Frame Length set to : {}", (Object)jsonRpcDecoderMaxFrameLength);
    }

    @Override
    public OvsdbClient connect(InetAddress address, int port) {
        if (this.useSSL) {
            if (this.certManagerSrv == null) {
                LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
                return null;
            }
            return this.connectWithSsl(address, port, this.certManagerSrv);
        }
        return this.connectWithSsl(address, port, null);
    }

    @Override
    public OvsdbClient connectWithSsl(InetAddress address, int port, ICertificateManager certificateManagerSrv) {
        ChannelFuture future = ((Bootstrap)this.bootstrapFactory.newClient().handler((ChannelHandler)(certificateManagerSrv == null ? new ClientChannelInitializer() : new SslClientChannelInitializer(certificateManagerSrv, address, port)))).connect(address, port);
        try {
            future.sync();
        }
        catch (InterruptedException e) {
            LOG.warn("Failed to connect {}:{}", new Object[]{address, port, e});
            return null;
        }
        catch (Throwable throwable) {
            LOG.error("Error while binding to address {}, port {}", new Object[]{address, port, throwable});
            throw throwable;
        }
        return OvsdbConnectionService.getChannelClient(future.channel(), OvsdbConnectionInfo.ConnectionType.ACTIVE, OvsdbConnectionInfo.SocketConnectionType.SSL);
    }

    @Override
    public void disconnect(OvsdbClient client) {
        if (client == null) {
            return;
        }
        Channel channel = CONNECTIONS.get(client);
        if (channel != null) {
            client.setConnectionPublished(false);
            channel.disconnect();
        }
        CONNECTIONS.remove(client);
    }

    @Override
    public void registerConnectionListener(OvsdbConnectionListener listener) {
        LOG.info("registerConnectionListener: registering {}", (Object)listener.getClass().getSimpleName());
        if (CONNECTION_LISTENERS.add(listener)) {
            LOG.info("registerConnectionListener: registered {} notifying exisitng connections", (Object)listener.getClass().getSimpleName());
            this.notifyAlreadyExistingConnectionsToListener(listener);
        }
    }

    private void notifyAlreadyExistingConnectionsToListener(OvsdbConnectionListener listener) {
        for (OvsdbClient client : this.getConnections()) {
            CONNECTION_NOTIFIER_SERVICE.execute(() -> {
                LOG.trace("Connection {} notified to listener {}", (Object)client.getConnectionInfo(), (Object)listener);
                listener.connected(client);
            });
        }
    }

    @Override
    public void unregisterConnectionListener(OvsdbConnectionListener listener) {
        CONNECTION_LISTENERS.remove(listener);
    }

    private static OvsdbClient getChannelClient(Channel channel, OvsdbConnectionInfo.ConnectionType type, OvsdbConnectionInfo.SocketConnectionType socketConnType) {
        JsonRpcEndpoint endpoint = new JsonRpcEndpoint(channel);
        channel.pipeline().addLast(new ChannelHandler[]{endpoint});
        OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType);
        client.setConnectionPublished(true);
        CONNECTIONS.put(client, channel);
        channel.closeFuture().addListener((GenericFutureListener)new ChannelConnectionHandler(client));
        return client;
    }

    @Override
    public synchronized boolean startOvsdbManager() {
        int ovsdbListenerPort = this.listenerPort;
        String ovsdbListenerIp = this.listenerIp;
        if (this.singletonCreated.getAndSet(true)) {
            return false;
        }
        LOG.info("startOvsdbManager: Starting");
        this.ovsdbManager(ovsdbListenerIp, ovsdbListenerPort);
        return true;
    }

    @Override
    public synchronized boolean startOvsdbManagerWithSsl(String ovsdbListenIp, int ovsdbListenPort, ICertificateManager certificateManagerSrv, String[] protocols, String[] cipherSuites) {
        if (!this.singletonCreated.getAndSet(true)) {
            this.ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort, certificateManagerSrv == null ? new ServerChannelInitializer() : new SslServerChannelInitializer(certificateManagerSrv, protocols, cipherSuites));
            return true;
        }
        return false;
    }

    @Override
    public synchronized boolean restartOvsdbManagerWithSsl(String ovsdbListenIp, int ovsdbListenPort, ICertificateManager certificateManagerSrv, String[] protocols, String[] cipherSuites) {
        if (this.singletonCreated.getAndSet(false) && this.serverChannel != null) {
            this.serverChannel.close();
            LOG.info("Server channel closed");
        }
        this.serverChannel = null;
        return this.startOvsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort, certificateManagerSrv, protocols, cipherSuites);
    }

    private void ovsdbManager(String ip, int port) {
        if (this.useSSL) {
            if (this.certManagerSrv == null) {
                LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
                return;
            }
            this.ovsdbManagerWithSsl(ip, port, new SslServerChannelInitializer(this.certManagerSrv));
        } else {
            this.ovsdbManagerWithSsl(ip, port, new ServerChannelInitializer());
        }
    }

    private void ovsdbManagerWithSsl(String ip, int port, ServerChannelInitializer channelHandler) {
        ((ServerBootstrap)this.bootstrapFactory.newServer().handler((ChannelHandler)new LoggingHandler(LogLevel.INFO))).childHandler((ChannelHandler)channelHandler).bind(ip, port).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            if (future.isSuccess()) {
                this.serverChannel = future.channel();
            } else {
                LOG.error("Error while binding to address {}, port {}", new Object[]{ip, port, future.cause()});
            }
        }));
    }

    private static void handleNewPassiveConnection(final OvsdbClient client) {
        ListenableFuture<List<String>> echoFuture = client.echo();
        LOG.debug("Send echo message to probe the OVSDB switch {}", (Object)client.getConnectionInfo());
        Futures.addCallback(echoFuture, (FutureCallback)new FutureCallback<List<String>>(){

            public void onSuccess(List<String> result) {
                LOG.info("Probe was successful to OVSDB switch {}", (Object)client.getConnectionInfo());
                try {
                    OvsdbConnectionService.getPassiveClientsFromSameNode(client);
                }
                catch (Throwable throwable) {
                    LOG.error("Failed to get passive clients from same node", throwable);
                }
                OvsdbConnectionService.notifyListenerForPassiveConnection(client);
            }

            public void onFailure(Throwable failureException) {
                LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", (Object)client.getConnectionInfo());
                client.disconnect();
            }
        }, (Executor)CONNECTION_NOTIFIER_SERVICE);
    }

    private static void handleNewPassiveConnection(final Channel channel) {
        if (!channel.isOpen()) {
            LOG.warn("Channel {} is not open, skipped further processing of the connection.", (Object)channel);
            return;
        }
        final SslHandler sslHandler = (SslHandler)channel.pipeline().get("ssl");
        if (sslHandler != null) {
            class HandleNewPassiveSslRunner
            implements Runnable {
                private int retryTimes = 3;

                HandleNewPassiveSslRunner() {
                }

                private void retry() {
                    if (this.retryTimes > 0) {
                        EXECUTOR_SERVICE.schedule(this, 100L, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.debug("channel closed {}", (Object)channel);
                        channel.disconnect();
                    }
                    --this.retryTimes;
                }

                @Override
                public void run() {
                    SSLEngineResult.HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
                    LOG.debug("Handshake status {}", (Object)status);
                    switch (status) {
                        case FINISHED: 
                        case NOT_HANDSHAKING: {
                            if (sslHandler.engine().getSession().getCipherSuite().equals("SSL_NULL_WITH_NULL_NULL")) {
                                LOG.debug("handshake not begin yet {}", (Object)status);
                                this.retry();
                                break;
                            }
                            try {
                                sslHandler.engine().getSession().getPeerCertificates();
                                OvsdbClient client = OvsdbConnectionService.getChannelClient(channel, OvsdbConnectionInfo.ConnectionType.PASSIVE, OvsdbConnectionInfo.SocketConnectionType.SSL);
                                OvsdbConnectionService.handleNewPassiveConnection(client);
                            }
                            catch (SSLPeerUnverifiedException e) {
                                LOG.debug("Peer certifiacte is not verified yet {}", (Object)status);
                                this.retry();
                            }
                            break;
                        }
                        case NEED_UNWRAP: 
                        case NEED_TASK: {
                            LOG.debug("handshake not done yet {}", (Object)status);
                            this.retry();
                            break;
                        }
                        case NEED_WRAP: {
                            if (sslHandler.engine().getSession().getCipherSuite().equals("SSL_NULL_WITH_NULL_NULL")) {
                                LOG.error("Ssl handshake fail. channel {}", (Object)channel);
                                channel.disconnect();
                                break;
                            }
                            LOG.debug("handshake not done yet {}", (Object)status);
                            this.retry();
                            break;
                        }
                        default: {
                            LOG.error("unknown hadshake status {}", (Object)status);
                        }
                    }
                }
            }
            EXECUTOR_SERVICE.schedule(new HandleNewPassiveSslRunner(), 100L, TimeUnit.MILLISECONDS);
        } else {
            EXECUTOR_SERVICE.execute(() -> {
                OvsdbClient client = OvsdbConnectionService.getChannelClient(channel, OvsdbConnectionInfo.ConnectionType.PASSIVE, OvsdbConnectionInfo.SocketConnectionType.NON_SSL);
                OvsdbConnectionService.handleNewPassiveConnection(client);
            });
        }
    }

    public static void channelClosed(OvsdbClient client) {
        LOG.info("Connection closed {}", (Object)client.getConnectionInfo());
        CONNECTIONS.remove(client);
        if (client.isConnectionPublished()) {
            for (OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
                listener.disconnected(client);
            }
        }
        STALE_PASSIVE_CONNECTION_SERVICE.clientDisconnected(client);
    }

    @Override
    public Collection<OvsdbClient> getConnections() {
        return CONNECTIONS.keySet();
    }

    @Override
    public void close() throws Exception {
        LOG.info("OvsdbConnectionService closed");
        JsonRpcEndpoint.close();
    }

    @Override
    public OvsdbClient getClient(Channel channel) {
        for (Map.Entry<OvsdbClient, Channel> entry : CONNECTIONS.entrySet()) {
            OvsdbClient client = entry.getKey();
            Channel ctx = entry.getValue();
            if (!ctx.equals(channel)) continue;
            return client;
        }
        return null;
    }

    private static List<OvsdbClient> getPassiveClientsFromSameNode(OvsdbClient ovsdbClient) {
        ArrayList<OvsdbClient> passiveClients = new ArrayList<OvsdbClient>();
        for (OvsdbClient client : CONNECTIONS.keySet()) {
            if (client.equals(ovsdbClient) || !client.getConnectionInfo().getRemoteAddress().equals(ovsdbClient.getConnectionInfo().getRemoteAddress()) || client.getConnectionInfo().getType() != OvsdbConnectionInfo.ConnectionType.PASSIVE) continue;
            passiveClients.add(client);
        }
        return passiveClients;
    }

    public static void notifyListenerForPassiveConnection(OvsdbClient client) {
        client.setConnectionPublished(true);
        for (OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
            CONNECTION_NOTIFIER_SERVICE.execute(() -> {
                LOG.trace("Connection {} notified to listener {}", (Object)client.getConnectionInfo(), (Object)listener);
                listener.connected(client);
            });
        }
    }

    @ObjectClassDefinition
    public static @interface Configuration {
        @AttributeDefinition
        public String ovsdb$_$listener$_$ip() default "0.0.0.0";

        @AttributeDefinition(min="1", max="65535")
        public int ovsdb$_$listener$_$port() default 6640;

        @AttributeDefinition
        public int ovsdb$_$rpc$_$task$_$timeout() default 1000;

        @AttributeDefinition
        public boolean use$_$ssl() default false;

        @AttributeDefinition
        public int json$_$rpc$_$decoder$_$max$_$frame$_$length() default 100000;
    }

    private class ClientChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private ClientChannelInitializer() {
        }

        public void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new ChannelHandler[]{new JsonRpcDecoder(OvsdbConnectionService.this.jsonRpcDecoderMaxFrameLength), UTF8_ENCODER, new IdleStateHandler(30, 0, 0), new ReadTimeoutHandler(180), new ExceptionHandler(OvsdbConnectionService.this)});
        }
    }

    private class SslClientChannelInitializer
    extends ClientChannelInitializer {
        private final ICertificateManager certManagerSrv;
        private final InetAddress address;
        private final int port;

        SslClientChannelInitializer(ICertificateManager certManagerSrv, InetAddress address, int port) {
            this.certManagerSrv = Objects.requireNonNull(certManagerSrv);
            this.address = Objects.requireNonNull(address);
            this.port = port;
        }

        @Override
        public void initChannel(SocketChannel channel) throws Exception {
            SSLContext sslContext = this.certManagerSrv.getServerContext();
            if (sslContext != null) {
                SSLEngine engine = sslContext.createSSLEngine(this.address.toString(), this.port);
                engine.setUseClientMode(true);
                channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(engine));
            }
            super.initChannel(channel);
        }
    }

    private class ServerChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private ServerChannelInitializer() {
        }

        public final void initChannel(SocketChannel channel) {
            LOG.debug("New Passive channel created : {}", (Object)channel);
            this.initChannelImpl(channel);
        }

        void initChannelImpl(SocketChannel channel) {
            channel.pipeline().addLast(new ChannelHandler[]{new JsonRpcDecoder(OvsdbConnectionService.this.jsonRpcDecoderMaxFrameLength), UTF8_ENCODER, new IdleStateHandler(30, 0, 0), new ReadTimeoutHandler(180), new ExceptionHandler(OvsdbConnectionService.this)});
            OvsdbConnectionService.handleNewPassiveConnection((Channel)channel);
        }
    }

    private final class SslServerChannelInitializer
    extends ServerChannelInitializer {
        private final ICertificateManager certManagerSrv;
        private final String[] protocols;
        private final String[] cipherSuites;

        SslServerChannelInitializer(ICertificateManager certManagerSrv, String[] protocols, String[] cipherSuites) {
            this.certManagerSrv = Objects.requireNonNull(certManagerSrv);
            this.protocols = Objects.requireNonNull(protocols);
            this.cipherSuites = Objects.requireNonNull(cipherSuites);
        }

        SslServerChannelInitializer(ICertificateManager certManagerSrv) {
            this(certManagerSrv, certManagerSrv.getTlsProtocols(), certManagerSrv.getCipherSuites());
        }

        @Override
        void initChannelImpl(SocketChannel channel) {
            SSLContext sslContext = this.certManagerSrv.getServerContext();
            if (sslContext != null) {
                SSLEngine engine = sslContext.createSSLEngine();
                engine.setUseClientMode(false);
                engine.setNeedClientAuth(true);
                if (this.protocols != null && this.protocols.length > 0) {
                    engine.setEnabledProtocols(this.protocols);
                    LOG.debug("Supported ssl protocols {}", (Object)Arrays.toString(engine.getSupportedProtocols()));
                    LOG.debug("Enabled ssl protocols {}", (Object)Arrays.toString(engine.getEnabledProtocols()));
                }
                if (this.cipherSuites != null && this.cipherSuites.length > 0) {
                    engine.setEnabledCipherSuites(this.cipherSuites);
                    LOG.debug("Enabled cipher suites {}", (Object)Arrays.toString(engine.getEnabledCipherSuites()));
                }
                channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(engine));
            }
            super.initChannelImpl(channel);
        }
    }
}

