package org.asyncflows.io.net.selector;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.asyncflows.core.CoreFlows;
import org.asyncflows.core.Promise;
import org.asyncflows.core.data.Maybe;
import org.asyncflows.core.function.ASupplier;
import org.asyncflows.core.util.CloseableBase;
import org.asyncflows.core.util.CoreFlowsSeq;
import org.asyncflows.core.util.NeedsExport;
import org.asyncflows.core.util.RequestQueue;
import org.asyncflows.core.vats.Vat;
import org.asyncflows.io.AInput;
import org.asyncflows.io.AOutput;
import org.asyncflows.io.BufferOperations;
import org.asyncflows.io.IOExportUtil;
import org.asyncflows.io.IOUtil;
import org.asyncflows.io.net.ASocket;
import org.asyncflows.io.net.SocketExportUtil;
import org.asyncflows.io.net.SocketOptions;
import org.asyncflows.io.net.SocketUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/asyncflows/io/net/selector/SelectorSocket.class */
public class SelectorSocket extends CloseableBase implements ASocket, NeedsExport<ASocket> {
    /** Logger used for the debug messages emitted on connect and on close. */
    private static final Logger LOG = LoggerFactory.getLogger(SelectorSocket.class);
    /** Upper bound, in bytes, on the data staged in a borrowed direct buffer for one write attempt. */
    private static final int WRITE_LIMIT = 4096;
    /** Provides the read/write/connect waits and the borrowed direct buffers used by this socket. */
    private final ChannelContext channelContext;
    /** The underlying socket channel; the constructor switches it to non-blocking mode. */
    private final SocketChannel socketChannel;
    /** Queue through which connect requests are run. */
    private final RequestQueue requests;
    /** Input side of the socket; its exported form is returned by {@code getInput()}. */
    private final SocketInput input;
    /** Output side of the socket; its exported form is returned by {@code getOutput()}. */
    private final SocketOutput output;

    /* loaded from: input_file:org/asyncflows/io/net/selector/SelectorSocket$SocketInput.class */
    /**
     * Input half of the socket. Read requests are run through this input's own
     * {@link RequestQueue}; closing the input shuts down the read side of the underlying socket.
     */
    private class SocketInput extends CloseableBase implements AInput<ByteBuffer>, NeedsExport<AInput<ByteBuffer>> {
        /**
         * Number of zero-byte reads within one read request that must be exceeded before the
         * channel context is asked to change its selector.
         */
        public static final int ZERO_COUNT_LIMIT = 8;
        /** Queue through which read requests on this input are run. */
        private final RequestQueue requests;
        /** Set once the channel has reported end of stream; later reads answer EOF without touching the channel. */
        private boolean eofSeen;

        private SocketInput() {
            this.requests = new RequestQueue();
        }

        /**
         * Reads bytes from the socket channel into the supplied buffer.
         *
         * <p>A direct buffer is passed to the channel as is. For a non-direct buffer the data is
         * first read into a direct buffer borrowed from the channel context and then copied over;
         * the borrowed buffer is always handed back, whatever the outcome.</p>
         *
         * @param byteBuffer the destination buffer
         * @return a promise for the number of bytes read ({@code 0} when the buffer has no room),
         *         or the shared end-of-stream result {@code IOUtil.EOF_MAYBE_PROMISE}; the promise
         *         fails when the channel read fails for the second time within the request
         */
        @Override // org.asyncflows.io.AInput
        public Promise<Integer> read(final ByteBuffer byteBuffer) {
            return this.requests.runSeqUntilValue(new ASupplier<Maybe<Integer>>() { // from class: org.asyncflows.io.net.selector.SelectorSocket.SocketInput.1
                private boolean firstTime = true;
                private int zeroCount;

                public Promise<Maybe<Integer>> get() throws Exception {
                    if (SocketInput.this.eofSeen) {
                        return IOUtil.EOF_MAYBE_PROMISE;
                    }
                    if (!byteBuffer.hasRemaining()) {
                        return CoreFlows.aMaybeValue(0);
                    }
                    final boolean borrowed = !byteBuffer.isDirect();
                    final ByteBuffer target;
                    if (borrowed) {
                        target = SelectorSocket.this.channelContext.getDirect();
                        target.clear();
                        // never read more than the caller's buffer can take
                        target.limit(Math.min(byteBuffer.remaining(), target.capacity()));
                    } else {
                        target = byteBuffer;
                    }
                    try {
                        int read;
                        try {
                            read = SelectorSocket.this.socketChannel.read(target);
                        } catch (IOException e) {
                            if (!this.firstTime) {
                                return CoreFlows.aFailure(e);
                            }
                            // The first failure of a request is not reported. It is handled like a
                            // zero-byte read so that the request waits for readiness and retries;
                            // without an explicit value the variable would be unassigned here.
                            this.firstTime = false;
                            read = 0;
                        }
                        if (read < 0) {
                            SocketInput.this.eofSeen = true;
                            return IOUtil.EOF_MAYBE_PROMISE;
                        }
                        if (read > 0) {
                            if (borrowed) {
                                target.flip();
                                byteBuffer.put(target);
                            }
                            return CoreFlows.aMaybeValue(Integer.valueOf(read));
                        }
                        // nothing available yet
                        this.zeroCount++;
                        if (this.zeroCount > ZERO_COUNT_LIMIT) {
                            this.zeroCount = 0;
                            SelectorSocket.this.channelContext.changeSelector();
                        }
                        return SelectorSocket.this.channelContext.waitForRead();
                    } finally {
                        if (borrowed) {
                            SelectorSocket.this.channelContext.releaseDirect(target);
                        }
                    }
                }
            });
        }

