/*
 * Decompiled with CFR 0.152.
 */
package org.rapidoid.net.impl;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import org.rapidoid.buffer.BufGroup;
import org.rapidoid.buffer.IncompleteReadException;
import org.rapidoid.net.impl.AbstractEventLoop;
import org.rapidoid.net.impl.ConnectionTarget;
import org.rapidoid.net.impl.CtxListener;
import org.rapidoid.net.impl.Protocol;
import org.rapidoid.net.impl.ProtocolException;
import org.rapidoid.net.impl.RapidoidChannel;
import org.rapidoid.net.impl.RapidoidConnection;
import org.rapidoid.net.impl.RapidoidHelper;
import org.rapidoid.pool.ArrayPool;
import org.rapidoid.pool.Pool;
import org.rapidoid.util.SimpleList;
import org.rapidoid.util.U;

/**
 * Event-loop worker that drives non-blocking {@link SocketChannel}s through a selector
 * inherited from {@link AbstractEventLoop} and hands incoming data to a {@link Protocol}.
 *
 * <p>Work enters the loop through three queues ({@code connecting}, {@code connected},
 * {@code restarting}) and one synchronized list ({@code done}); all four are drained by
 * {@link #doProcessing()}. {@link #accept}, {@link #connect} and the asynchronous branch of
 * {@link #wantToWrite} wake the selector after enqueueing.
 */
