/*
 * Decompiled with CFR 0.152.
 */
package org.webpieces.nio.impl.cm.basic;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.webpieces.data.api.BufferPool;
import org.webpieces.nio.api.BackpressureConfig;
import org.webpieces.nio.api.channels.Channel;
import org.webpieces.nio.api.channels.ChannelSession;
import org.webpieces.nio.api.exceptions.NioClosedChannelException;
import org.webpieces.nio.api.exceptions.NioException;
import org.webpieces.nio.api.handlers.DataListener;
import org.webpieces.nio.api.handlers.RecordingDataListener;
import org.webpieces.nio.impl.cm.basic.ChannelState;
import org.webpieces.nio.impl.cm.basic.CloseRunnable;
import org.webpieces.nio.impl.cm.basic.IdObject;
import org.webpieces.nio.impl.cm.basic.KeyProcessor;
import org.webpieces.nio.impl.cm.basic.RegisterableChannelImpl;
import org.webpieces.nio.impl.cm.basic.SelectorManager2;
import org.webpieces.nio.impl.cm.basic.WriteInfo;
import org.webpieces.nio.impl.util.ChannelSessionImpl;
import org.webpieces.util.logging.Logger;
import org.webpieces.util.logging.LoggerFactory;

/**
 * Base {@link Channel} implementation that supplies the plumbing shared by concrete
 * channels: connect/bind/close sequencing, registration for reads, a queue of writes that
 * could not be completed immediately, and bookkeeping of "unacked" bytes.
 *
 * <p>Subclasses provide the actual I/O through {@link #readImpl(ByteBuffer)},
 * {@link #writeImpl(ByteBuffer)}, {@link #connectImpl(SocketAddress)},
 * {@link #bindImpl2(SocketAddress)}, {@link #closeImpl()} and {@link #isOpen()}.
 *
 * <p>NOTE(review): the write path ({@link #write(ByteBuffer)} / {@link #writeAll()}) is
 * guarded by {@code writeLock}, while the close path drains the same queue while holding
 * the monitor of {@code this}. The two locks are different; confirm whether that is
 * intentional before relying on mutual exclusion between close and write.
 */
