package org.webpieces.nio.impl.cm.basic;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.webpieces.data.api.BufferPool;
import org.webpieces.nio.api.BackpressureConfig;
import org.webpieces.nio.api.channels.Channel;
import org.webpieces.nio.api.channels.ChannelSession;
import org.webpieces.nio.api.handlers.DataListener;
import org.webpieces.nio.api.handlers.RecordingDataListener;
import org.webpieces.nio.impl.util.ChannelSessionImpl;
import org.webpieces.util.exceptions.NioClosedChannelException;
import org.webpieces.util.exceptions.NioException;

/* loaded from: input_file:org/webpieces/nio/impl/cm/basic/BasChannelImpl.class */
public abstract class BasChannelImpl extends RegisterableChannelImpl implements Channel {
    /** Logger named after the public {@link Channel} API; used for the "Basic.xxx called" trace lines. */
    private static final Logger apiLog = LoggerFactory.getLogger(Channel.class);
    /** Implementation-level logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(BasChannelImpl.class);
    /** Per-channel session object handed out by {@link #getSession()}. */
    private ChannelSession session;
    /** Pool that written (or failed) buffers are released back to. */
    private BufferPool pool;
    /** Used by writeAll() to unregister write interest once the queue is drained. */
    private KeyProcessor router;
    /** Listener passed to the selector manager when registering for reads. */
    private DataListener dataListener;
    /** Monitor held by writeSynchronized() and writeAll() while touching the write queue. */
    private Object writeLock;
    /** Running total of bytes sitting in {@link #dataToBeWritten} that have not been written yet. */
    private long waitingBytesCounter;
    /** Writes that could not be completed immediately, in submission order. */
    private ConcurrentLinkedQueue<WriteInfo> dataToBeWritten;
    /** Backlog size in bytes that waitingBytesCounter is compared against; initialised to 500000. */
    private int maxBytesWaitingSize;
    /** True while writes are being queued and the channel is registered for write readiness. */
    private boolean inDelayedWriteMode;
    /** When true, connect() wraps the listener in a RecordingDataListener; the constructor sets it to false. */
    private boolean isRecording;
    /** Address stored by {@link #setConnectingTo(SocketAddress)}. */
    protected SocketAddress isConnectingTo;
    /** Lifecycle state; set to CONNECTED by connect() and CLOSED by setClosed(). */
    protected ChannelState channelState;
    /** Recorded by setClosed(); selects which "closed" error message write() reports. */
    private boolean isRemoteEndInitiateClose;
    /** Limit from BackpressureConfig.getMaxBytes(); see isOverMaxUnacked(). */
    private Integer maxUnackedBytes;
    /** Threshold from BackpressureConfig.getStartReadingThreshold(); see isUnderThreshold(). */
    private Integer readingThreshold;
    /** Counter exposed through getUnackedBytes(); starts at 0. */
    private AtomicInteger unackedBytes;
    /** State holder exposed through getCompareSetBackflowState(); starts as REGISTERED. */
    private AtomicReference<BackflowState1> backflowState;

    /**
     * Creates a channel that is not yet connected.
     *
     * @param str                id passed through to the {@code RegisterableChannelImpl} constructor
     * @param selectorManager2   selector manager passed through to the superclass
     * @param keyProcessor       used later to unregister write interest
     * @param bufferPool         pool that written buffers are released to
     * @param backpressureConfig source of the max-unacked-bytes limit and the start-reading threshold
     */
    public BasChannelImpl(String str, SelectorManager2 selectorManager2, KeyProcessor keyProcessor, BufferPool bufferPool, BackpressureConfig backpressureConfig) {
        super(str, selectorManager2);

        // Session and write-side bookkeeping.
        session = new ChannelSessionImpl();
        writeLock = new Object();
        waitingBytesCounter = 0L;
        dataToBeWritten = new ConcurrentLinkedQueue<>();
        maxBytesWaitingSize = 500000;

        // Backpressure bookkeeping.
        unackedBytes = new AtomicInteger(0);
        backflowState = new AtomicReference<>(BackflowState1.REGISTERED);

        // Collaborators supplied by the caller.
        pool = bufferPool;
        isRecording = false;
        router = keyProcessor;
        maxUnackedBytes = backpressureConfig.getMaxBytes();
        readingThreshold = backpressureConfig.getStartReadingThreshold();
    }

