package org.reaktivity.nukleus.tcp.internal.stream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SocketChannel;
import java.util.function.ToIntFunction;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.tcp.internal.TcpRouteCounters;
import org.reaktivity.nukleus.tcp.internal.poller.Poller;
import org.reaktivity.nukleus.tcp.internal.poller.PollerKey;
import org.reaktivity.nukleus.tcp.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/stream/WriteStream.class */
public final class WriteStream {
    public static final int WRITE_SPIN_COUNT = 16;
    private static final int EOS_REQUESTED = -1;
    private final long streamId;
    private final MessageConsumer sourceThrottle;
    private final SocketChannel channel;
    private final Poller poller;
    private final BufferPool bufferPool;
    private final MessageWriter writer;
    private final TcpRouteCounters counters;
    private int slotOffset;
    private int slotPosition;
    private PollerKey key;
    private int readableBytes;
    private ByteBuffer writeBuffer;
    private MessageConsumer correlatedInput;
    private long correlatedStreamId;
    private int windowThreshold;
    private int pendingCredit;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int slot = EOS_REQUESTED;
    private final ToIntFunction<PollerKey> writeHandler = this::handleWrite;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WriteStream(MessageConsumer messageConsumer, long j, SocketChannel socketChannel, Poller poller, BufferPool bufferPool, ByteBuffer byteBuffer, MessageWriter messageWriter, TcpRouteCounters tcpRouteCounters, int i) {
        this.streamId = j;
        this.sourceThrottle = messageConsumer;
        this.channel = socketChannel;
        this.poller = poller;
        this.bufferPool = bufferPool;
        this.writeBuffer = byteBuffer;
        this.writer = messageWriter;
        this.counters = tcpRouteCounters;
        this.windowThreshold = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                processBegin(directBuffer, i2, i2 + i3);
                return;
            case 2:
                try {
                    processData(directBuffer, i2, i2 + i3);
                    return;
                } catch (IOException e) {
                    handleIOExceptionFromWrite();
                    return;
                }
            case 3:
                processEnd(directBuffer, i2, i2 + i3);
                return;
            case 4:
                processAbort(directBuffer, i2, i2 + i3);
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doConnected() {
        this.key = this.poller.doRegister(this.channel, 4, this.writeHandler);
        offerWindow(this.bufferPool.slotCapacity());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doConnectFailed() {
        if (this.channel.isOpen()) {
            CloseHelper.quietClose(this.channel);
            this.counters.connectionsClosed.getAsLong();
        }
        this.counters.connectFailed.getAsLong();
        this.writer.doReset(this.sourceThrottle, this.streamId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setCorrelatedInput(long j, MessageConsumer messageConsumer) {
        this.correlatedInput = messageConsumer;
        this.correlatedStreamId = j;
    }

    private void handleIOExceptionFromWrite() {
        if (this.correlatedInput != null) {
            this.writer.doTcpAbort(this.correlatedInput, this.correlatedStreamId);
        }
        doFail();
    }

    private void processAbort(DirectBuffer directBuffer, int i, int i2) {
        if (this.slot != EOS_REQUESTED) {
            this.bufferPool.release(this.slot);
        }
        doCleanup();
    }

    private void processBegin(DirectBuffer directBuffer, int i, int i2) {
    }

    private void processData(DirectBuffer directBuffer, int i, int i2) throws IOException {
        this.writer.dataRO.wrap(directBuffer, i, i2);
        if (!$assertionsDisabled && this.writer.dataRO.padding() != 0) {
            throw new AssertionError();
        }
        OctetsFW payload = this.writer.dataRO.payload();
        int length = this.writer.dataRO.length();
        this.counters.framesWritten.getAsLong();
        this.counters.bytesWritten.accept(length);
        if (!reduceWindow(length)) {
            if (this.slot == EOS_REQUESTED) {
                doFail();
                return;
            } else {
                this.writer.doReset(this.sourceThrottle, this.streamId);
                return;
            }
        }
        ByteBuffer writeBuffer = getWriteBuffer(directBuffer, payload.offset(), length);
        int remaining = writeBuffer.remaining();
        int i3 = 0;
        for (int i4 = 16; i3 == 0 && i4 > 0; i4 += EOS_REQUESTED) {
            i3 = this.channel.write(writeBuffer);
        }
        int i5 = this.slot;
        if (handleUnwrittenData(writeBuffer, i3)) {
            if (i3 < remaining) {
                this.key.register(4);
            } else if (i5 != EOS_REQUESTED) {
                this.key.clear(4);
            }
        }
    }

    private void processEnd(DirectBuffer directBuffer, int i, int i2) {
        if (this.slot != EOS_REQUESTED) {
            this.readableBytes = EOS_REQUESTED;
        } else {
            this.writer.endRO.wrap(directBuffer, i, i2);
            doCleanup();
        }
    }

    private void doFail() {
        this.writer.doReset(this.sourceThrottle, this.streamId);
        if (this.slot != EOS_REQUESTED) {
            this.bufferPool.release(this.slot);
        }
        doCleanup();
    }

    private void doCleanup() {
        if (this.key != null && this.key.isValid()) {
            this.key.clear(4);
        }
        if (this.channel.isConnected() && this.channel.isOpen()) {
            try {
                this.channel.shutdownOutput();
                closeIfInputShutdown();
            } catch (IOException e) {
                LangUtil.rethrowUnchecked(e);
            }
        }
    }

    private ByteBuffer getWriteBuffer(DirectBuffer directBuffer, int i, int i2) {
        ByteBuffer byteBuffer;
        if (this.slot == EOS_REQUESTED) {
            this.writeBuffer.clear();
            directBuffer.getBytes(i, this.writeBuffer, i2);
            this.writeBuffer.flip();
            byteBuffer = this.writeBuffer;
        } else {
            ByteBuffer byteBuffer2 = this.bufferPool.byteBuffer(this.slot);
            byteBuffer2.position(this.slotPosition);
            directBuffer.getBytes(i, byteBuffer2, i2);
            this.slotPosition += i2;
            byteBuffer2.position(this.slotOffset);
            byteBuffer2.limit(this.slotPosition);
            byteBuffer = byteBuffer2;
        }
        return byteBuffer;
    }

    private boolean handleUnwrittenData(ByteBuffer byteBuffer, int i) {
        boolean z = true;
        if (this.slot == EOS_REQUESTED) {
            if (byteBuffer.hasRemaining()) {
                this.slot = this.bufferPool.acquire(this.streamId);
                if (this.slot == EOS_REQUESTED) {
                    this.counters.overflows.getAsLong();
                    doFail();
                    z = false;
                } else {
                    ByteBuffer byteBuffer2 = this.bufferPool.byteBuffer(this.slot);
                    this.slotOffset = byteBuffer2.position();
                    byteBuffer2.position(this.slotOffset);
                    byteBuffer2.put(byteBuffer);
                    this.slotPosition = byteBuffer2.position();
                    if (i > 0) {
                        offerWindow(i);
                    }
                }
            } else if (i > 0) {
                offerWindow(i);
            }
        } else if (byteBuffer.hasRemaining()) {
            this.slotOffset = byteBuffer.position();
        } else {
            offerWindow(this.slotPosition - this.bufferPool.byteBuffer(this.slot).position());
            this.bufferPool.release(this.slot);
            this.slot = EOS_REQUESTED;
        }
        return z;
    }

    private int handleWrite(PollerKey pollerKey) {
        int i = 0;
        try {
            pollerKey.clear(4);
            ByteBuffer byteBuffer = this.bufferPool.byteBuffer(this.slot);
            byteBuffer.position(this.slotOffset);
            byteBuffer.limit(this.slotPosition);
            i = this.channel.write(byteBuffer);
            handleUnwrittenData(byteBuffer, i);
            if (this.slot != EOS_REQUESTED) {
                pollerKey.register(4);
            } else if (this.readableBytes < 0) {
                doCleanup();
            }
        } catch (IOException | CancelledKeyException e) {
            handleIOExceptionFromWrite();
        }
        return i;
    }

    private boolean reduceWindow(int i) {
        this.readableBytes -= i;
        return this.readableBytes >= 0;
    }

    private void offerWindow(int i) {
        this.pendingCredit += i;
        if (this.pendingCredit < this.windowThreshold || this.readableBytes <= EOS_REQUESTED) {
            return;
        }
        this.readableBytes += this.pendingCredit;
        this.writer.doWindow(this.sourceThrottle, this.streamId, this.pendingCredit, 0, 0);
        this.pendingCredit = 0;
    }

    private void closeIfInputShutdown() {
        if (this.channel.socket().isInputShutdown() && this.channel.isOpen()) {
            CloseHelper.quietClose(this.channel);
            this.counters.connectionsClosed.getAsLong();
        }
    }

    static {
        $assertionsDisabled = !WriteStream.class.desiredAssertionStatus();
    }
}
