package io.vproxy.base.selector.wrap.arqudp;

import io.vproxy.base.Config;
import io.vproxy.base.selector.Handler;
import io.vproxy.base.selector.HandlerContext;
import io.vproxy.base.selector.PeriodicEvent;
import io.vproxy.base.selector.SelectorEventLoop;
import io.vproxy.base.selector.wrap.VirtualFD;
import io.vproxy.base.selector.wrap.WrappedSelector;
import io.vproxy.base.util.ByteArray;
import io.vproxy.base.util.LogType;
import io.vproxy.base.util.Logger;
import io.vproxy.base.util.Utils;
import io.vproxy.base.util.nio.ByteArrayChannel;
import io.vproxy.vfd.EventSet;
import io.vproxy.vfd.FD;
import io.vproxy.vfd.IPPort;
import io.vproxy.vfd.SocketFD;
import io.vproxy.vmirror.MirrorDataFactory;
import java.io.IOException;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/vproxy/base/selector/wrap/arqudp/ArqUDPSocketFD.class */
/**
 * A virtual {@link SocketFD} that runs an {@link ArqUDPHandler} on top of an inner {@link SocketFD}.
 *
 * <p>Data flow as implemented here:
 * <ul>
 *   <li>bytes read from the inner fd are handed to {@code handler.parse(...)}; every non-null
 *       result is queued in {@code readBufs} and later served by {@link #read(ByteBuffer)};</li>
 *   <li>bytes passed to {@link #write(ByteBuffer)} are handed to {@code handler.write(...)};
 *       the packets the handler emits through the callback given to it in the constructor are
 *       queued in {@code writeBufs} and flushed to the inner fd by the inner handler;</li>
 *   <li>the readable/writable state of this virtual fd is reported to the
 *       {@link WrappedSelector} via its {@code registerVirtual*} / {@code removeVirtual*} methods.</li>
 * </ul>
 *
 * <p>NOTE(review): this class uses no synchronization; presumably every call happens on the
 * event loop thread - confirm against callers.
 */
public class ArqUDPSocketFD implements SocketFD, VirtualFD {
    /** When {@code true}, {@link #isConnected()} reports connected regardless of the inner fd. */
    private final boolean initiallyConnected;
    /** The inner fd that actually carries the packets. */
    private final SocketFD fd;
    /** Event loop the inner fd and the periodic clock are registered on. */
    private final SelectorEventLoop loop;
    /** Protocol handler created by the factory function passed to the constructor. */
    private final ArqUDPHandler handler;
    /** Selector of {@link #loop}; receives the virtual readable/writable registrations. */
    private final WrappedSelector selector;
    /** Handler attached to the inner fd in {@link #onRegister()}. */
    private final ArqUDPInsideFDHandler fdHandler;
    /** Parsed payload waiting to be consumed by {@link #read(ByteBuffer)}. */
    private final Deque<ByteBuffer> readBufs = new LinkedList<>();
    /** Packets emitted by the handler that still have to be written to the inner fd. */
    private final Deque<ByteArrayChannel> writeBufs = new LinkedList<>();
    /**
     * Set when the inner fd accepted fewer bytes than the head packet holds. While it is set,
     * the next packet emitted by the handler first clears {@link #writeBufs}.
     */
    private boolean notFullySent = false;
    /** Periodic clock event created in {@link #onRegister()}; {@code null} before registration. */
    private PeriodicEvent periodicEvent;
    private final MirrorDataFactory readingMirrorDataFactory;
    private final MirrorDataFactory writingMirrorDataFactory;
    /** Last virtual-readable state requested for this fd; re-applied in {@link #onRegister()}. */
    private boolean selfFDReadable = false;
    /** Last virtual-writable state requested for this fd; re-applied in {@link #onRegister()}. */
    private boolean selfFDWritable = false;