        /**
         * Exports this input on the current vat.
         *
         * @return the result of {@link #m21export(Vat)} for {@code Vat.current()}
         */
        /* renamed from: export, reason: merged with bridge method [inline-methods] */
        public AInput<ByteBuffer> m22export() {
            return m21export(Vat.current());
        }

        /**
         * Exports this input on the given vat.
         *
         * @param vat the vat handed to {@code IOExportUtil.export}
         * @return the exported input
         */
        /* renamed from: export, reason: merged with bridge method [inline-methods] */
        public AInput<ByteBuffer> m21export(Vat vat) {
            return IOExportUtil.export(vat, this);
        }

        /**
         * Shuts down the read side of the socket.
         *
         * <p>When the channel is already closed there is nothing left to shut down and the
         * action succeeds. The explicit check is needed because the {@code Socket} view of a
         * channel is expected to report a closed channel as a {@code SocketException} rather than
         * as a {@code ClosedChannelException} (NOTE(review): confirm against the target JDK).</p>
         *
         * @return a void promise on success, or a failed promise carrying the I/O error
         */
        protected Promise<Void> closeAction() {
            if (!SelectorSocket.this.socketChannel.isOpen()) {
                return CoreFlows.aVoid();
            }
            try {
                SelectorSocket.this.socketChannel.socket().shutdownInput();
                return CoreFlows.aVoid();
            } catch (ClosedChannelException e) {
                // closed in the meantime: same outcome as the guard above
                return CoreFlows.aVoid();
            } catch (IOException e2) {
                return CoreFlows.aFailure(e2);
            }
        }
    }

    /* loaded from: input_file:org/asyncflows/io/net/selector/SelectorSocket$SocketOutput.class */
    /**
     * Output half of the socket. Write and flush requests are run through this output's own
     * {@link RequestQueue}; closing the output shuts down the write side of the underlying socket.
     */
    private class SocketOutput extends CloseableBase implements AOutput<ByteBuffer>, NeedsExport<AOutput<ByteBuffer>> {
        /** Queue through which write and flush requests on this output are run. */
        private final RequestQueue requests;

        private SocketOutput() {
            this.requests = new RequestQueue();
        }

