package org.reaktivity.nukleus.tcp.internal.writer.stream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.concurrent.MessageHandler;
import org.reaktivity.nukleus.tcp.internal.types.OctetsFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.DataFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.EndFW;
import org.reaktivity.nukleus.tcp.internal.writer.Source;
import org.reaktivity.nukleus.tcp.internal.writer.Target;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/writer/stream/StreamFactory.class */
public final class StreamFactory {
    public static final int WRITE_SPIN_COUNT = 16;
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final Source source;
    private final int windowSize;
    private final Slab writeSlab;
    private final LongSupplier incrementOverflow;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/writer/stream/StreamFactory$Stream.class */
    public final class Stream {
        private static final int EOS_REQUESTED = -1;
        private final long id;
        private final Target target;
        private final SocketChannel channel;
        private int slot;
        private SelectionKey key;
        private int readableBytes;

        private Stream(long j, Target target, SocketChannel socketChannel) {
            this.slot = EOS_REQUESTED;
            this.id = j;
            this.target = target;
            this.channel = socketChannel;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
            try {
                switch (i) {
                    case 1:
                        processBegin(directBuffer, i2, i2 + i3);
                        break;
                    case 2:
                        processData(directBuffer, i2, i2 + i3);
                        break;
                    case EndFW.TYPE_ID /* 3 */:
                        processEnd(directBuffer, i2, i2 + i3);
                        break;
                }
            } catch (IOException e) {
                doFail();
            }
        }

        private void processBegin(DirectBuffer directBuffer, int i, int i2) {
            StreamFactory.this.beginRO.wrap(directBuffer, i, i2);
            this.key = this.target.doRegister(this.channel, 0, this::handleWrite);
            offerWindow(StreamFactory.this.windowSize);
        }

        private void processData(DirectBuffer directBuffer, int i, int i2) throws IOException {
            StreamFactory.this.dataRO.wrap(directBuffer, i, i2);
            OctetsFW payload = StreamFactory.this.dataRO.payload();
            int length = StreamFactory.this.dataRO.length();
            if (!reduceWindow(length)) {
                if (this.slot == EOS_REQUESTED) {
                    doFail();
                    return;
                } else {
                    StreamFactory.this.source.doReset(this.id);
                    return;
                }
            }
            ByteBuffer byteBuffer = StreamFactory.this.writeSlab.get(this.slot, directBuffer, payload.offset(), length);
            int i3 = 0;
            for (int i4 = 16; i3 == 0 && i4 > 0; i4 += EOS_REQUESTED) {
                i3 = this.channel.write(byteBuffer);
            }
            int i5 = this.slot;
            this.slot = StreamFactory.this.writeSlab.written(this.id, this.slot, byteBuffer, i3, this::offerWindow);
            if (this.slot == -2) {
                StreamFactory.this.incrementOverflow.getAsLong();
                doFail();
            } else if (i3 < length) {
                this.key.interestOps(4);
            } else if (i5 != EOS_REQUESTED) {
                this.key.interestOps(0);
            }
        }

        private void processEnd(DirectBuffer directBuffer, int i, int i2) {
            if (this.slot != EOS_REQUESTED) {
                this.readableBytes = EOS_REQUESTED;
            } else {
                StreamFactory.this.endRO.wrap(directBuffer, i, i2);
                doCleanup();
            }
        }

        private void doFail() {
            StreamFactory.this.source.doReset(this.id);
            if (this.slot >= 0) {
                StreamFactory.this.writeSlab.release(this.slot);
            }
            doCleanup();
        }

        private void doCleanup() {
            this.key.interestOps(0);
            try {
                StreamFactory.this.source.removeStream(this.id);
                this.channel.shutdownOutput();
            } catch (IOException e) {
                LangUtil.rethrowUnchecked(e);
            }
        }

        private int handleWrite() {
            this.key.interestOps(0);
            ByteBuffer byteBuffer = StreamFactory.this.writeSlab.get(this.slot);
            try {
                int write = this.channel.write(byteBuffer);
                this.slot = StreamFactory.this.writeSlab.written(this.id, this.slot, byteBuffer, write, this::offerWindow);
                if (this.slot != EOS_REQUESTED) {
                    this.key.interestOps(4);
                } else if (this.readableBytes < 0) {
                    doCleanup();
                }
                return write;
            } catch (IOException e) {
                doFail();
                return 0;
            }
        }

        private boolean reduceWindow(int i) {
            this.readableBytes -= i;
            return this.readableBytes >= 0;
        }

        private void offerWindow(int i) {
            if (this.readableBytes > EOS_REQUESTED) {
                this.readableBytes += i;
                StreamFactory.this.source.doWindow(this.id, i);
            }
        }
    }

    public StreamFactory(Source source, int i, int i2, LongSupplier longSupplier) {
        this.source = source;
        this.windowSize = i;
        this.writeSlab = new Slab(i2, i);
        this.incrementOverflow = longSupplier;
    }

    public MessageHandler newStream(long j, Target target, SocketChannel socketChannel) {
        Stream stream = new Stream(j, target, socketChannel);
        stream.getClass();
        return (i, directBuffer, i2, i3) -> {
            stream.handleStream(i, directBuffer, i2, i3);
        };
    }
}
