/*
 * Decompiled with CFR 0.152.
 */
package org.webpieces.nio.impl.cm.basic;

import java.io.IOException;
import java.net.PortUnreachableException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.webpieces.data.api.BufferPool;
import org.webpieces.nio.api.channels.Channel;
import org.webpieces.nio.api.channels.RegisterableChannel;
import org.webpieces.nio.api.exceptions.NioException;
import org.webpieces.nio.api.handlers.DataListener;
import org.webpieces.nio.api.jdk.JdkSelect;
import org.webpieces.nio.impl.cm.basic.BasChannelImpl;
import org.webpieces.nio.impl.cm.basic.BasTCPChannel;
import org.webpieces.nio.impl.cm.basic.BasTCPServerChannel;
import org.webpieces.nio.impl.cm.basic.ChannelInfo;
import org.webpieces.nio.impl.cm.basic.OpType;
import org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl;
import org.webpieces.nio.impl.cm.basic.SelectorManager2;
import org.webpieces.util.logging.Logger;
import org.webpieces.util.logging.LoggerFactory;

public final class KeyProcessor {
    private static final Logger apiLog = LoggerFactory.getLogger(DataListener.class);
    private static final Logger log = LoggerFactory.getLogger(KeyProcessor.class);
    private static boolean logBufferNextRead = false;
    private JdkSelect selector;
    private BufferPool pool;

    public KeyProcessor(JdkSelect selector, BufferPool pool) {
        this.selector = selector;
        this.pool = pool;
    }

    public void processKeys(Set<SelectionKey> keySet) {
        Iterator<SelectionKey> iter = keySet.iterator();
        while (iter.hasNext()) {
            SelectionKey key = null;
            RegisterableChannel channel = null;
            try {
                key = iter.next();
                ChannelInfo struct = (ChannelInfo)key.attachment();
                channel = struct.getChannel();
                SelectionKey current = key;
                RegisterableChannel finalChannel = channel;
                log.trace(() -> finalChannel + " ops=" + OpType.opType(current.readyOps()) + " acc=" + current.isAcceptable() + " read=" + current.isReadable() + " write" + current.isWritable());
                this.processKey(key, struct);
            }
            catch (IOException e) {
                log.error(channel + "Processing of key failed, closing channel", (Throwable)e);
                try {
                    if (key == null) continue;
                    key.channel().close();
                }
                catch (Throwable ee) {
                    log.error(channel + "Close of channel failed", ee);
                }
            }
            catch (CancelledKeyException e) {
                RegisterableChannel fChannel = channel;
                log.trace(() -> fChannel + "Processing of key failed, but continuing channel manager loop", (Throwable)e);
            }
            catch (Throwable e) {
                log.error(channel + "Processing of key failed, but continuing channel manager loop", e);
                try {
                    key.cancel();
                }
                catch (Throwable ee) {
                    log.info(channel + "cancelling key failed.  exception type=" + ee.getClass() + " msg=" + ee.getMessage());
                }
            }
        }
        keySet.clear();
    }

    private void processKey(SelectionKey key, ChannelInfo info) throws IOException, InterruptedException {
        log.trace(() -> key.attachment() + "proccessing");
        if (!this.selector.isChannelOpen(key) || !key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            this.acceptSocket(key, info);
        }
        if (key.isConnectable()) {
            this.connect(key, info);
        }
        if (key.isWritable()) {
            this.write(key, info);
        }
        if (key.isReadable()) {
            this.read(key, info);
        }
    }

    private void acceptSocket(SelectionKey key, ChannelInfo info) throws IOException {
        log.trace(() -> info.getChannel() + "Incoming Connection=" + key);
        BasTCPServerChannel channel = (BasTCPServerChannel)info.getChannel();
        channel.accept(channel.getChannelCount());
    }

    private void connect(SelectionKey key, ChannelInfo info) throws IOException {
        log.trace(() -> info.getChannel() + "finishing connect process");
        CompletableFuture<Channel> callback = info.getConnectCallback();
        BasTCPChannel channel = (BasTCPChannel)info.getChannel();
        try {
            int interests = key.interestOps();
            key.interestOps(interests & 0xFFFFFFF7);
            channel.finishConnect();
            callback.complete(channel);
        }
        catch (Exception e) {
            log.error(key.attachment() + "Could not open connection", (Throwable)e);
            callback.completeExceptionally(e);
        }
    }

