package io.vproxy.base.selector.wrap.blocking;

import io.vproxy.base.selector.SelectorEventLoop;
import io.vproxy.base.selector.wrap.VirtualFD;
import io.vproxy.base.selector.wrap.WrappedSelector;
import io.vproxy.base.util.Lock;
import io.vproxy.base.util.LogType;
import io.vproxy.base.util.Logger;
import io.vproxy.base.util.Utils;
import io.vproxy.base.util.thread.VProxyThread;
import io.vproxy.vfd.AbstractDatagramFD;
import io.vproxy.vfd.FD;
import io.vproxy.vfd.SockAddr;
import java.io.IOException;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.util.LinkedList;

/* loaded from: input_file:io/vproxy/base/selector/wrap/blocking/BlockingDatagramFD.class */
/**
 * A {@link VirtualFD} that wraps a datagram fd and performs the wrapped fd's {@code read}/{@code write}
 * calls on two dedicated worker threads, so that callers of {@link #read(ByteBuffer)} and
 * {@link #write(ByteBuffer)} only ever touch in-memory queues.
 *
 * <ul>
 *   <li>The read worker fills {@code readQueue} (bounded by a packet count) and registers this fd as
 *       virtually readable on the selector.</li>
 *   <li>The write worker drains {@code writeQueue} (bounded by a byte count) into the wrapped fd.</li>
 *   <li>An {@link IOException} raised on a worker is stored and re-thrown by the next
 *       {@code read}/{@code write} call.</li>
 * </ul>
 *
 * <p>Workers are idle until resumed ({@link #onRegister()}, or a queue operation that gives them work)
 * and are woken from their idle sleep with {@link Thread#interrupt()}.
 *
 * @param <ADDR> socket address type of the wrapped fd
 */
public class BlockingDatagramFD<ADDR extends SockAddr> implements AbstractDatagramFD<ADDR>, VirtualFD {
    /** Upper bound of a single idle sleep of a worker thread (24 hours); an interrupt ends it early. */
    private static final long IDLE_SLEEP_MILLIS = 86400000L;

    private final AbstractDatagramFD<ADDR> fd;
    private final SelectorEventLoop loop;
    private final WrappedSelector selector;
    private final int readBufSize;
    private final int writeQByteLimit;
    private final int readBufPacketLimit;
    private final VProxyThread readThread;
    private final VProxyThread writeThread;

    private volatile boolean isClosed = false;

    // ---- read side: readQueue is guarded by readQLock ----
    private final Lock readQLock = Lock.create();
    private final LinkedList<ByteBuffer> readQueue = new LinkedList<>();
    // written by the read worker, consumed by read(): volatile so the hand-off is visible across threads
    private volatile IOException lastReadException = null;
    private volatile boolean isReading = false;

    // ---- write side: writeQueue and currentWriteQueueBytes are guarded by writeQLock ----
    private final Lock writeQLock = Lock.create();
    private final LinkedList<ByteBuffer> writeQueue = new LinkedList<>();
    private int currentWriteQueueBytes = 0;
    // written by the write worker, consumed by write(): volatile so the hand-off is visible across threads
    private volatile IOException lastWriteException = null;
    private volatile boolean isWriting = false;

    /**
     * Wraps {@code abstractDatagramFD} and starts the read and write worker threads.
     *
     * @param abstractDatagramFD the fd to wrap; must be open
     * @param selectorEventLoop  the loop whose selector receives the virtual readable/writable events
     * @param i                  size in bytes of the buffer allocated for each packet read from the fd
     * @param i2                 maximum number of bytes that may be pending in the write queue
     * @param i3                 maximum number of packets that may be pending in the read queue
     * @throws IllegalArgumentException if {@code abstractDatagramFD} is not open
     */
    public BlockingDatagramFD(AbstractDatagramFD<ADDR> abstractDatagramFD, SelectorEventLoop selectorEventLoop, int i, int i2, int i3) {
        if (!abstractDatagramFD.isOpen()) {
            throw new IllegalArgumentException("trying to handle a closed channel: " + abstractDatagramFD);
        }
        this.fd = abstractDatagramFD;
        this.loop = selectorEventLoop;
        this.selector = selectorEventLoop.selector;
        this.readBufSize = i;
        this.writeQByteLimit = i2;
        this.readBufPacketLimit = i3;
        this.readThread = VProxyThread.create(this::threadRead, "blocking-read-" + abstractDatagramFD);
        this.readThread.start();
        this.writeThread = VProxyThread.create(this::threadWrite, "blocking-write-" + abstractDatagramFD);
        this.writeThread.start();
    }