    /**
     * Implemented by subclasses.
     * NOTE(review): presumably reports whether the underlying channel is in blocking mode; confirm against subclasses.
     */
    @Override // org.webpieces.nio.api.channels.RegisterableChannel
    public abstract boolean isBlocking();

    /**
     * Subclass hook taking a destination buffer; it is not called from this class.
     * NOTE(review): presumably reads from the underlying channel and returns the byte count; confirm against subclasses.
     */
    public abstract int readImpl(ByteBuffer byteBuffer);

    /**
     * Subclass hook that writes from the buffer.
     * Callers in this class require that the return value equals the number of bytes consumed from
     * {@code byteBuffer} and throw IllegalStateException otherwise.
     */
    protected abstract int writeImpl(ByteBuffer byteBuffer);

    /**
     * Connects to {@code socketAddress} and, once the connect completes, marks the channel
     * CONNECTED and registers it for reads.
     *
     * @param socketAddress remote address handed to {@link #connectImpl(SocketAddress)}
     * @param dataListener  listener to register for reads (wrapped in a RecordingDataListener when recording)
     * @return future produced by the read registration that follows the connect
     */
    @Override // org.webpieces.nio.api.channels.Channel
    public CompletableFuture<Void> connect(SocketAddress socketAddress, DataListener dataListener) {
        DataListener effectiveListener = this.isRecording
                ? new RecordingDataListener("singleThreaded-", dataListener)
                : dataListener;
        this.dataListener = effectiveListener;

        CompletableFuture<Channel> connected = connectImpl(socketAddress);
        return connected.thenCompose(ignored -> {
            this.channelState = ChannelState.CONNECTED;
            return registerForReads(this.dataListener);
        });
    }

    /**
     * Subclass hook that performs the actual connect; connect() chains the state change and
     * read registration onto the returned future.
     */
    protected abstract CompletableFuture<Channel> connectImpl(SocketAddress socketAddress);

    /**
     * Drains the write queue, runs the close action, then fails every drained write promise.
     * The promises are failed only after the close action has run, and outside the monitor.
     *
     * @param closeRunnable close action; its runDelayedAction() is invoked once
     */
    private void unqueueAndFailWritesThenClose(CloseRunnable closeRunnable) {
        final List<CompletableFuture<Void>> pendingWrites;
        synchronized (this) {
            pendingWrites = failAllWritesInQueue();
        }

        closeRunnable.runDelayedAction();

        Iterator<CompletableFuture<Void>> it = pendingWrites.iterator();
        while (it.hasNext()) {
            CompletableFuture<Void> pending = it.next();
            log.info("WRITES outstanding while close was called, notifying client through his failure method of the exception");
            NioClosedChannelException failure = new NioClosedChannelException(this + "There are " + pendingWrites.size() + " writes that are not complete yet(you called write but they did not call success back to the client).");
            pending.completeExceptionally(failure);
        }
    }