        /**
         * Writes the remaining content of the supplied buffer to the socket channel.
         *
         * <p>A direct buffer is written as is. For a non-direct buffer, up to
         * {@code WRITE_LIMIT} bytes per attempt are staged in a direct buffer borrowed from the
         * channel context; the caller's position is advanced only by the number of bytes the
         * channel actually accepted. The borrowed buffer is always handed back.</p>
         *
         * @param byteBuffer the data to write
         * @return a promise that completes when the write loop ends; an {@code IOException} from
         *         the channel is swallowed once (the loop waits for write readiness instead) and
         *         is propagated when it happens again before the retry flag has been re-armed
         */
        @Override // org.asyncflows.io.AOutput
        public Promise<Void> write(final ByteBuffer byteBuffer) {
            return this.requests.runSeqWhile(new ASupplier<Boolean>() { // from class: org.asyncflows.io.net.selector.SelectorSocket.SocketOutput.1
                private boolean firstRun = true;

                public Promise<Boolean> get() throws Exception {
                    if (!byteBuffer.hasRemaining()) {
                        return CoreFlows.aFalse();
                    }
                    final boolean borrowed = !byteBuffer.isDirect();
                    final ByteBuffer direct;
                    if (borrowed) {
                        direct = SelectorSocket.this.channelContext.getDirect();
                        direct.clear();
                        direct.limit(Math.min(direct.limit(), SelectorSocket.WRITE_LIMIT));
                        // stage a copy without consuming the caller's buffer yet
                        final int start = byteBuffer.position();
                        BufferOperations.BYTE.put(direct, byteBuffer);
                        byteBuffer.position(start);
                        direct.flip();
                    } else {
                        direct = byteBuffer;
                    }
                    try {
                        final int written = SelectorSocket.this.socketChannel.write(direct);
                        if (written <= 0) {
                            return SelectorSocket.this.channelContext.waitForWrite();
                        }
                        if (borrowed) {
                            // consume only what the channel accepted
                            byteBuffer.position(byteBuffer.position() + written);
                        }
                        if (!byteBuffer.hasRemaining()) {
                            return CoreFlows.aFalse();
                        }
                        if (!direct.hasRemaining()) {
                            return CoreFlows.aTrue();
                        }
                        // progress was made: allow one more swallowed failure
                        this.firstRun = true;
                        return CoreFlows.aBoolean(byteBuffer.hasRemaining());
                    } catch (IOException e) {
                        if (!this.firstRun) {
                            throw e;
                        }
                        this.firstRun = false;
                        return SelectorSocket.this.channelContext.waitForWrite();
                    } finally {
                        if (borrowed) {
                            SelectorSocket.this.channelContext.releaseDirect(direct);
                        }
                    }
                }
            });
        }

        /**
         * Runs an action that simply returns a void promise through the request queue; no data
         * is buffered by this output, so there is nothing else to flush.
         *
         * @return the promise produced by the request queue
         */
        @Override // org.asyncflows.io.AOutput
        public Promise<Void> flush() {
            return this.requests.run(CoreFlows::aVoid);
        }

        /**
         * Exports this output on the current vat.
         *
         * @return the result of {@link #m23export(Vat)} for {@code Vat.current()}
         */
        /* renamed from: export, reason: merged with bridge method [inline-methods] */
        public AOutput<ByteBuffer> m24export() {
            return m23export(Vat.current());
        }

        /**
         * Exports this output on the given vat.
         *
         * @param vat the vat handed to {@code IOExportUtil.export}
         * @return the exported output
         */
        /* renamed from: export, reason: merged with bridge method [inline-methods] */
        public AOutput<ByteBuffer> m23export(Vat vat) {
            return IOExportUtil.export(vat, this);
        }

        /**
         * Shuts down the write side of the socket.
         *
         * <p>When the channel is already closed there is nothing left to shut down and the
         * action succeeds. The explicit check is needed because the {@code Socket} view of a
         * channel is expected to report a closed channel as a {@code SocketException} rather than
         * as a {@code ClosedChannelException} (NOTE(review): confirm against the target JDK).</p>
         *
         * @return a void promise on success, or a failed promise carrying the I/O error
         */
        protected Promise<Void> closeAction() {
            if (!SelectorSocket.this.socketChannel.isOpen()) {
                return CoreFlows.aVoid();
            }
            try {
                SelectorSocket.this.socketChannel.socket().shutdownOutput();
                return CoreFlows.aVoid();
            } catch (ClosedChannelException e) {
                // closed in the meantime: same outcome as the guard above
                return CoreFlows.aVoid();
            } catch (IOException e2) {
                return CoreFlows.aFailure(e2);
            }
        }
    }

    /**
     * Creates a socket over a newly opened {@link SocketChannel}.
     *
     * @param selector the selector passed on to the two-argument constructor
     * @throws IOException if the channel cannot be opened or the delegated constructor fails
     */
    public SelectorSocket(Selector selector) throws IOException {
        this(selector, SocketChannel.open());
    }