    /**
     * Handler attached to the inner fd. It moves bytes between the inner fd and the
     * read/write queues of the enclosing {@link ArqUDPSocketFD}.
     */
    public class ArqUDPInsideFDHandler implements Handler<SocketFD> {
        /** Reusable receive buffer of {@code Config.udpMtu} bytes; reset after every packet. */
        private final ByteBuffer tmpBuffer = Utils.allocateByteBuffer(Config.udpMtu);
        /** Last I/O error seen on the inner fd or raised by the protocol handler, or {@code null}. */
        private IOException error = null;
        /** Becomes {@code true} once the inner fd returned a negative read or this handler was removed. */
        private boolean invalid = false;

        private ArqUDPInsideFDHandler() {
        }

        /**
         * Records the error and marks the enclosing fd readable, so that its owner gets a
         * readable event and the next {@code read()} rethrows the error via {@code checkException()}.
         */
        private void setError(IOException iOException) {
            this.error = iOException;
            ArqUDPSocketFD.this.setSelfFDReadable();
        }

        /** @return the recorded error, or {@code null} if none occurred */
        public IOException getError() {
            return this.error;
        }

        /** @return {@code true} if the inner fd reported a negative read or this handler was removed */
        public boolean isInvalid() {
            return this.invalid;
        }

        private void watchInsideFDReadable() {
            ArqUDPSocketFD.this.loop.addOps(ArqUDPSocketFD.this.fd, EventSet.read());
        }

        private void unwatchInsideFDReadable() {
            ArqUDPSocketFD.this.loop.rmOps(ArqUDPSocketFD.this.fd, EventSet.read());
        }

        private void watchInsideFDWritable() {
            ArqUDPSocketFD.this.loop.addOps(ArqUDPSocketFD.this.fd, EventSet.write());
        }

        private void unwatchInsideFDWritable() {
            try {
                ArqUDPSocketFD.this.loop.rmOps(ArqUDPSocketFD.this.fd, EventSet.write());
            } catch (CancelledKeyException ignored) {
                // Deliberately ignored (best effort): the exception is only raised for a cancelled
                // key, in which case there is no write interest left to remove.
            }
        }

        /** Not used for this fd; intentionally a no-op. */
        @Override
        public void accept(HandlerContext<SocketFD> handlerContext) {
        }

        /** Marks the enclosing fd writable and flushes packets that were queued before the connect finished. */
        @Override
        public void connected(HandlerContext<SocketFD> handlerContext) {
            ArqUDPSocketFD.this.setSelfFDWritable();
            if (ArqUDPSocketFD.this.writeBufs.peek() != null) {
                writable(handlerContext);
            }
        }

        /**
         * Drains the inner fd: reads one chunk at a time into {@link #tmpBuffer} and passes each
         * chunk to {@link #readableOne()} until a read returns 0, returns a negative value
         * (handler becomes invalid) or throws (error is recorded). In the latter two cases the
         * read interest on the inner fd is removed.
         */
        @Override
        public void readable(HandlerContext<SocketFD> handlerContext) {
            while (true) {
                try {
                    int read = handlerContext.getChannel().read(this.tmpBuffer);
                    if (read < 0) {
                        assert Logger.lowLevelDebug("reading data from " + handlerContext.getChannel() + " failed with " + read);
                        this.invalid = true;
                        unwatchInsideFDReadable();
                        return;
                    }
                    if (read == 0) {
                        assert Logger.lowLevelDebug("read nothing, nothing to handle " + handlerContext.getChannel());
                        return;
                    }
                    try {
                        readableOne();
                    } finally {
                        // always make the whole buffer available for the next read,
                        // even when handling the chunk failed
                        this.tmpBuffer.limit(this.tmpBuffer.capacity()).position(0);
                    }
                } catch (IOException e) {
                    Logger.error(LogType.CONN_ERROR, "reading data from " + handlerContext.getChannel() + " failed", e);
                    setError(e);
                    unwatchInsideFDReadable();
                    return;
                }
            }
        }