    /**
     * Writes the buffer, either immediately or by queueing it, and releases the buffer to the
     * pool once the write completes successfully.
     *
     * @param byteBuffer data to write; must have remaining bytes
     * @return future completed when the data has been written
     * @throws IllegalArgumentException  if the buffer has nothing remaining
     * @throws IllegalStateException     if the selector manager is not running
     * @throws NioClosedChannelException if the channel has been closed (locally or by the remote end)
     * @throws NioException              if the channel is not connected yet
     */
    @Override // org.webpieces.nio.api.channels.Channel
    public CompletableFuture<Void> write(ByteBuffer byteBuffer) {
        if (!byteBuffer.hasRemaining()) {
            throw new IllegalArgumentException(this + "buffer has no data");
        }
        if (!this.selMgr.isRunning()) {
            throw new IllegalStateException(this + "ChannelManager must be running and is stopped");
        }

        if (this.channelState == ChannelState.CLOSED) {
            String reason = this.isRemoteEndInitiateClose
                    ? " Client cannot write after the remote end closed the socket"
                    : "Your Application cannot write after YOUR Application closed the socket";
            throw new NioClosedChannelException(this + reason);
        } else if (this.channelState != ChannelState.CONNECTED) {
            throw new NioException(this + "The Channel is not connected yet");
        }

        if (apiLog.isTraceEnabled()) {
            apiLog.trace(this + "Basic.write");
        }

        CompletableFuture<Void> written = writeSynchronized(byteBuffer);
        return written.thenApply(ignored -> {
            // Only reached on success; failed writes release their buffer when the queue is drained.
            this.pool.releaseBuffer(byteBuffer);
            return null;
        });
    }

    /**
     * Attempts the write on the calling thread; if the data cannot be written completely
     * (or earlier writes are already queued) the buffer is queued and completed later by writeAll().
     *
     * <p>The original compared the backlog against {@code maxBytesWaitingSize} but did nothing with
     * the result. This version logs a warning each time the backlog crosses that limit so the
     * condition is no longer silent. Writes are still accepted.
     *
     * @param byteBuffer data to write
     * @return an already-completed future when everything was written immediately, otherwise a
     *         promise stored in the write queue
     * @throws IllegalStateException if writeImpl's return value disagrees with the bytes consumed
     */
    private CompletableFuture<Void> writeSynchronized(ByteBuffer byteBuffer) {
        synchronized (this.writeLock) {
            if (!this.inDelayedWriteMode) {
                int remaining = byteBuffer.remaining();
                int writeImpl = writeImpl(byteBuffer);
                if (writeImpl == remaining) {
                    if (log.isTraceEnabled()) {
                        log.trace(this + " wrote bytes on client thread");
                    }
                    return CompletableFuture.completedFuture(null);
                }
                if (byteBuffer.remaining() + writeImpl != remaining) {
                    throw new IllegalStateException(this + "Something went wrong.  b.remaining()=" + byteBuffer.remaining() + " written=" + writeImpl + " total=" + remaining);
                }
                // Partial write: ask the selector to tell us when the socket is writable again.
                registerForWrites();
                this.inDelayedWriteMode = true;
            }
            if (log.isTraceEnabled()) {
                log.trace(this + "sent write to queue");
            }
            // The promise is only needed on the queued path, so allocate it here.
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.dataToBeWritten.add(new WriteInfo(byteBuffer, completableFuture));

            long backlogBefore = this.waitingBytesCounter;
            this.waitingBytesCounter += byteBuffer.remaining();
            // Warn once per crossing of the limit rather than on every queued write.
            if (backlogBefore <= this.maxBytesWaitingSize && this.waitingBytesCounter > this.maxBytesWaitingSize) {
                log.warn(this + "write backlog of " + this.waitingBytesCounter + " bytes exceeds maxBytesWaitingSize=" + this.maxBytesWaitingSize);
            }
            return completableFuture;
        }
    }

    /**
     * Empties the write queue, releasing every queued buffer back to the pool, and resets the
     * backlog counter.
     *
     * <p>Uses {@code poll()} rather than {@code isEmpty()} + {@code remove()} so that an element
     * taken by another thread between the two calls cannot cause a NoSuchElementException.
     * NOTE(review): this method locks on {@code this} while writeSynchronized()/writeAll() lock on
     * {@code writeLock}; confirm whether the queue and counter are meant to share one lock.
     *
     * @return the promises of the drained writes, in queue order; the caller is expected to fail them
     */
    private synchronized List<CompletableFuture<Void>> failAllWritesInQueue() {
        List<CompletableFuture<Void>> promises = new ArrayList<>();
        WriteInfo pending;
        while ((pending = this.dataToBeWritten.poll()) != null) {
            ByteBuffer buffer = pending.getBuffer();
            // Mark the buffer as fully consumed before handing it back to the pool.
            buffer.position(buffer.limit());
            this.pool.releaseBuffer(buffer);
            promises.add(pending.getPromise());
        }
        this.waitingBytesCounter = 0L;
        return promises;
    }

