package org.reaktivity.nukleus.tcp.internal.reader.stream;

import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.function.LongFunction;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.tcp.internal.poller.PollerKey;
import org.reaktivity.nukleus.tcp.internal.reader.Target;
import org.reaktivity.nukleus.tcp.internal.router.Correlation;
import org.reaktivity.nukleus.tcp.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.WindowFW;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/reader/stream/StreamFactory.class */
public final class StreamFactory {
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final int bufferSize;
    private final LongFunction<Correlation> resolveCorrelation;
    private final ByteBuffer readBuffer;
    private final AtomicBuffer atomicBuffer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/reader/stream/StreamFactory$Stream.class */
    public final class Stream {
        private final Target target;
        private final long streamId;
        private final PollerKey key;
        private final SocketChannel channel;
        private final long correlationId;
        private int readableBytes;
        static final /* synthetic */ boolean $assertionsDisabled;

        private Stream(Target target, long j, PollerKey pollerKey, SocketChannel socketChannel, long j2) {
            this.target = target;
            this.streamId = j;
            this.key = pollerKey;
            this.channel = socketChannel;
            this.correlationId = j2;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int handleStream(PollerKey pollerKey) {
            int i;
            if (!$assertionsDisabled && this.readableBytes <= 0) {
                throw new AssertionError();
            }
            int min = Math.min(this.readableBytes, StreamFactory.this.bufferSize);
            StreamFactory.this.readBuffer.position(0);
            StreamFactory.this.readBuffer.limit(min);
            try {
                i = this.channel.read(StreamFactory.this.readBuffer);
            } catch (IOException e) {
                i = -1;
            }
            if (i == -1) {
                this.readableBytes = -1;
                this.target.doTcpEnd(this.streamId);
                this.target.removeThrottle(this.streamId);
                pollerKey.cancel(1);
                return 1;
            }
            if (i == 0) {
                return 1;
            }
            this.target.doTcpData(this.streamId, StreamFactory.this.atomicBuffer, 0, i);
            this.readableBytes -= i;
            if (this.readableBytes != 0) {
                return 1;
            }
            pollerKey.clear(1);
            return 1;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    processReset(directBuffer, i2, i3);
                    return;
                case 1073741826:
                    processWindow(directBuffer, i2, i3);
                    return;
                default:
                    return;
            }
        }

        private void processWindow(DirectBuffer directBuffer, int i, int i2) {
            StreamFactory.this.windowRO.wrap(directBuffer, i, i + i2);
            if (this.readableBytes != -1) {
                this.readableBytes += StreamFactory.this.windowRO.update();
                handleStream(this.key);
                if (this.readableBytes > 0) {
                    this.key.register(1);
                }
            }
        }

        private void processReset(DirectBuffer directBuffer, int i, int i2) {
            StreamFactory.this.resetRO.wrap(directBuffer, i, i + i2);
            try {
                try {
                    if (StreamFactory.this.resolveCorrelation.apply(this.correlationId) == null) {
                        this.channel.shutdownInput();
                    } else {
                        this.channel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_LINGER, (SocketOption) 0);
                        this.channel.close();
                    }
                    this.target.removeThrottle(this.streamId);
                } catch (IOException e) {
                    LangUtil.rethrowUnchecked(e);
                    this.target.removeThrottle(this.streamId);
                }
            } catch (Throwable th) {
                this.target.removeThrottle(this.streamId);
                throw th;
            }
        }

        static {
            $assertionsDisabled = !StreamFactory.class.desiredAssertionStatus();
        }
    }

    public StreamFactory(int i, LongFunction<Correlation> longFunction) {
        this.bufferSize = i - 10;
        this.resolveCorrelation = longFunction;
        this.readBuffer = ByteBuffer.allocateDirect(this.bufferSize).order(ByteOrder.nativeOrder());
        this.atomicBuffer = new UnsafeBuffer(this.readBuffer);
    }

    public ToIntFunction<PollerKey> newStream(Target target, long j, PollerKey pollerKey, SocketChannel socketChannel, long j2) {
        Stream stream = new Stream(target, j, pollerKey, socketChannel, j2);
        stream.getClass();
        target.addThrottle(j, (i, directBuffer, i2, i3) -> {
            stream.handleThrottle(i, directBuffer, i2, i3);
        });
        stream.getClass();
        return pollerKey2 -> {
            return stream.handleStream(pollerKey2);
        };
    }
}