    /**
     * Creates a socket over the given channel, switching the channel to non-blocking mode and
     * creating the channel context for it.
     *
     * <p>This socket closes the channel in its close action, so it is treated as the channel's
     * owner: when initialization fails the channel is closed before the exception is rethrown.
     * Otherwise a channel opened by the single-argument constructor would stay open with
     * nothing left referencing it.</p>
     *
     * @param selector      the selector handed to the channel context
     * @param socketChannel the channel to wrap
     * @throws IOException if the channel cannot be configured or the context cannot be created
     */
    public SelectorSocket(Selector selector, SocketChannel socketChannel) throws IOException {
        this.requests = new RequestQueue();
        this.input = new SocketInput();
        this.output = new SocketOutput();
        this.socketChannel = socketChannel;
        try {
            this.socketChannel.configureBlocking(false);
            this.channelContext = new ChannelContext(socketChannel, selector);
        } catch (IOException | RuntimeException e) {
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                // keep the original failure as the primary one
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Applies the given options to the underlying socket.
     *
     * @param socketOptions the options to apply; {@code null} leaves the socket untouched
     * @return a void promise on success, or a failed promise carrying the {@code SocketException}
     */
    @Override // org.asyncflows.io.net.ASocket
    public Promise<Void> setOptions(SocketOptions socketOptions) {
        try {
            ensureOpen();
            if (socketOptions == null) {
                // nothing to apply
                return CoreFlows.aVoid();
            }
            SocketUtil.applyOptions(this.socketChannel.socket(), socketOptions);
            return CoreFlows.aVoid();
        } catch (SocketException failure) {
            return CoreFlows.aFailure(failure);
        }
    }

    /**
     * Connects the channel to the given address.
     *
     * <p>The request is run through the socket's request queue. If the channel reports an
     * immediate connection the promise completes right away; otherwise the connection is
     * finished in a loop that waits on the channel context between attempts.</p>
     *
     * @param socketAddress the address to connect to
     * @return a promise that completes once the connection has been established
     */
    @Override // org.asyncflows.io.net.ASocket
    public Promise<Void> connect(SocketAddress socketAddress) {
        return this.requests.run(() -> {
            if (this.socketChannel.connect(socketAddress)) {
                return CoreFlows.aVoid();
            }
            return CoreFlowsSeq.aSeqWhile(() -> {
                ensureOpen();
                if (this.socketChannel.finishConnect()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Socket connected: " + this.socketChannel.socket().getLocalSocketAddress() + " -> " + this.socketChannel.socket().getRemoteSocketAddress());
                    }
                    // connected: stop looping
                    return CoreFlows.aFalse();
                }
                return this.channelContext.waitForConnect();
            });
        });
    }

    /**
     * Reports the remote address of the underlying socket.
     *
     * @return a resolved promise holding the socket's remote address
     */
    @Override // org.asyncflows.io.net.ASocket
    public Promise<SocketAddress> getRemoteAddress() {
        final SocketAddress remote = this.socketChannel.socket().getRemoteSocketAddress();
        return CoreFlows.aValue(remote);
    }

    /**
     * Reports the local address of the underlying socket.
     *
     * @return a resolved promise holding the socket's local address
     */
    @Override // org.asyncflows.io.net.ASocket
    public Promise<SocketAddress> getLocalAddress() {
        final SocketAddress local = this.socketChannel.socket().getLocalSocketAddress();
        return CoreFlows.aValue(local);
    }

    /**
     * Provides the input side of this socket.
     *
     * @return a resolved promise holding the exported input
     */
    @Override // org.asyncflows.io.AChannel
    public Promise<AInput<ByteBuffer>> getInput() {
        final AInput<ByteBuffer> exportedInput = this.input.m22export();
        return CoreFlows.aValue(exportedInput);
    }

    /**
     * Provides the output side of this socket.
     *
     * @return a resolved promise holding the exported output
     */
    @Override // org.asyncflows.io.AChannel
    public Promise<AOutput<ByteBuffer>> getOutput() {
        final AOutput<ByteBuffer> exportedOutput = this.output.m24export();
        return CoreFlows.aValue(exportedOutput);
    }

    /* renamed from: export, reason: merged with bridge method [inline-methods] */
    /**
     * Exports this socket on the given vat.
     *
     * @param vat the vat handed to {@code SocketExportUtil.export}
     * @return the exported socket
     */
    public ASocket m20export(Vat vat) {
        final ASocket exported = SocketExportUtil.export(vat, this);
        return exported;
    }

    /**
     * Closes the socket channel and then delegates to the inherited close action. The channel
     * context is closed afterwards in every case, including when closing the channel fails.
     *
     * @return the promise of the inherited close action, or a failed promise carrying the
     *         {@code IOException} raised in the try block
     */
    protected Promise<Void> closeAction() {
        try {
            if (LOG.isDebugEnabled()) {
                final SocketAddress localEnd = this.socketChannel.socket().getLocalSocketAddress();
                final SocketAddress remoteEnd = this.socketChannel.socket().getRemoteSocketAddress();
                LOG.debug("Closing socket: " + localEnd + " -> " + remoteEnd);
            }
            this.socketChannel.close();
            return super.closeAction();
        } catch (IOException closeFailure) {
            return CoreFlows.aFailure(closeFailure);
        } finally {
            this.channelContext.close();
        }
    }
}