    /** Schedules, on the event loop, registration of this fd as virtually readable. */
    private void setReadable() {
        this.loop.runOnLoop(() -> this.selector.registerVirtualReadable(this));
    }

    /** Schedules, on the event loop, removal of this fd's virtual readable registration. */
    private void cancelReadable() {
        this.loop.runOnLoop(() -> this.selector.removeVirtualReadable(this));
    }

    /** Registers this fd as virtually writable directly on the selector (not via {@code runOnLoop}). */
    private void setWritable() {
        this.selector.registerVirtualWritable(this);
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.AbstractDatagramFD
    public void connect(ADDR addr) throws IOException {
        this.fd.connect(addr);
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.AbstractDatagramFD
    public void bind(ADDR addr) throws IOException {
        this.fd.bind(addr);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // io.vproxy.vfd.AbstractDatagramFD
    public int send(ByteBuffer byteBuffer, ADDR addr) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("not supported for now");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // io.vproxy.vfd.AbstractDatagramFD
    public ADDR receive(ByteBuffer byteBuffer) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("not supported for now");
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.NetworkFD
    public ADDR getLocalAddress() throws IOException {
        return this.fd.getLocalAddress();
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.NetworkFD
    public ADDR getRemoteAddress() throws IOException {
        return this.fd.getRemoteAddress();
    }

    /**
     * Copies the packet at the head of the read queue into {@code byteBuffer}.
     *
     * <p>If the read worker previously failed, the stored exception is cleared, the worker is resumed
     * and the exception is thrown instead. The virtual readable registration is cancelled whenever the
     * queue is found (or becomes) empty.
     *
     * @param byteBuffer destination buffer
     * @return number of bytes copied; {@code 0} if the destination has no space, the queue is empty,
     *         or the head packet does not fit into the remaining space of the destination
     * @throws IOException the exception recorded by the read worker, if any
     */
    @Override // io.vproxy.vfd.ReadableByteStream
    public int read(ByteBuffer byteBuffer) throws IOException {
        IOException pending = this.lastReadException; // single volatile read
        if (pending != null) {
            this.lastReadException = null;
            resumeBlockingRead();
            throw pending;
        }
        int room = byteBuffer.limit() - byteBuffer.position();
        if (room == 0) {
            return 0;
        }
        int startPosition = byteBuffer.position();
        try (Lock.Locked ignored = this.readQLock.lock()) {
            if (this.readQueue.isEmpty()) {
                cancelReadable();
                return 0;
            }
            ByteBuffer packet = this.readQueue.getFirst();
            // NOTE(review): a packet larger than the caller's remaining space is left at the head of
            // the queue and 0 is returned; confirm callers always pass a sufficiently large buffer.
            if (packet.limit() - packet.position() <= room) {
                this.readQueue.removeFirst();
                resumeBlockingRead(); // a slot was freed, let the worker fetch the next packet
                byteBuffer.put(packet);
            }
            if (this.readQueue.isEmpty()) {
                cancelReadable();
            }
            return byteBuffer.position() - startPosition;
        }
    }

    /**
     * Copies the remaining bytes of {@code byteBuffer} into a new packet on the write queue and wakes
     * the write worker.
     *
     * <p>If the write worker previously failed, the stored exception is cleared, the worker is resumed
     * and the exception is thrown instead.
     *
     * @param byteBuffer source buffer; all of its remaining bytes are consumed on success
     * @return number of bytes queued, or {@code 0} if the source is empty or queueing it would exceed
     *         the write queue byte limit (the source is left untouched in that case)
     * @throws IOException the exception recorded by the write worker, if any
     */
    @Override // io.vproxy.vfd.WritableByteStream
    public int write(ByteBuffer byteBuffer) throws IOException {
        IOException pending = this.lastWriteException; // single volatile read
        if (pending != null) {
            this.lastWriteException = null;
            resumeBlockingWrite();
            throw pending;
        }
        int length = byteBuffer.limit() - byteBuffer.position();
        if (length == 0) {
            return 0;
        }
        try (Lock.Locked ignored = this.writeQLock.lock()) {
            if (this.currentWriteQueueBytes + length > this.writeQByteLimit) {
                Logger.warn(LogType.BUFFER_INSUFFICIENT, "cannot store bytes into the write queue: current: " + this.currentWriteQueueBytes + ", input: " + length + ", limit: " + this.writeQByteLimit);
                return 0;
            }
            ByteBuffer packet = Utils.allocateByteBuffer(length);
            packet.put(byteBuffer);
            packet.flip();
            this.writeQueue.add(packet);
            this.currentWriteQueueBytes += length;
            if (this.currentWriteQueueBytes > this.writeQByteLimit * 0.8d) {
                Logger.warn(LogType.BUFFER_INSUFFICIENT, "the write queue consumes more than 80% of the limit: current: " + this.currentWriteQueueBytes + ", limit: " + this.writeQByteLimit);
            }
        }
        // wake the worker only after the queue lock has been released
        resumeBlockingWrite();
        return length;
    }

    /**
     * Resumes both workers, re-announces readability if packets are already queued, and registers this
     * fd as virtually writable.
     */
    @Override // io.vproxy.base.selector.wrap.VirtualFD
    public void onRegister() {
        resumeBlockingThreads();
        try (Lock.Locked ignored = this.readQLock.lock()) {
            if (!this.readQueue.isEmpty()) {
                setReadable();
            }
        }
        setWritable();
    }

    /** Pauses both workers; each one goes idle after finishing its current iteration. */
    @Override // io.vproxy.base.selector.wrap.VirtualFD
    public void onRemove() {
        pauseBlockingThreads();
    }

    /** No-op: the requested blocking mode is ignored. */
    @Override // io.vproxy.vfd.FD
    public void configureBlocking(boolean z) {
    }

    private void resumeBlockingThreads() {
        resumeBlockingRead();
        resumeBlockingWrite();
    }

    private void pauseBlockingThreads() {
        pauseBlockingRead();
        pauseBlockingWrite();
    }

    /** Marks the read worker active and interrupts its idle sleep; does nothing if already active. */
    private void resumeBlockingRead() {
        if (this.isReading) {
            return;
        }
        this.isReading = true;
        this.readThread.interrupt();
    }

    private void pauseBlockingRead() {
        this.isReading = false;
    }

    /** Marks the write worker active and interrupts its idle sleep; does nothing if already active. */
    private void resumeBlockingWrite() {
        if (this.isWriting) {
            return;
        }
        this.isWriting = true;
        this.writeThread.interrupt();
    }

    private void pauseBlockingWrite() {
        this.isWriting = false;
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.FD
    public <T> void setOption(SocketOption<T> socketOption, T t) throws IOException {
        this.fd.setOption(socketOption, t);
    }

    /** @return the wrapped fd */
    @Override // io.vproxy.vfd.FD
    public FD real() {
        return this.fd;
    }

    /** @return {@code true} if {@code fd} is the wrapped fd or is contained by it */
    @Override // io.vproxy.vfd.FD
    public boolean contains(FD fd) {
        return this.fd == fd || this.fd.contains(fd);
    }

    /** Delegates to the wrapped fd. */
    @Override // io.vproxy.vfd.FD
    public boolean isOpen() {
        return this.fd.isOpen();
    }

    /**
     * Closes the wrapped fd. On the first call the instance is also marked closed and both worker
     * threads are interrupted so that their loops can terminate; later calls only close the wrapped
     * fd again.
     *
     * @throws IOException if closing the wrapped fd fails
     */
    @Override // io.vproxy.vfd.FD, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        boolean firstClose = !this.isClosed;
        this.isClosed = true;
        try {
            this.fd.close();
        } finally {
            // Wake the workers even when fd.close() throws; otherwise they could stay asleep
            // for the whole idle period although isClosed is already set.
            if (firstClose) {
                this.readThread.interrupt();
                this.writeThread.interrupt();
            }
        }
    }

    /** Body of the read worker: reads packets while active, sleeps while paused, exits once closed. */
    private void threadRead() {
        while (!this.isClosed) {
            if (this.isReading) {
                doRead();
            } else {
                sleepUntilInterrupted();
            }
        }
    }

    /**
     * One iteration of the read worker: if the read queue has room, reads one packet from the wrapped
     * fd (outside the queue lock), appends it to the queue and announces readability. When the queue
     * is full, or the read fails, the worker pauses itself.
     */
    private void doRead() {
        try (Lock.Locked ignored = this.readQLock.lock()) {
            if (this.readQueue.size() >= this.readBufPacketLimit) {
                assert Logger.lowLevelDebug("cannot store packets into the readQueue: size = " + this.readQueue.size() + ", limit = " + this.readBufPacketLimit);
                setReadable();
                this.isReading = false;
                return;
            }
        }
        ByteBuffer packet = Utils.allocateByteBuffer(this.readBufSize);
        try {
            this.fd.read(packet);
        } catch (IOException e) {
            // hand the failure over to the next read() call and go idle
            this.lastReadException = e;
            this.isReading = false;
            return;
        }
        packet.flip();
        try (Lock.Locked ignored = this.readQLock.lock()) {
            this.readQueue.addLast(packet);
            setReadable();
        }
    }

    /** Body of the write worker: writes packets while active, sleeps while paused, exits once closed. */
    private void threadWrite() {
        while (!this.isClosed) {
            if (this.isWriting) {
                doWrite();
            } else {
                sleepUntilInterrupted();
            }
        }
    }

    /**
     * One iteration of the write worker: writes the head packet of the write queue to the wrapped fd
     * (outside the queue lock) and, if any bytes were written, removes it from the queue. When the
     * queue is empty, or the write fails, the worker pauses itself.
     */
    private void doWrite() {
        ByteBuffer packet;
        try (Lock.Locked ignored = this.writeQLock.lock()) {
            if (this.writeQueue.isEmpty()) {
                this.isWriting = false;
                return;
            }
            packet = this.writeQueue.getFirst();
        }
        int packetLen = packet.limit() - packet.position();
        try {
            int written = this.fd.write(packet);
            if (written <= 0) {
                // NOTE(review): the packet stays queued and the worker stays active, so the write
                // is retried on the next loop iteration.
                Logger.shouldNotHappen("we expect a blocking call to write, but return value is " + written + " on fd " + this.fd);
                return;
            }
            if (written != packetLen) {
                Logger.shouldNotHappen("the datagram fd is writing partial packet: " + this.fd + ", n = " + written + ", bufLen = " + packetLen);
            }
        } catch (IOException e) {
            // hand the failure over to the next write() call and go idle
            this.lastWriteException = e;
            this.isWriting = false;
            return;
        }
        try (Lock.Locked ignored = this.writeQLock.lock()) {
            this.writeQueue.removeFirst();
            // The whole packet leaves the queue, so release its whole length: subtracting only the
            // written byte count would leave the counter permanently too high after a partial write.
            this.currentWriteQueueBytes -= packetLen;
        }
    }

    /** Sleeps for up to {@link #IDLE_SLEEP_MILLIS}, returning early when the thread is interrupted. */
    private static void sleepUntilInterrupted() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException ignored) {
            // The interrupt is the wake-up signal sent by resumeBlocking*() and close(). The flag is
            // intentionally not restored: the worker loop re-checks its state right after returning,
            // and a still-set flag would make the next idle sleep return immediately.
        }
    }

    @Override
    public String toString() {
        return "BlockingDatagramFD{fd=" + this.fd + "}";
    }
}