public abstract class BasChannelImpl
extends RegisterableChannelImpl
implements Channel {
    private static final Logger apiLog = LoggerFactory.getLogger(Channel.class);
    private static final Logger log = LoggerFactory.getLogger(BasChannelImpl.class);

    /** Interest-op value used when (un)registering this channel for write readiness (== 4). */
    private static final int OP_WRITE = java.nio.channels.SelectionKey.OP_WRITE;

    private final ChannelSession session = new ChannelSessionImpl();
    private final BufferPool pool;
    private final KeyProcessor router;
    private DataListener dataListener;

    /** Guards {@code inDelayedWriteMode}, {@code waitingBytesCounter} and queue access on the write path. */
    private final Object writeLock = new Object();
    /** Number of bytes currently sitting in {@code dataToBeWritten}. */
    private long waitingBytesCounter = 0L;
    /** Writes that could not be flushed immediately, in submission order. */
    private final ConcurrentLinkedQueue<WriteInfo> dataToBeWritten = new ConcurrentLinkedQueue<>();
    /** NOTE(review): this limit is tracked but not currently enforced anywhere in this class. */
    private int maxBytesWaitingSize = 500000;
    /** True while queued writes are pending and the channel is registered for write readiness. */
    private boolean inDelayedWriteMode;
    private boolean isRecording;
    protected SocketAddress isConnectingTo;
    protected ChannelState channelState;
    /** True when the close was reported through {@link #serverClosed()} rather than {@link #close()}. */
    private boolean isRemoteEndInitiateClose;
    // NOTE(review): unackedBytes is a plain int with no synchronization; confirm it is only
    // touched from a single thread.
    private int unackedBytes;
    private final int maxUnackedBytes;
    // NOTE(review): boxed Integer; isUnderThreshold()/getReadThreshold() unbox it and would
    // throw NullPointerException if the config supplied null. Confirm against BackpressureConfig.
    private final Integer readingThreshold;

    /**
     * @param id     identity passed through to {@link RegisterableChannelImpl}
     * @param selMgr selector manager passed through to {@link RegisterableChannelImpl}
     * @param router used to unregister this channel from write readiness once the queue drains
     * @param pool   pool that written (or failed) buffers are released back to
     * @param config source of the max-unacked-bytes and start-reading-threshold values
     */
    public BasChannelImpl(IdObject id, SelectorManager2 selMgr, KeyProcessor router, BufferPool pool, BackpressureConfig config) {
        super(id, selMgr);
        this.pool = pool;
        this.isRecording = false;
        this.router = router;
        this.maxUnackedBytes = config.getMaxBytes();
        this.readingThreshold = config.getStartReadingThreshold();
    }

    @Override
    public abstract boolean isBlocking();

    /** Reads from the underlying transport into the supplied buffer; returns a subclass-defined count. */
    public abstract int readImpl(ByteBuffer var1);

    /**
     * Writes as much of the buffer as possible to the underlying transport.
     *
     * @return the number of bytes written; callers verify that
     *         {@code remaining-after + returned == remaining-before}
     */
    protected abstract int writeImpl(ByteBuffer var1);

    /**
     * Connects to {@code addr} and, once connected, marks the channel
     * {@link ChannelState#CONNECTED} and registers {@code listener} for reads.
     *
     * @return a future that completes when read registration completes
     */
    @Override
    public CompletableFuture<Void> connect(SocketAddress addr, DataListener listener) {
        this.dataListener = listener;
        if (this.isRecording) {
            this.dataListener = new RecordingDataListener("singleThreaded-", listener);
        }
        CompletableFuture<Channel> connected = this.connectImpl(addr);
        return connected.thenCompose(v -> {
            this.channelState = ChannelState.CONNECTED;
            return this.registerForReads(this.dataListener);
        });
    }

    protected abstract CompletableFuture<Channel> connectImpl(SocketAddress var1);

    /**
     * Drains every queued write, runs the close action, and then fails the drained write
     * promises with a {@link NioClosedChannelException}.
     *
     * <p>The promises are failed in a {@code finally} block so that callers waiting on them
     * are still notified if the close action itself throws; the exception from the action
     * continues to propagate to the caller.
     */
    private void unqueueAndFailWritesThenClose(CloseRunnable action) {
        // failAllWritesInQueue is itself synchronized on this instance.
        List<CompletableFuture<Void>> promises = this.failAllWritesInQueue();
        try {
            action.runDelayedAction();
        } finally {
            for (CompletableFuture<Void> promise : promises) {
                log.info("WRITES outstanding while close was called, notifying client through his failure method of the exception");
                NioClosedChannelException closeExc = new NioClosedChannelException("There are " + promises.size() + " writes that are not complete yet(you called write but they did not call success back to the client).");
                promise.completeExceptionally(closeExc);
            }
        }
    }

    /**
     * Writes the buffer, either immediately on the calling thread or by queueing it until
     * the channel becomes writable. On successful completion the buffer is released back
     * to the pool.
     *
     * @throws IllegalArgumentException  if the buffer has nothing remaining
     * @throws IllegalStateException     if the selector manager is not running
     * @throws NioClosedChannelException if the channel has been closed (by either end)
     * @throws NioException              if the channel is not yet connected
     */
    @Override
    public CompletableFuture<Void> write(ByteBuffer b) {
        if (b.remaining() == 0) {
            throw new IllegalArgumentException("buffer has no data");
        }
        if (!this.selMgr.isRunning()) {
            throw new IllegalStateException(this + "ChannelManager must be running and is stopped");
        }
        if (this.channelState == ChannelState.CLOSED) {
            if (this.isRemoteEndInitiateClose) {
                throw new NioClosedChannelException(this + "Client cannot write after the remote end closed the socket");
            }
            throw new NioClosedChannelException(this + "Your Application cannot write after YOUR Application closed the socket");
        }
        if (this.channelState != ChannelState.CONNECTED) {
            throw new NioException("The Channel is not connected yet");
        }
        apiLog.trace(() -> this + "Basic.write");
        return this.writeSynchronized(b).thenApply(v -> {
            this.pool.releaseBuffer(b);
            return null;
        });
    }

    /**
     * Performs the write under {@code writeLock}.
     *
     * <p>If no earlier write is pending, the buffer is written directly. A complete write
     * yields an already-completed future. A partial write registers the channel for write
     * readiness and switches to delayed-write mode. In delayed-write mode the buffer is
     * appended to the queue (behind any earlier pending writes) and the returned future is
     * completed later by {@link #writeAll()}.
     */
    private CompletableFuture<Void> writeSynchronized(ByteBuffer b) {
        synchronized (this.writeLock) {
            if (!this.inDelayedWriteMode) {
                int totalToWriteOut = b.remaining();
                int written = this.writeImpl(b);
                if (written == totalToWriteOut) {
                    log.trace(() -> this + " wrote bytes on client thread");
                    return CompletableFuture.completedFuture(null);
                }
                // Sanity check that writeImpl's return value agrees with the buffer position.
                if (b.remaining() + written != totalToWriteOut) {
                    throw new IllegalStateException("Something went wrong.  b.remaining()=" + b.remaining() + " written=" + written + " total=" + totalToWriteOut);
                }
                this.registerForWrites();
                this.inDelayedWriteMode = true;
            }
            log.trace(() -> this + "sent write to queue");
            CompletableFuture<Void> future = new CompletableFuture<>();
            this.dataToBeWritten.add(new WriteInfo(b, future));
            this.waitingBytesCounter += b.remaining();
            return future;
        }
    }

    /**
     * Removes every queued write, releases its buffer to the pool and returns the
     * associated promises (not yet completed) so the caller can fail them.
     *
     * <p>Each buffer's position is moved to its limit before release, presumably so the
     * pool treats it as fully consumed (verify against {@code BufferPool}).
     */
    private synchronized List<CompletableFuture<Void>> failAllWritesInQueue() {
        List<CompletableFuture<Void>> drained = new ArrayList<>();
        WriteInfo pending;
        // poll() rather than isEmpty()+remove(): remove() would throw if another thread
        // emptied the queue between the two calls.
        while ((pending = this.dataToBeWritten.poll()) != null) {
            ByteBuffer buffer = pending.getBuffer();
            buffer.position(buffer.limit());
            this.pool.releaseBuffer(buffer);
            drained.add(pending.getPromise());
        }
        this.waitingBytesCounter = 0L;
        return drained;
    }

    /** Registers this channel with the selector manager for write readiness. */
    private void registerForWrites() {
        log.trace(() -> this + "registering channel for write msg. size=" + this.dataToBeWritten.size());
        this.selMgr.registerSelectableChannel(this, OP_WRITE, null);
    }

    /**
     * Flushes queued writes in order until the queue is empty or a write completes only
     * partially. When the queue empties, delayed-write mode is switched off and the channel
     * is unregistered from write readiness. Promises of fully written buffers are completed
     * after the lock has been released.
     *
     * @throws IllegalStateException if called while the queue is empty, or if
     *         {@link #writeImpl(ByteBuffer)} reports a count inconsistent with the buffer
     */
    void writeAll() {
        List<CompletableFuture<Void>> finishedPromises = new ArrayList<>();
        synchronized (this.writeLock) {
            if (this.dataToBeWritten.isEmpty()) {
                throw new IllegalStateException("bug, I am not sure this is possible..it shouldn't be...look into");
            }
            while (!this.dataToBeWritten.isEmpty()) {
                WriteInfo writeInfo = this.dataToBeWritten.peek();
                ByteBuffer buffer = writeInfo.getBuffer();
                int initialSize = buffer.remaining();
                int wroteOut = this.writeImpl(buffer);
                if (buffer.hasRemaining()) {
                    if (buffer.remaining() + wroteOut != initialSize) {
                        throw new IllegalStateException("Something went wrong.  b.remaining()=" + buffer.remaining() + " written=" + wroteOut + " total=" + initialSize);
                    }
                    log.trace(() -> this + "Did not write all data out");
                    // Partial write: account for what went out and wait for the next
                    // writable notification; the entry stays at the head of the queue.
                    this.waitingBytesCounter -= initialSize - buffer.remaining();
                    break;
                }
                this.dataToBeWritten.poll();
                this.waitingBytesCounter -= initialSize;
                finishedPromises.add(writeInfo.getPromise());
            }
            if (this.dataToBeWritten.isEmpty() && this.inDelayedWriteMode) {
                this.inDelayedWriteMode = false;
                log.trace(() -> this + "unregister writes");
                this.router.unregisterSelectableChannel(this, OP_WRITE);
            }
        }
        // Completed outside the lock so that listener callbacks never run while holding it.
        for (CompletableFuture<Void> promise : finishedPromises) {
            promise.complete(null);
        }
    }

    /**
     * Binds the channel to {@code addr}.
     *
     * @return an already-completed future on success
     * @throws IllegalArgumentException if {@code addr} is not an {@link InetSocketAddress}
     * @throws NioException             wrapping any {@link IOException} from the bind
     */
    @Override
    public CompletableFuture<Void> bind(SocketAddress addr) {
        if (!(addr instanceof InetSocketAddress)) {
            throw new IllegalArgumentException(this + "Can only bind to InetSocketAddress addressses");
        }
        apiLog.trace(() -> this + "Basic.bind called addr=" + addr);
        try {
            this.bindImpl(addr);
            return CompletableFuture.completedFuture(null);
        }
        catch (IOException e) {
            throw new NioException(e);
        }
    }

    /**
     * Delegates to {@link #bindImpl2(SocketAddress)}, translating an {@link Error} whose
     * cause is a {@link SocketException} into a {@link BindException} carrying the same
     * message and cause. Any other {@link Error} is rethrown unchanged.
     */
    private void bindImpl(SocketAddress addr) throws IOException {
        try {
            this.bindImpl2(addr);
        }
        catch (Error e) {
            if (e.getCause() instanceof SocketException) {
                BindException exc = new BindException(e.getMessage());
                exc.initCause(e.getCause());
                throw exc;
            }
            throw e;
        }
    }

    protected abstract void bindImpl2(SocketAddress var1) throws IOException;

    /** Replaces the current data listener with {@code l} and registers for reads. */
    CompletableFuture<Void> registerForReads(DataListener l) {
        this.dataListener = l;
        return this.registerForReads();
    }

    /**
     * Registers the channel with the selector manager for reads using the current listener.
     *
     * @throws IllegalArgumentException if no listener has been set
     * @throws IllegalStateException    if the channel is not connected or is closed
     * @throws NioException             wrapping an {@link IOException} or
     *         {@link InterruptedException} from the selector manager; in the latter case the
     *         thread's interrupt flag is restored first
     */
    public CompletableFuture<Void> registerForReads() {
        if (this.dataListener == null) {
            throw new IllegalArgumentException(this + "listener cannot be null");
        }
        if (this.channelState != ChannelState.CONNECTED) {
            throw new IllegalStateException(this + "Must call one of the connect methods first(ie. connect THEN register for reads)");
        }
        if (this.isClosed()) {
            throw new IllegalStateException("Channel is closed");
        }
        apiLog.trace(() -> this + "Basic.registerForReads called");
        try {
            return this.selMgr.registerChannelForRead(this, this.dataListener);
        }
        catch (IOException e) {
            throw new NioException(e);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new NioException(e);
        }
    }

    /**
     * Unregisters the channel from reads.
     *
     * @return a future yielding this channel once unregistration completes
     * @throws NioException wrapping an {@link IOException} or {@link InterruptedException}
     *         from the selector manager; in the latter case the thread's interrupt flag is
     *         restored first
     */
    public CompletableFuture<Channel> unregisterForReads() {
        apiLog.trace(() -> this + "Basic.unregisterForReads called");
        try {
            return this.selMgr.unregisterChannelForRead(this).thenApply(v -> this);
        }
        catch (IOException e) {
            throw new NioException(e);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new NioException(e);
        }
    }

    protected void setConnectingTo(SocketAddress addr) {
        this.isConnectingTo = addr;
    }

    /**
     * Marks the channel {@link ChannelState#CLOSED} and records which side initiated the close.
     *
     * @param b               not used by this implementation
     * @param isServerClosing true when the remote end initiated the close
     */
    protected void setClosed(boolean b, boolean isServerClosing) {
        this.isRemoteEndInitiateClose = isServerClosing;
        this.channelState = ChannelState.CLOSED;
    }

    /**
     * Closes the channel on behalf of the local application. If the channel is not open the
     * returned future is completed immediately. Otherwise the channel is marked closed,
     * pending writes are failed, and a {@link CloseRunnable} constructed with the returned
     * future is run. Any exception raised along the way is logged and used to fail the
     * returned future rather than being thrown.
     */
    @Override
    public CompletableFuture<Void> close() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            apiLog.trace(() -> this + "Basic.close called");
            if (!this.isOpen()) {
                future.complete(null);
                return future;
            }
            this.setClosed(true, false);
            CloseRunnable runnable = new CloseRunnable(this, future);
            this.unqueueAndFailWritesThenClose(runnable);
        }
        catch (Exception e) {
            log.error(this + "Exception closing channel", (Throwable)e);
            future.completeExceptionally(e);
        }
        return future;
    }

    protected abstract boolean isOpen();

    /** Marks the channel as closed by the remote end and closes the underlying transport. */
    public void serverClosed() throws IOException {
        this.setClosed(true, true);
        this.closeImpl();
    }

    protected abstract void closeImpl() throws IOException;

    @Override
    public ChannelSession getSession() {
        return this.session;
    }

    /**
     * Adds {@code bytes} (which may be negative) to the unacked byte count.
     *
     * @return the updated count
     */
    public int addUnackedByteCount(int bytes) {
        this.unackedBytes += bytes;
        return this.unackedBytes;
    }

    /** @return true when the unacked byte count has reached or exceeded the configured maximum */
    public boolean isOverMaxUnacked() {
        return this.unackedBytes >= this.maxUnackedBytes;
    }

    /** @return true when the unacked byte count is at or below the configured reading threshold */
    public boolean isUnderThreshold() {
        return this.unackedBytes <= this.readingThreshold;
    }

    public int getMaxUnacked() {
        return this.maxUnackedBytes;
    }

    public int getReadThreshold() {
        return this.readingThreshold;
    }
}