    /**
     * Registers this channel with the selector manager for write readiness
     * ({@link SelectionKey#OP_WRITE}, whose value is 4).
     */
    private void registerForWrites() {
        if (log.isTraceEnabled()) {
            int queued = this.dataToBeWritten.size();
            log.trace(this + "registering channel for write msg. size=" + queued);
        }
        this.selMgr.registerSelectableChannel(this, SelectionKey.OP_WRITE, null, () -> true);
    }

    /**
     * Writes as much of the queued data as the channel accepts, then completes the promises of
     * every write that was fully flushed.
     *
     * <p>When a buffer is only partially written the loop stops. The channel stays registered for
     * write readiness, so the remainder is attempted on a later call. The previous version kept
     * looping on the same buffer, which spins on {@code writeImpl} while holding {@code writeLock}
     * for as long as the socket cannot accept more data.
     *
     * <p>Promises are completed after {@code writeLock} is released.
     *
     * @throws IllegalStateException if the queue is empty on entry, or if writeImpl's return value
     *                               disagrees with the bytes consumed from the buffer
     */
    public void writeAll() {
        List<CompletableFuture<Void>> finished = new ArrayList<>();
        synchronized (this.writeLock) {
            if (this.dataToBeWritten.isEmpty()) {
                throw new IllegalStateException(this + "bug, I am not sure this is possible..it shouldn't be...look into");
            }
            while (!this.dataToBeWritten.isEmpty()) {
                WriteInfo head = this.dataToBeWritten.peek();
                ByteBuffer buffer = head.getBuffer();
                int remaining = buffer.remaining();
                int written = writeImpl(buffer);
                if (buffer.hasRemaining()) {
                    if (buffer.remaining() + written != remaining) {
                        throw new IllegalStateException(this + "Something went wrong.  b.remaining()=" + buffer.remaining() + " written=" + written + " total=" + remaining);
                    }
                    if (log.isTraceEnabled()) {
                        log.trace(this + "Did not write all data out");
                    }
                    this.waitingBytesCounter -= remaining - buffer.remaining();
                    // The socket cannot take more right now; wait for the next write-ready event.
                    break;
                }
                // Fully flushed: drop it from the queue and remember its promise.
                this.dataToBeWritten.poll();
                this.waitingBytesCounter -= remaining;
                finished.add(head.getPromise());
            }
            if (this.dataToBeWritten.isEmpty() && this.inDelayedWriteMode) {
                this.inDelayedWriteMode = false;
                if (log.isTraceEnabled()) {
                    log.trace(this + "unregister writes");
                }
                this.router.unregisterSelectableChannel(this, SelectionKey.OP_WRITE);
            }
        }
        for (CompletableFuture<Void> promise : finished) {
            promise.complete(null);
        }
    }

