/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.distributed.cache.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.distributed.cache.server.CacheServer;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCacheServer
implements CacheServer {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
    private final String identifier;
    private final int port;
    private final int maxReadSize;
    private final SSLContext sslContext;
    protected volatile boolean stopped = false;
    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<Thread>();
    private volatile ServerSocketChannel serverSocketChannel;

    public AbstractCacheServer(String identifier, SSLContext sslContext, int port, int maxReadSize) {
        this.identifier = identifier;
        this.port = port;
        this.sslContext = sslContext;
        this.maxReadSize = maxReadSize;
    }

    @Override
    public int getPort() {
        return this.serverSocketChannel == null ? this.port : this.serverSocketChannel.socket().getLocalPort();
    }

    @Override
    public void start() throws IOException {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(true);
        this.serverSocketChannel.bind(new InetSocketAddress(this.port));
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                while (true) {
                    SocketChannel socketChannel;
                    try {
                        socketChannel = AbstractCacheServer.this.serverSocketChannel.accept();
                        logger.debug("Connected to {}", new Object[]{socketChannel});
                    }
                    catch (IOException e) {
                        if (!AbstractCacheServer.this.stopped) {
                            logger.error("{} unable to accept connection from remote peer due to {}", (Object)this, (Object)e.toString());
                            if (logger.isDebugEnabled()) {
                                logger.error("", (Throwable)e);
                            }
                        }
                        return;
                    }
                    Runnable processInputRunnable = new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            SocketChannelOutputStream rawOutputStream;
                            SocketChannelInputStream rawInputStream;
                            String peer = socketChannel.socket().getInetAddress().getHostName();
                            try {
                                if (AbstractCacheServer.this.sslContext == null) {
                                    rawInputStream = new SocketChannelInputStream(socketChannel);
                                    rawOutputStream = new SocketChannelOutputStream(socketChannel);
                                } else {
                                    SSLSocketChannel sslSocketChannel = new SSLSocketChannel(AbstractCacheServer.this.sslContext, socketChannel, false);
                                    sslSocketChannel.connect();
                                    rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
                                    rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
                                }
                            }
                            catch (IOException e) {
                                logger.error("Cannot create input and/or output streams for {}", (Object)new Object[]{AbstractCacheServer.this.identifier}, (Object)e);
                                if (logger.isDebugEnabled()) {
                                    logger.error("", (Throwable)e);
                                }
                                try {
                                    socketChannel.close();
                                }
                                catch (IOException iOException) {
                                    // empty catch block
                                }
                                return;
                            }
                            try (BufferedInputStream in = new BufferedInputStream((InputStream)rawInputStream);
                                 BufferedOutputStream out = new BufferedOutputStream((OutputStream)rawOutputStream);){
                                StandardVersionNegotiator versionNegotiator = AbstractCacheServer.this.getVersionNegotiator();
                                ProtocolHandshake.receiveHandshake((InputStream)in, (OutputStream)out, (VersionNegotiator)versionNegotiator);
                                boolean continueComms = true;
                                while (continueComms) {
                                    continueComms = AbstractCacheServer.this.listen(in, out, versionNegotiator.getVersion());
                                }
                                logger.debug("Client issued close on {}", new Object[]{socketChannel});
                            }
                            catch (SocketTimeoutException e) {
                                logger.debug("30 sec timeout reached", (Throwable)e);
                            }
                            catch (IOException | HandshakeException e) {
                                if (!AbstractCacheServer.this.stopped) {
                                    logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()});
                                    if (logger.isDebugEnabled()) {
                                        logger.error("", e);
                                    }
                                }
                            }
                            finally {
                                AbstractCacheServer.this.processInputThreads.remove(Thread.currentThread());
                            }
                        }
                    };
                    Thread processInputThread = new Thread(processInputRunnable);
                    processInputThread.setName("Distributed Cache Server Communications Thread: " + AbstractCacheServer.this.identifier);
                    processInputThread.setDaemon(true);
                    processInputThread.start();
                    AbstractCacheServer.this.processInputThreads.add(processInputThread);
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        thread.setName("Distributed Cache Server: " + this.identifier);
        thread.start();
    }

    protected StandardVersionNegotiator getVersionNegotiator() {
        return new StandardVersionNegotiator(new int[]{1});
    }

    @Override
    public void stop() throws IOException {
        this.stopped = true;
        logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
        if (this.serverSocketChannel != null && this.serverSocketChannel.isOpen()) {
            try {
                this.serverSocketChannel.close();
            }
            catch (IOException e) {
                logger.warn("Server Socket Close Failed", (Throwable)e);
            }
        }
        for (Thread processInputThread : this.processInputThreads) {
            processInputThread.interrupt();
            int i = 0;
            while (!processInputThread.isInterrupted() && i++ < 5) {
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        this.processInputThreads.clear();
    }

    public String toString() {
        return "CacheServer[id=" + this.identifier + "]";
    }

    protected abstract boolean listen(InputStream var1, OutputStream var2, int var3) throws IOException;

    protected byte[] readValue(DataInputStream dis) throws IOException {
        int numBytes = this.validateSize(dis.readInt());
        byte[] buffer = new byte[numBytes];
        dis.readFully(buffer);
        return buffer;
    }

    protected int validateSize(int size) {
        if (size <= this.maxReadSize) {
            return size;
        }
        throw new IllegalStateException(String.format("Size [%d] exceeds maximum configured read [%d]", size, this.maxReadSize));
    }
}