        /**
         * Handles the chunk currently held in {@link #tmpBuffer}: flips the buffer, copies its
         * content and feeds it to {@code handler.parse(...)}. A non-null parse result is queued
         * for reading and the enclosing fd is marked readable; the enclosing fd is marked
         * writable whenever the handler reports {@code writableLen() > 0} afterwards.
         * A parse failure is recorded as the error and stops watching the inner fd for reads.
         * The caller is responsible for resetting {@link #tmpBuffer} afterwards.
         */
        public void readableOne() {
            this.tmpBuffer.flip();
            int len = this.tmpBuffer.limit() - this.tmpBuffer.position();
            if (len == 0) {
                watchInsideFDReadable();
                return;
            }
            ByteArrayChannel chnl = ByteArrayChannel.fromEmpty(len);
            chnl.write(this.tmpBuffer);
            try {
                ByteArray parsed = ArqUDPSocketFD.this.handler.parse(chnl);
                assert Logger.lowLevelDebug("checking writable for " + ArqUDPSocketFD.this + ", writableLen = " + ArqUDPSocketFD.this.handler.writableLen());
                if (ArqUDPSocketFD.this.handler.writableLen() > 0) {
                    ArqUDPSocketFD.this.setSelfFDWritable();
                }
                if (parsed == null) {
                    watchInsideFDReadable();
                    return;
                }
                ArqUDPSocketFD.this.readBufs.add(ByteBuffer.wrap(parsed.toJavaArray()));
                ArqUDPSocketFD.this.setSelfFDReadable();
                watchInsideFDReadable();
            } catch (IOException e) {
                setError(e);
                Logger.error(LogType.CONN_ERROR, "parse kcp packet failed", e);
                unwatchInsideFDReadable();
            }
        }

        /**
         * Flushes {@code writeBufs} to the inner fd, one queued packet per write call.
         * Stops watching for writability once the queue is empty. If the inner fd accepts less
         * than a whole packet, keeps watching for writability, sets {@code notFullySent} and
         * returns with the packet still at the head of the queue. A write failure is recorded
         * as the error.
         */
        @Override
        public void writable(HandlerContext<SocketFD> handlerContext) {
            assert Logger.lowLevelDebug("writable for " + handlerContext.getChannel() + " in " + ArqUDPSocketFD.this);
            while (true) {
                ByteArrayChannel head = ArqUDPSocketFD.this.writeBufs.peek();
                if (head == null) {
                    unwatchInsideFDWritable();
                    return;
                }
                if (head.used() == 0) {
                    // fully consumed (or empty) packet: drop it and look at the next one
                    ArqUDPSocketFD.this.writeBufs.poll();
                    continue;
                }
                assert Logger.lowLevelDebug("arq udp socket is writing " + head.used() + " bytes to " + handlerContext.getChannel());
                assert Logger.lowLevelNetDebugPrintBytes(head.getBytes(), head.getReadOff(), head.getWriteOff());
                int used = head.used();
                try {
                    int wrote = handlerContext.getChannel().write(ByteBuffer.wrap(head.readableArray().toJavaArray()));
                    if (wrote < used) {
                        assert Logger.lowLevelDebug("not all data wrote: " + (used - wrote));
                        if (wrote != 0) {
                            Logger.shouldNotHappen("writing half udp packet to the fd " + handlerContext.getChannel());
                        }
                        watchInsideFDWritable();
                        ArqUDPSocketFD.this.notFullySent = true;
                        return;
                    }
                    assert Logger.lowLevelDebug("the buffer is empty now, everything wrote");
                    ArqUDPSocketFD.this.notFullySent = false;
                    head.skip(used);
                } catch (IOException e) {
                    Logger.error(LogType.CONN_ERROR, "writing data to " + handlerContext.getChannel() + " failed", e);
                    setError(e);
                    return;
                }
            }
        }