    private void read(SelectionKey key, ChannelInfo info) throws IOException {
        log.trace(() -> info.getChannel() + "reading data");
        DataListener in = info.getDataHandler();
        BasChannelImpl channel = (BasChannelImpl)info.getChannel();
        ByteBuffer chunk = this.pool.nextBuffer(1024);
        try {
            if (logBufferNextRead) {
                log.info(channel + "buffer=" + chunk);
            }
            int bytes = channel.readImpl(chunk);
            if (logBufferNextRead) {
                logBufferNextRead = false;
                log.info(channel + "buffer2=" + chunk);
            }
            this.processBytes(key, info, chunk, bytes);
        }
        catch (PortUnreachableException e) {
            log.trace(() -> "Client sent data to a host or port that is not listening to udp, or udp can't get through to that machine", (Throwable)e);
            in.failure(channel, null, e);
        }
        catch (NotYetConnectedException e) {
            log.error("Can't read until UDPChannel is connected", (Throwable)e);
            in.failure(channel, null, e);
        }
        catch (IOException e) {
            this.process(key, in, info, chunk, e);
        }
        catch (NioException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                IOException ioExc = (IOException)cause;
                this.process(key, in, info, chunk, ioExc);
            }
            throw e;
        }
    }

    private void process(SelectionKey key, DataListener in, ChannelInfo info, ByteBuffer chunk, IOException e) throws IOException {
        Channel channel = (Channel)info.getChannel();
        String msg = e.getMessage();
        if (msg != null && (msg.contains("An existing connection was forcibly closed") || msg.contains("Connection reset by peer") || msg.contains("An established connection was aborted by the software in your host machine"))) {
            log.trace(() -> "Exception 2", (Throwable)e);
            this.processBytes(key, info, chunk, -1);
        } else {
            log.error("IO Exception unexpected", (Throwable)e);
            in.failure(channel, null, e);
        }
    }

    private void processBytes(SelectionKey key, ChannelInfo info, ByteBuffer data, int bytes) throws IOException {
        DataListener in = info.getDataHandler();
        BasChannelImpl channel = (BasChannelImpl)info.getChannel();
        ByteBuffer b = data;
        b.flip();
        if (bytes < 0) {
            apiLog.trace(() -> channel + "far end closed, cancel key, close socket");
            channel.serverClosed();
            in.farEndClosed(channel);
        } else if (bytes > 0) {
            apiLog.trace(() -> channel + "READ bytes=" + bytes);
            this.fireIncomingRead(key, bytes, in, channel, b);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fireIncomingRead(SelectionKey key, int bytes, DataListener in, BasChannelImpl channel, ByteBuffer b) {
        int unackedByteCnt;
        CompletableFuture<Void> future = in.incomingData(channel, b);
        boolean unregister = false;
        BasChannelImpl basChannelImpl = channel;
        synchronized (basChannelImpl) {
            unackedByteCnt = channel.addUnackedByteCount(bytes);
            if (channel.isOverMaxUnacked()) {
                unregister = true;
            }
        }
        if (unregister) {
            log.warn(channel + "Overloaded channel.  unregistering until YOU catch up you slowass(lol). num=" + unackedByteCnt + " max=" + channel.getMaxUnacked());
            this.unregisterSelectableChannel(channel, 1);
        }
        future.handle((v, t) -> {
            int unackedCnt;
            boolean register = false;
            BasChannelImpl basChannelImpl = channel;
            synchronized (basChannelImpl) {
                unackedCnt = channel.addUnackedByteCount(-bytes);
                if (!this.isReading(key) && channel.isUnderThreshold()) {
                    register = true;
                }
            }
            if (register) {
                log.warn(channel + "BOOM. you caught back up, reregistering for reads now. unackedCnt=" + unackedCnt + " readThreshold=" + channel.getReadThreshold());
                channel.registerForReads();
            }
            if (t != null) {
                apiLog.error(channel + " Exception on incoming data", t);
            }
            return null;
        });
    }

    private boolean isReading(SelectionKey key) {
        return (key.interestOps() & 1) > 0;
    }

    private void write(SelectionKey key, ChannelInfo info) throws IOException, InterruptedException {
        log.trace(() -> info.getChannel() + "writing data");
        BasChannelImpl channel = (BasChannelImpl)info.getChannel();
        log.trace(() -> channel + "notifying channel of write");
        channel.writeAll();
    }

    void unregisterSelectableChannel(RegisterableChannelImpl channel, int ops) {
        SelectorManager2 mgr = channel.getSelectorManager();
        if (!Thread.currentThread().equals(mgr.getThread())) {
            throw new RuntimeException(channel + "Bug, changing selector keys can only be done on registration thread because there is not synchronization");
        }
        SelectionKey key = channel.keyFor();
        if (key == null || !key.isValid()) {
            return;
        }
        int previous = key.interestOps();
        int opsNow = previous & ~ops;
        key.interestOps(opsNow);
        if (key.attachment() != null) {
            ChannelInfo struct = (ChannelInfo)key.attachment();
            struct.removeListener(ops);
        }
    }
}