    /**
     * Binds the channel to a local address synchronously.
     *
     * @param socketAddress address to bind to; must be an {@link InetSocketAddress}
     * @return an already-completed future
     * @throws IllegalArgumentException if the address is not an InetSocketAddress
     * @throws NioException             wrapping any IOException raised while binding
     */
    @Override // org.webpieces.nio.api.channels.RegisterableChannel
    public CompletableFuture<Void> bind(SocketAddress socketAddress) {
        boolean isInetAddress = socketAddress instanceof InetSocketAddress;
        if (!isInetAddress) {
            throw new IllegalArgumentException(this + "Can only bind to InetSocketAddress addressses");
        }
        if (apiLog.isTraceEnabled()) {
            apiLog.trace(this + "Basic.bind called addr=" + socketAddress);
        }

        try {
            bindImpl(socketAddress);
        } catch (IOException ioe) {
            throw new NioException(ioe);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Delegates to {@link #bindImpl2(SocketAddress)} and converts an {@link Error} whose cause is
     * a {@link SocketException} into a {@link BindException} carrying the same message and cause.
     * Any other Error is rethrown unchanged.
     *
     * @throws IOException from bindImpl2, or the converted BindException
     */
    private void bindImpl(SocketAddress socketAddress) throws IOException {
        try {
            bindImpl2(socketAddress);
        } catch (Error err) {
            Throwable cause = err.getCause();
            if (cause instanceof SocketException) {
                BindException translated = new BindException(err.getMessage());
                translated.initCause(cause);
                throw translated;
            }
            throw err;
        }
    }

    /** Subclass hook that performs the actual bind; invoked by bindImpl(). */
    protected abstract void bindImpl2(SocketAddress socketAddress) throws IOException;

    /**
     * Stores the listener and registers for reads with a supplier that always yields {@code true}.
     *
     * @param dataListener listener to store and register
     * @return future from the supplier-based registerForReads overload
     */
    public CompletableFuture<Void> registerForReads(DataListener dataListener) {
        this.dataListener = dataListener;
        Supplier<Boolean> alwaysTrue = () -> true;
        return registerForReads(alwaysTrue);
    }

    /**
     * Registers this channel and its stored listener with the selector manager for reads.
     *
     * @param supplier passed through unchanged to the selector manager's registerChannelForRead
     * @return the future returned by the selector manager
     * @throws IllegalArgumentException if no listener has been set
     * @throws IllegalStateException    if the channel is not CONNECTED, or is closed
     * @throws NioException             wrapping an IOException or InterruptedException from the
     *                                  selector manager; on interruption the thread's interrupt
     *                                  flag is restored before throwing
     */
    public CompletableFuture<Void> registerForReads(Supplier<Boolean> supplier) {
        if (this.dataListener == null) {
            throw new IllegalArgumentException(this + "listener cannot be null");
        }
        if (this.channelState != ChannelState.CONNECTED) {
            throw new IllegalStateException(this + "Must call one of the connect methods first(ie. connect THEN register for reads)");
        }
        if (isClosed()) {
            throw new IllegalStateException(this + "Channel is closed");
        }
        if (apiLog.isTraceEnabled()) {
            apiLog.trace(this + "Basic.registerForReads called");
        }
        try {
            return this.selMgr.registerChannelForRead(this, this.dataListener, supplier);
        } catch (IOException e) {
            throw new NioException(e);
        } catch (InterruptedException e) {
            // Preserve the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new NioException(e);
        }
    }

    /**
     * Asks the selector manager to stop delivering reads for this channel.
     *
     * @return future that yields this channel once the selector manager's future completes
     * @throws NioException wrapping an IOException or InterruptedException from the selector
     *                      manager; on interruption the thread's interrupt flag is restored first
     */
    public CompletableFuture<Channel> unregisterForReads() {
        if (apiLog.isTraceEnabled()) {
            apiLog.trace(this + "Basic.unregisterForReads called");
        }
        try {
            return this.selMgr.unregisterChannelForRead(this).thenApply(r3 -> {
                return this;
            });
        } catch (IOException e) {
            throw new NioException(e);
        } catch (InterruptedException e) {
            // Preserve the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new NioException(e);
        }
    }

    /**
     * Stores the given address in {@code isConnectingTo}.
     *
     * @param socketAddress address to store
     */
    public void setConnectingTo(SocketAddress socketAddress) {
        isConnectingTo = socketAddress;
    }

    /**
     * Marks the channel as CLOSED and records which side initiated the close.
     *
     * @param z {@code true} when the remote end closed the socket (see serverClosed()),
     *          {@code false} when close() was called locally
     */
    protected void setClosed(boolean z) {
        // Record the initiator first, then flip the state that write() checks.
        isRemoteEndInitiateClose = z;
        channelState = ChannelState.CLOSED;
    }

    /**
     * Closes the channel: marks it closed, fails any queued writes, and runs the close action.
     *
     * <p>The whole sequence runs inside the try block so that any exception is logged and reported
     * through the returned future. Previously only the trace statement was guarded, so failures
     * in the close sequence itself escaped to the caller as thrown exceptions.
     *
     * @return future handed to the CloseRunnable; completed immediately with {@code null} if the
     *         channel is already not open, or completed exceptionally if closing throws
     */
    @Override // org.webpieces.nio.api.channels.Channel
    public CompletableFuture<Void> close() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            if (apiLog.isTraceEnabled()) {
                apiLog.trace(this + "Basic.close called");
            }
            if (!isOpen()) {
                completableFuture.complete(null);
                return completableFuture;
            }
            setClosed(false);
            unqueueAndFailWritesThenClose(new CloseRunnable(this, completableFuture));
        } catch (Exception e) {
            log.error(this + "Exception closing channel", e);
            // No-op if the close action already completed the future.
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    /** Subclass hook; close() returns an already-completed future without doing anything when this is false. */
    protected abstract boolean isOpen();

    /**
     * Marks the channel as closed by the remote end, then closes the underlying channel via
     * {@link #closeImpl()}.
     *
     * @throws IOException propagated from closeImpl()
     */
    public void serverClosed() throws IOException {
        final boolean remoteInitiated = true;
        setClosed(remoteInitiated);
        closeImpl();
    }

    /** Subclass hook invoked by serverClosed() after the channel has been marked closed. */
    public abstract void closeImpl() throws IOException;

    /** @return the session object created in the constructor */
    @Override // org.webpieces.nio.api.channels.Channel
    public ChannelSession getSession() {
        return session;
    }

    /**
     * @param i a byte count to test
     * @return {@code true} when {@code i} is greater than or equal to the configured max unacked bytes
     */
    public boolean isOverMaxUnacked(int i) {
        int limit = maxUnackedBytes.intValue();
        return limit <= i;
    }

    /**
     * @param i a byte count to test
     * @return {@code true} when {@code i} is less than or equal to the configured reading threshold
     */
    public boolean isUnderThreshold(int i) {
        int threshold = readingThreshold.intValue();
        return threshold >= i;
    }

    /** @return the max-unacked-bytes value taken from the BackpressureConfig, as the boxed Integer */
    public Integer getMaxUnacked() {
        return maxUnackedBytes;
    }

    /** @return the start-reading threshold taken from the BackpressureConfig, unboxed */
    public int getReadThreshold() {
        int threshold = readingThreshold.intValue();
        return threshold;
    }

    /** @return the shared (mutable) unacked-bytes counter, not a copy */
    public AtomicInteger getUnackedBytes() {
        return unackedBytes;
    }

    /** @return the shared (mutable) backflow-state reference, not a copy */
    public AtomicReference<BackflowState1> getCompareSetBackflowState() {
        return backflowState;
    }

    /** Decompiler-emitted bridge method; delegates straight to the superclass implementation. */
    @Override // org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl
    public /* bridge */ /* synthetic */ SelectorManager2 getSelectorManager() {
        return super.getSelectorManager();
    }

    /** Decompiler-emitted bridge method; delegates straight to the superclass implementation. */
    @Override // org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl
    public /* bridge */ /* synthetic */ void wakeupSelector() throws IOException {
        super.wakeupSelector();
    }

    /** Decompiler-emitted bridge method; delegates straight to the superclass implementation. */
    @Override // org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl
    public /* bridge */ /* synthetic */ void setKey(SelectionKey selectionKey) {
        super.setKey(selectionKey);
    }

    /**
     * Decompiler-emitted bridge method; delegates straight to the superclass implementation.
     * The result is what gets prefixed to this class's log and exception messages via {@code this + "..."}.
     */
    @Override // org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl
    public /* bridge */ /* synthetic */ String toString() {
        return super.toString();
    }

    /** Decompiler-emitted bridge method; delegates straight to the superclass implementation. */
    @Override // org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl, org.webpieces.nio.api.channels.RegisterableChannel
    public /* bridge */ /* synthetic */ String getChannelId() {
        return super.getChannelId();
    }
}