        /** Marks this handler invalid; {@code read()} on the enclosing fd then returns -1 once drained. */
        @Override
        public void removed(HandlerContext<SocketFD> handlerContext) {
            this.invalid = true;
        }
    }

    /**
     * Creates an instance that is not considered connected until the inner fd is.
     *
     * @param socketFD          the inner fd
     * @param selectorEventLoop the loop the inner fd will be registered on
     * @param function          factory that builds the protocol handler from the packet-emitting callback
     */
    public ArqUDPSocketFD(SocketFD socketFD, SelectorEventLoop selectorEventLoop, Function<Consumer<ByteArrayChannel>, ArqUDPHandler> function) {
        this(false, socketFD, selectorEventLoop, function);
    }

    /**
     * Creates an instance and immediately marks it virtually writable.
     *
     * @param z                 value of {@code initiallyConnected}: when {@code true},
     *                          {@link #isConnected()} always returns {@code true}
     * @param socketFD          the inner fd
     * @param selectorEventLoop the loop the inner fd will be registered on; its selector receives
     *                          the virtual readable/writable registrations
     * @param function          factory that builds the protocol handler; it is given a callback
     *                          which queues every emitted packet for writing to the inner fd
     */
    public ArqUDPSocketFD(boolean z, SocketFD socketFD, SelectorEventLoop selectorEventLoop, Function<Consumer<ByteArrayChannel>, ArqUDPHandler> function) {
        this.initiallyConnected = z;
        this.fd = socketFD;
        this.loop = selectorEventLoop;
        this.selector = selectorEventLoop.selector;
        this.fdHandler = new ArqUDPInsideFDHandler();
        this.handler = function.apply(packet -> {
            if (this.notFullySent) {
                // a previous packet could not be written completely:
                // everything still pending is dropped before the new packet is queued
                assert Logger.lowLevelDebug("`notFullySent` flag is set, clear the writeBufs queue");
                this.notFullySent = false;
                this.writeBufs.clear();
            }
            this.writeBufs.add(packet);
            assert Logger.lowLevelDebug("writeBufs currently have " + this.writeBufs.size() + " elements");
            this.fdHandler.watchInsideFDWritable();
        });
        setSelfFDWritable();
        // Mirror metadata: use the real addresses when available, otherwise fall back to object references.
        this.readingMirrorDataFactory = new MirrorDataFactory("arq-udp", data -> {
            try {
                data.setSrc(getRemoteAddress());
            } catch (IOException e) {
                data.setSrcRef(socketFD);
            }
            try {
                data.setDst(getLocalAddress());
            } catch (IOException e) {
                data.setDstRef(this);
            }
        });
        this.writingMirrorDataFactory = new MirrorDataFactory("arq-udp", data -> {
            try {
                data.setSrc(getLocalAddress());
            } catch (IOException e) {
                data.setSrcRef(this);
            }
            try {
                data.setDst(getRemoteAddress());
            } catch (IOException e) {
                data.setDstRef(socketFD);
            }
        });
    }

    /** Delegates to the inner fd. */
    @Override
    public void connect(IPPort iPPort) throws IOException {
        this.fd.connect(iPPort);
    }

    /** @return {@code true} if constructed as initially connected or if the inner fd is connected */
    @Override
    public boolean isConnected() {
        return this.initiallyConnected || this.fd.isConnected();
    }

    /** Delegates to the inner fd. */
    @Override
    public void shutdownOutput() throws IOException {
        this.fd.shutdownOutput();
    }

    /** @return the local address of the inner fd */
    @Override
    public IPPort getLocalAddress() throws IOException {
        return this.fd.getLocalAddress();
    }

    /** @return the remote address of the inner fd */
    @Override
    public IPPort getRemoteAddress() throws IOException {
        return this.fd.getRemoteAddress();
    }

    /** Delegates to the inner fd. */
    @Override
    public boolean finishConnect() throws IOException {
        return this.fd.finishConnect();
    }