public class RapidoidWorker
extends AbstractEventLoop<RapidoidWorker> {
    /** Connections queued by {@link #restart} for another {@code processNext} pass. */
    private final Queue<RapidoidConnection> restarting;
    /** Outgoing connection attempts waiting to be registered for {@code OP_CONNECT}. */
    private final Queue<ConnectionTarget> connecting;
    /** Established channels (accepted or connected) waiting to be registered for reading. */
    private final Queue<RapidoidChannel> connected;
    /** Connections that requested a write from another thread; guarded by its own monitor. */
    private final SimpleList<RapidoidConnection> done;
    /** Pool of reusable connection objects. */
    private final Pool<RapidoidConnection> connections;
    /** Maximum number of messages handled by a single {@code processMsgs} call. */
    private final int maxPipelineSize;
    final Protocol protocol;
    final RapidoidHelper helper;
    /** Whether {@link #protocol} also implements {@link CtxListener}. */
    private final boolean isProtocolListener;
    /** Socket send/receive buffer size in bytes. */
    private final int bufSize;
    /** Value passed to {@link Socket#setTcpNoDelay(boolean)} for every configured socket. */
    private final boolean noDelay;

    /**
     * Creates a worker.
     *
     * @param name      name passed to the {@link AbstractEventLoop} constructor
     * @param bufs      buffer group handed to every pooled {@link RapidoidConnection}
     * @param protocol  protocol that processes the data of each connection
     * @param helper    helper object exposed through the {@code helper} field
     * @param bufSizeKB socket send/receive buffer size, in kilobytes
     * @param noNelay   whether to enable TCP_NODELAY on the sockets (the parameter name is
     *                  kept as-is for source compatibility)
     */
    public RapidoidWorker(String name, final BufGroup bufs, Protocol protocol, RapidoidHelper helper, int bufSizeKB, boolean noNelay) {
        super(name);
        this.protocol = protocol;
        this.helper = helper;
        // NOTE(review): the second argument is presumably the default when the option is unset.
        this.maxPipelineSize = U.option("pipeline-max", Integer.MAX_VALUE);

        boolean micro = U.micro();
        int queueSize = micro ? 1000 : 1000000;
        int growFactor = micro ? 2 : 10;

        this.restarting = new ArrayBlockingQueue<RapidoidConnection>(queueSize);
        this.connecting = new ArrayBlockingQueue<ConnectionTarget>(queueSize);
        this.connected = new ArrayBlockingQueue<RapidoidChannel>(queueSize);
        this.done = new SimpleList<RapidoidConnection>(queueSize / 10, growFactor);
        this.isProtocolListener = protocol instanceof CtxListener;
        this.connections = new ArrayPool<RapidoidConnection>(new Callable<RapidoidConnection>(){

            @Override
            public RapidoidConnection call() throws Exception {
                return new RapidoidConnection(RapidoidWorker.this, bufs);
            }
        }, 100000);
        this.bufSize = bufSizeKB * 1024;
        this.noDelay = noNelay;
    }

    /**
     * Takes over an already accepted channel: configures it, queues it as a non-client
     * channel and wakes the selector.
     *
     * @param socketChannel the accepted channel
     * @throws IOException if the socket cannot be configured
     */
    public void accept(SocketChannel socketChannel) throws IOException {
        this.configureSocket(socketChannel);
        this.connected.add(new RapidoidChannel(socketChannel, false));
        this.selector.wakeup();
    }

    /**
     * Starts a non-blocking connect to {@code target.addr} on {@code target.socketChannel}
     * and queues the target so that {@link #doProcessing()} registers it for
     * {@code OP_CONNECT}.
     *
     * @param target the address and channel to connect
     * @throws IOException if the socket cannot be configured or the connect fails immediately
     */
    public void connect(ConnectionTarget target) throws IOException {
        this.configureSocket(target.socketChannel);
        this.connecting.add(target);
        if (target.socketChannel.connect(target.addr)) {
            U.debug("Opened socket, connected", "address", target.addr);
        } else {
            U.debug("Opened socket, connecting...", "address", target.addr);
        }
        this.selector.wakeup();
    }

    /**
     * Switches the channel to non-blocking mode and applies the worker's socket options
     * (TCP_NODELAY, buffer sizes, SO_REUSEADDR).
     */
    private void configureSocket(SocketChannel socketChannel) throws IOException {
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setTcpNoDelay(this.noDelay);
        socket.setReceiveBufferSize(this.bufSize);
        socket.setSendBufferSize(this.bufSize);
        socket.setReuseAddress(true);
    }

    /**
     * Completes a pending connect. On success the channel is queued as a client channel;
     * on {@link ConnectException} a new attempt is scheduled through
     * {@link #retryConnecting}.
     */
    @Override
    protected void connectOP(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ConnectionTarget target = (ConnectionTarget) key.attachment();
        try {
            boolean ready = socketChannel.finishConnect();
            U.failIf(!ready, "Expected an established connection!");
            this.connected.add(new RapidoidChannel(socketChannel, true));
        }
        catch (ConnectException e) {
            // The attempt failed; schedule a fresh one on a new channel.
            this.retryConnecting(target);
        }
    }

    /**
     * Opens a new channel for the target, postpones it by 1000 time units of
     * {@code U.time()} (presumably milliseconds) and starts connecting again.
     */
    private void retryConnecting(ConnectionTarget target) throws IOException {
        U.debug("Reconnecting...", "address", target.addr);
        target.socketChannel = SocketChannel.open();
        target.retryAfter = U.time() + 1000L;
        this.connect(target);
    }

    /**
     * Appends the available bytes to the connection's input and processes the complete
     * messages. A read failure or end-of-stream closes the connection; client connections
     * are additionally scheduled for reconnecting to the same address.
     */
    @Override
    protected void readOP(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        RapidoidConnection conn = (RapidoidConnection) key.attachment();

        int read;
        try {
            read = conn.input.append(socketChannel);
        }
        catch (Exception e) {
            // A failed read is handled like end-of-stream, but the cause is no longer lost.
            U.debug("Failed to read from the channel", "error", e);
            read = -1;
        }

        if (read == -1) {
            U.debug("The other end closed the connection!");
            if (conn.isClient()) {
                // Fetch the address before closing, since closing resets the connection.
                InetSocketAddress addr = conn.getAddress();
                this.close(key);
                this.retryConnecting(new ConnectionTarget(null, addr));
            } else {
                this.close(key);
            }
            return;
        }

        this.processMsgs(conn);
        conn.completedInputPos = conn.input.position();
    }

    /**
     * Processes buffered messages until the input is exhausted, {@code processNext}
     * reports a failure/incomplete message, or {@code maxPipelineSize} is reached.
     *
     * @return the number of successfully processed messages
     */
    private int processMsgs(RapidoidConnection conn) {
        int reqN = 0;
        while (reqN < this.maxPipelineSize && conn.input().hasRemaining() && this.processNext(conn)) {
            ++reqN;
        }
        return reqN;
    }

    /**
     * Lets the protocol process one message.
     *
     * <p>On {@link IncompleteReadException} the input position/limit and the output size are
     * rolled back to their values from before the attempt. On {@link ProtocolException} the
     * partial output is discarded, the error message is written and the connection is
     * closed. Any other failure is logged and closes the connection.
     *
     * @return {@code true} if the message was processed, {@code false} otherwise
     */
    private boolean processNext(RapidoidConnection conn) {
        int pos = conn.input().position();
        int limit = conn.input().limit();
        int osize = conn.output().size();
        try {
            conn.done = false;
            this.protocol.process(conn);
            if (!conn.closed && !conn.isAsync()) {
                conn.done();
            }
            U.debug("Completed message processing");
            return true;
        }
        catch (IncompleteReadException e) {
            U.debug("Incomplete message");
            conn.input().position(pos);
            conn.input().limit(limit);
            conn.output().deleteAfter(osize);
        }
        catch (ProtocolException e) {
            U.warn("Protocol error", "error", e);
            conn.output().deleteAfter(osize);
            conn.write(U.or(e.getMessage(), "Protocol error!"));
            conn.error();
            conn.close(true);
        }
        catch (Throwable e) {
            U.error("Failed to process message!", e);
            conn.close(true);
        }
        return false;
    }

    /**
     * Closes the channel behind the connection and returns the connection to the pool.
     *
     * @param conn the connection to close
     */
    public void close(RapidoidConnection conn) {
        this.close(conn.key);
    }

    /**
     * Closes the channel of the key, cancels the key and, if a not-yet-closed connection is
     * attached, resets it and releases it to the pool.
     *
     * <p>The attachment is only treated as a connection when it actually is one: keys that
     * are still connecting carry a {@link ConnectionTarget}. A failure to close the channel
     * is logged and does not prevent the connection from being released.
     */
    private void close(SelectionKey key) {
        if (key == null) {
            // Nothing to close, e.g. the connection has no key (any more).
            return;
        }

        // Read the attachment first: clearKey() detaches it.
        Object attachment = key.attachment();
        RapidoidConnection conn = attachment instanceof RapidoidConnection ? (RapidoidConnection) attachment : null;

        try {
            this.clearKey(key);
        }
        catch (IOException e) {
            U.error("Failed to close the channel!", e);
        }

        if (conn != null && !conn.closed) {
            U.trace("Closing connection", "connection", conn);
            assert (conn.key == key);
            conn.reset();
            this.connections.release(conn);
        }
    }

    /**
     * Closes the channel of a valid key, then detaches the attachment and cancels the key,
     * even if closing the channel fails.
     */
    private void clearKey(SelectionKey key) throws IOException {
        if (key.isValid()) {
            try {
                key.channel().close();
            }
            finally {
                key.attach(null);
                key.cancel();
            }
        }
    }

    /**
     * Writes pending output to the channel. When everything was written the key goes back
     * to read-only interest (or the connection is closed if it asked for that); otherwise
     * the key stays interested in both reading and writing. An I/O failure closes the
     * connection.
     */
    @Override
    protected void writeOP(SelectionKey key) throws IOException {
        RapidoidConnection conn = (RapidoidConnection) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        this.checkOnSameThread();
        try {
            int wrote = conn.output.writeTo(socketChannel);
            conn.output.deleteBefore(wrote);
            boolean complete = conn.output.size() == 0;
            if (conn.closeAfterWrite() && complete) {
                this.close(conn);
            } else {
                key.interestOps(complete ? SelectionKey.OP_READ : SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                conn.wrote(complete);
            }
        }
        catch (IOException e) {
            U.debug("Failed to write to the channel", "error", e);
            this.close(conn);
        }
    }

    /**
     * Requests a write for the connection: directly switches the key to {@code OP_WRITE}
     * when called on the worker's own thread, otherwise defers the request to
     * {@link #doProcessing()}.
     *
     * @param conn the connection that has output to write
     */
    public void wantToWrite(RapidoidConnection conn) {
        if (this.onSameThread()) {
            conn.key.interestOps(SelectionKey.OP_WRITE);
        } else {
            this.wantToWriteAsync(conn);
        }
    }

    /** Records the write request under the {@code done} monitor and wakes the selector. */
    private void wantToWriteAsync(RapidoidConnection conn) {
        synchronized (this.done) {
            this.done.add(conn);
        }
        this.selector.wakeup();
    }

    /**
     * Drains the work queues: registers due connection attempts, sets up newly connected
     * channels, re-processes restarted connections and applies deferred write requests.
     */
    @Override
    protected void doProcessing() {
        this.registerPendingConnects(U.time());
        this.setUpConnectedChannels();
        this.processRestartedConnections();
        this.applyDeferredWriteRequests();
    }

    /**
     * Registers for {@code OP_CONNECT} every queued target whose {@code retryAfter} is
     * before {@code now}; the remaining targets are put back into the queue.
     */
    private void registerPendingConnects(long now) {
        // Only look at the targets present at the start, so re-queued ones are visited once.
        int connectingN = this.connecting.size();
        for (int i = 0; i < connectingN; ++i) {
            ConnectionTarget target = this.connecting.poll();
            assert (target != null);
            if (target.retryAfter < now) {
                U.debug("connecting", "address", target.addr);
                try {
                    SelectionKey newKey = target.socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
                    newKey.attach(target);
                }
                catch (ClosedChannelException e) {
                    U.warn("Closed channel", e);
                }
            } else {
                this.connecting.add(target);
            }
        }
    }

    /**
     * Registers every queued channel for {@code OP_READ}, attaches a pooled connection to
     * its key and runs one {@code processNext} pass on it.
     */
    private void setUpConnectedChannels() {
        RapidoidChannel channel;
        while ((channel = this.connected.poll()) != null) {
            SocketChannel socketChannel = channel.socketChannel;
            U.debug("connected", "address", socketChannel.socket().getRemoteSocketAddress());
            try {
                SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
                RapidoidConnection conn = this.attachConn(newKey);
                conn.setClient(channel.isClient);
                try {
                    this.processNext(conn);
                }
                finally {
                    conn.setInitial(false);
                }
            }
            catch (ClosedChannelException e) {
                U.warn("Closed channel", e);
            }
        }
    }

    /** Runs one {@code processNext} pass on every connection queued by {@link #restart}. */
    private void processRestartedConnections() {
        RapidoidConnection restartedConn;
        while ((restartedConn = this.restarting.poll()) != null) {
            U.debug("restarting", "connection", restartedConn);
            this.processNext(restartedConn);
        }
    }

    /**
     * Switches the keys of all connections recorded by {@code wantToWriteAsync} to
     * {@code OP_WRITE}, skipping connections without a valid key, and clears the list.
     */
    private void applyDeferredWriteRequests() {
        synchronized (this.done) {
            for (int i = 0; i < this.done.size(); ++i) {
                RapidoidConnection conn = this.done.get(i);
                if (conn.key != null && conn.key.isValid()) {
                    conn.key.interestOps(SelectionKey.OP_WRITE);
                }
            }
            this.done.clear();
        }
    }

    /**
     * Takes a connection from the pool, marks it open, binds it to the key (replacing any
     * {@link ConnectionTarget} attachment) and installs the protocol as listener when the
     * protocol is a {@link CtxListener}.
     *
     * @return the connection now attached to the key
     */
    private RapidoidConnection attachConn(SelectionKey key) {
        Object attachment = key.attachment();
        assert (attachment == null || attachment instanceof ConnectionTarget);
        RapidoidConnection conn = this.connections.get();
        // Pooled connections are expected to be in the closed state.
        U.must(conn.closed);
        conn.closed = false;
        conn.key = key;
        if (this.isProtocolListener) {
            conn.setListener((CtxListener)((Object)this.protocol));
        }
        key.attach(conn);
        return conn;
    }

    /** Logs the failure and closes whatever is behind the key. */
    @Override
    protected void failedOP(SelectionKey key, Throwable e) {
        U.error("Network error", e);
        this.close(key);
    }

    /**
     * Queues the connection for another {@code processNext} pass in
     * {@link #doProcessing()}.
     *
     * <p>NOTE(review): unlike {@link #accept} and {@link #connect}, this does not wake the
     * selector - confirm whether callers are always on the worker thread.
     *
     * @param conn the connection to process again
     */
    public void restart(RapidoidConnection conn) {
        this.restarting.add(conn);
    }
}