    /**
     * Rethrows the error recorded by the inner handler, if any.
     *
     * @throws IOException the recorded error
     */
    private void checkException() throws IOException {
        IOException err = this.fdHandler.getError();
        if (err != null) {
            throw err;
        }
    }

    /** Mirrors the bytes that were put into {@code byteBuffer} after position {@code i}; no-op if nothing was added. */
    private void mirrorRead(ByteBuffer byteBuffer, int i) {
        if (byteBuffer.position() == i) {
            return;
        }
        this.readingMirrorDataFactory.build()
            .setMeta("r=" + this.readBufs.size() + ";w=" + this.writeBufs.size() + ";p=" + this.handler.getClass().getSimpleName())
            .setDataAfter(byteBuffer, i)
            .setTransportLayerProtocol("UDP")
            .mirror();
    }

    /**
     * Copies queued payload into {@code byteBuffer}.
     *
     * @param byteBuffer destination buffer
     * @return the value returned by {@code Utils.writeFromFIFOQueueToBuffer}; {@code 0} if nothing
     *         is queued; {@code -1} if nothing is queued and the inner handler is invalid
     * @throws IOException if the inner handler recorded an error (checked both when the queue is
     *                     empty and after data has been copied)
     */
    @Override
    public int read(ByteBuffer byteBuffer) throws IOException {
        if (this.readBufs.isEmpty()) {
            if (this.fdHandler.isInvalid()) {
                return -1;
            }
            checkException();
            return 0;
        }
        assert Utils.debug(() -> {
            assert Logger.lowLevelNetDebug("BEGIN: readBufs inside ArqUDPSocketFD:=================");
            for (ByteBuffer buf : this.readBufs) {
                assert Logger.lowLevelNetDebugPrintBytes(buf.array(), buf.position(), buf.limit());
                assert Logger.lowLevelNetDebug("---");
            }
            assert Logger.lowLevelNetDebug("END: readBufs inside ArqUDPSocketFD:=================");
        });
        final int startPos = byteBuffer.position();
        int ret = Utils.writeFromFIFOQueueToBuffer(this.readBufs, byteBuffer);
        if (this.readingMirrorDataFactory.isEnabled()) {
            mirrorRead(byteBuffer, startPos);
        }
        assert Utils.debug(() -> {
            // re-read the bytes just written; get() moves the position back to where it was
            int endPos = byteBuffer.position();
            byteBuffer.position(startPos);
            int len = endPos - startPos;
            byte[] content = Utils.allocateByteArray(len);
            byteBuffer.get(content);
            assert Logger.lowLevelDebug("read " + len + " bytes from " + this);
            assert Logger.lowLevelNetDebugPrintBytes(content);
        });
        checkException();
        if (this.readBufs.isEmpty() && !this.fdHandler.isInvalid()) {
            // nothing left to read; stay readable when invalid so the owner can observe -1
            cancelSelfFDReadable();
        }
        return ret;
    }

    /** @return {@code handler.writableLen()} */
    public int writableLen() {
        return this.handler.writableLen();
    }

    /** Mirrors the bytes about to be handed to the protocol handler. */
    private void mirrorWrite(byte[] bArr) {
        this.writingMirrorDataFactory.build()
            .setMeta("r=" + this.readBufs.size() + ";w=" + this.writeBufs.size() + ";p=" + this.handler.getClass().getSimpleName())
            .setData(bArr)
            .setTransportLayerProtocol("UDP")
            .mirror();
    }

    /**
     * Hands up to {@code handler.writableLen()} bytes of {@code byteBuffer} to the protocol handler.
     * The virtual-writable state is cancelled whenever the handler cannot take everything offered.
     *
     * @param byteBuffer source buffer; its position advances by the returned amount
     * @return number of bytes consumed; {@code 0} if the buffer has nothing remaining or the
     *         handler reports no writable capacity
     * @throws IOException if the inner handler recorded an error, or if {@code handler.write} fails
     */
    @Override
    public int write(ByteBuffer byteBuffer) throws IOException {
        int remaining = byteBuffer.limit() - byteBuffer.position();
        if (remaining == 0) {
            return 0;
        }
        checkException();
        int writableLen = this.handler.writableLen();
        if (writableLen <= 0 || writableLen < remaining) {
            cancelSelfFDWritable();
            if (writableLen <= 0) {
                return 0;
            }
        }
        int len = Math.min(writableLen, remaining);
        byte[] data = Utils.allocateByteArray(len);
        byteBuffer.get(data);
        if (this.writingMirrorDataFactory.isEnabled()) {
            mirrorWrite(data);
        }
        assert Logger.lowLevelDebug("write " + len + " bytes to ArqUDPSocketFD");
        assert Logger.lowLevelNetDebugPrintBytes(data);
        this.handler.write(ByteArray.from(data));
        return len;
    }

    /** Delegates to the inner fd. */
    @Override
    public void configureBlocking(boolean z) throws IOException {
        this.fd.configureBlocking(z);
    }

    /** Delegates to the inner fd. */
    @Override
    public <T> void setOption(SocketOption<T> socketOption, T t) throws IOException {
        this.fd.setOption(socketOption, t);
    }

    /** @return {@code real()} of the inner fd */
    @Override
    public FD real() {
        return this.fd.real();
    }

    /** @return {@code true} if {@code fd} is the inner fd or the inner fd itself contains it */
    @Override
    public boolean contains(FD fd) {
        return this.fd == fd || this.fd.contains(fd);
    }

    /** @return whether the inner fd is open */
    @Override
    public boolean isOpen() {
        return this.fd.isOpen();
    }

    /** Closes the inner fd. */
    @Override
    public void close() throws IOException {
        this.fd.close();
    }

    /**
     * Registers the inner fd on the loop (read interest, plus write interest when packets are
     * already queued), starts the periodic {@code handler.clock(...)} call and re-applies the
     * virtual readable/writable state recorded so far.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the registration fails
     */
    @Override
    public void onRegister() {
        EventSet events = EventSet.read();
        if (!this.writeBufs.isEmpty()) {
            events = events.combine(EventSet.write());
        }
        assert Logger.lowLevelDebug(this + ".onRegister() with events " + events);
        try {
            this.loop.add(this.fd, events, null, this.fdHandler);
            this.periodicEvent = this.loop.period(this.handler.clockInterval(), () -> {
                try {
                    this.handler.clock(Config.currentTimestamp);
                } catch (IOException e) {
                    this.fdHandler.setError(e);
                }
            });
            if (this.selfFDReadable) {
                setSelfFDReadable();
            }
            if (this.selfFDWritable) {
                setSelfFDWritable();
            }
        } catch (IOException e) {
            Logger.shouldNotHappen("onRegister callback failed when adding fd " + this.fd + " to loop", e);
            throw new RuntimeException(e);
        }
    }

    /** Cancels the periodic clock (if it was started) and removes the inner fd from the loop. */
    @Override
    public void onRemove() {
        assert Logger.lowLevelDebug(this + ".onRemove()");
        if (this.periodicEvent != null) {
            this.periodicEvent.cancel();
        }
        this.loop.remove(this.fd);
    }

    private void setSelfFDReadable() {
        this.selfFDReadable = true;
        this.selector.registerVirtualReadable(this);
    }

    private void setSelfFDWritable() {
        this.selfFDWritable = true;
        this.selector.registerVirtualWritable(this);
    }

    private void cancelSelfFDReadable() {
        this.selfFDReadable = false;
        this.selector.removeVirtualReadable(this);
    }

    private void cancelSelfFDWritable() {
        this.selfFDWritable = false;
        this.selector.removeVirtualWritable(this);
    }

    /** @return the simple class name followed by the inner fd in parentheses */
    @Override
    public String toString() {
        return getClass().getSimpleName() + "(" + this.fd + ")";
    }
}
