package org.reaktivity.nukleus.tcp.internal.stream;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.tcp.internal.poller.Poller;
import org.reaktivity.nukleus.tcp.internal.poller.PollerKey;
import org.reaktivity.nukleus.tcp.internal.types.OctetsFW;
import org.reaktivity.nukleus.tcp.internal.types.TcpAddressFW;
import org.reaktivity.nukleus.tcp.internal.types.control.RouteFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.TcpBeginExFW;
import org.reaktivity.nukleus.tcp.internal.util.CIDR;
import org.reaktivity.nukleus.tcp.internal.util.function.LongObjectBiConsumer;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/stream/ClientStreamFactory.class */
public class ClientStreamFactory implements StreamFactory {
    // Pool handed to every WriteStream created by this factory.
    private final BufferPool bufferPool;
    // Counter callback handed to every WriteStream (named "overflow"; exact semantics live in WriteStream).
    private final LongSupplier incrementOverflow;
    // Poller used to register socket channels; assigned once in the constructor.
    private final Poller poller;
    // Resolves routes for incoming BEGIN frames and supplies reply targets / throttles.
    private final RouteManager router;
    // Allocates the stream id used for each connect-reply stream.
    private final LongSupplier supplyStreamId;
    // Retained from the constructor; not referenced elsewhere in this class.
    private final LongFunction<IntUnaryOperator> groupBudgetClaimer;
    // Retained from the constructor; not referenced elsewhere in this class.
    private final LongFunction<IntUnaryOperator> groupBudgetReleaser;
    // Direct buffer shared by every ReadStream created by this factory.
    private final ByteBuffer readByteBuffer;
    // Agrona view over readByteBuffer, also shared by every ReadStream.
    private final MutableDirectBuffer readBuffer;
    // Direct buffer shared by every WriteStream created by this factory.
    private final ByteBuffer writeByteBuffer;
    // Emits RESET and TCP BEGIN frames on behalf of this factory and its streams.
    private final MessageWriter writer;
    // Per-route statistics hooks, looked up when a route has been resolved.
    private final Function<RouteFW, LongSupplier> supplyWriteFrameCounter;
    private final Function<RouteFW, LongSupplier> supplyReadFrameCounter;
    private final Function<RouteFW, LongConsumer> supplyWriteBytesAccumulator;
    private final Function<RouteFW, LongConsumer> supplyReadBytesAccumulator;
    // Compiler-generated assertion switch; initialised in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Reusable flyweights that are re-wrapped over each incoming message.
    private final RouteFW routeRO = new RouteFW();
    private final TcpBeginExFW tcpBeginExRO = new TcpBeginExFW();
    private final BeginFW beginRO = new BeginFW();
    // Cache of address matchers keyed by route target string (CIDR block or host name).
    private final Map<String, Predicate<? super InetAddress>> targetToCidrMatch = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/stream/ClientStreamFactory$Request.class */
    /**
     * State carried for one in-flight outbound connection attempt.
     *
     * <p>An instance doubles as the poller callback that is invoked once the
     * non-blocking connect can be completed.
     */
    public final class Request implements ToIntFunction<PollerKey> {
        private final WriteStream stream;
        private final SocketChannel channel;
        private final String acceptReplyName;
        private final long correlationId;
        private final MessageConsumer outputThrottle;
        private final long outputStreamdId;
        private final LongObjectBiConsumer<MessageConsumer> setCorrelatedInput;
        private final LongSupplier readFrameCounter;
        private final LongConsumer readBytesAccumulator;

        private Request(
                SocketChannel channel,
                WriteStream stream,
                String acceptReplyName,
                long correlationId,
                MessageConsumer outputThrottle,
                long outputStreamId,
                LongObjectBiConsumer<MessageConsumer> setCorrelatedInput,
                LongSupplier readFrameCounter,
                LongConsumer readBytesAccumulator) {
            this.stream = stream;
            this.channel = channel;
            this.acceptReplyName = acceptReplyName;
            this.correlationId = correlationId;
            this.outputThrottle = outputThrottle;
            this.outputStreamdId = outputStreamId;
            this.setCorrelatedInput = setCorrelatedInput;
            this.readFrameCounter = readFrameCounter;
            this.readBytesAccumulator = readBytesAccumulator;
        }

        @Override
        public String toString() {
            return String.format("[writeStream=%s]", this.stream);
        }

        /**
         * Finishes the pending connect and reports the outcome to the factory.
         *
         * @param pollerKey the key this request was registered under
         * @return always {@code 1}
         */
        @Override // java.util.function.ToIntFunction
        public int applyAsInt(PollerKey pollerKey) {
            try {
                this.channel.finishConnect();
                // Kept inside the try: an IOException escaping from here is also routed to the failure path.
                ClientStreamFactory.this.handleConnected(this);
            } catch (IOException connectFailure) {
                ClientStreamFactory.this.handleConnectFailed(this);
            } finally {
                // 8 is the same op passed to doRegister in doConnect (numerically SelectionKey.OP_CONNECT).
                pollerKey.cancel(8);
            }
            return 1;
        }
    }

    public ClientStreamFactory(Configuration configuration, RouteManager routeManager, Poller poller, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, LongSupplier longSupplier, LongSupplier longSupplier2, LongFunction<IntUnaryOperator> longFunction, LongFunction<IntUnaryOperator> longFunction2, Function<RouteFW, LongSupplier> function, Function<RouteFW, LongConsumer> function2, Function<RouteFW, LongSupplier> function3, Function<RouteFW, LongConsumer> function4) {
        this.router = (RouteManager) Objects.requireNonNull(routeManager);
        this.poller = poller;
        this.writeByteBuffer = ByteBuffer.allocateDirect(mutableDirectBuffer.capacity()).order(ByteOrder.nativeOrder());
        this.bufferPool = (BufferPool) Objects.requireNonNull(bufferPool);
        this.incrementOverflow = longSupplier;
        this.supplyStreamId = (LongSupplier) Objects.requireNonNull(longSupplier2);
        this.groupBudgetClaimer = (LongFunction) Objects.requireNonNull(longFunction);
        this.groupBudgetReleaser = (LongFunction) Objects.requireNonNull(longFunction2);
        this.writer = new MessageWriter((MutableDirectBuffer) Objects.requireNonNull(mutableDirectBuffer));
        this.readByteBuffer = ByteBuffer.allocateDirect(Math.min(mutableDirectBuffer.capacity() - 49, 65535)).order(ByteOrder.nativeOrder());
        this.readBuffer = new UnsafeBuffer(this.readByteBuffer);
        this.supplyWriteFrameCounter = function3;
        this.supplyReadFrameCounter = function;
        this.supplyWriteBytesAccumulator = function4;
        this.supplyReadBytesAccumulator = function2;
    }

    /**
     * Entry point for a new stream: decodes the BEGIN frame and, when it carries a
     * non-zero source reference, creates the accepting write stream.
     *
     * @param i               message type id (unused)
     * @param directBuffer    buffer containing the BEGIN frame
     * @param i2              offset of the frame within {@code directBuffer}
     * @param i3              length of the frame
     * @param messageConsumer throttle used to send RESET back to the sender
     * @return the handler for the new stream, or {@code null} if no route matched
     * @throws IllegalArgumentException if the BEGIN frame has a zero source reference
     *                                  (a RESET is sent before throwing)
     */
    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        final BeginFW begin = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        if (begin.sourceRef() != 0) {
            return newAcceptStream(begin, messageConsumer);
        }

        final long streamId = begin.streamId();
        this.writer.doReset(messageConsumer, streamId);
        // The format string has a single placeholder, so only the stream id is passed.
        throw new IllegalArgumentException(
            String.format("Stream id %d is not a connect stream, sourceRef is zero", streamId));
    }

    /**
     * Resolves a route for the given BEGIN frame and, if one matches, starts a
     * non-blocking connect to the resolved remote address.
     *
     * <p>A route matches when its source reference equals the frame's and, if the
     * frame carries an extension, that extension resolves to an address accepted
     * by the route's target. When no route matches a RESET is sent instead.
     *
     * @param beginFW         the decoded BEGIN frame
     * @param messageConsumer throttle of the accepted stream
     * @return the write stream's frame handler, or {@code null} if no route matched
     */
    private MessageConsumer newAcceptStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        final long acceptId = beginFW.streamId();
        final String acceptName = beginFW.source().asString();
        final long acceptRef = beginFW.sourceRef();
        final long correlationId = beginFW.correlationId();
        final OctetsFW extension = beginFW.extension();
        final boolean hasExtension = extension.sizeof() > 0;

        final RouteFW route = (RouteFW) this.router.resolve(beginFW.authorization(), (msgTypeId, buffer, index, length) -> {
            // wrap() takes (offset, limit); use index + length exactly as wrapRoute does.
            final RouteFW candidate = this.routeRO.wrap(buffer, index, index + length);
            return acceptRef == candidate.sourceRef()
                && !(hasExtension
                    && resolveRemoteAddressExt(extension, candidate.target().asString(), candidate.targetRef()) == null);
        }, this::wrapRoute);

        if (route == null) {
            this.writer.doReset(messageConsumer, acceptId);
            return null;
        }

        final String targetName = route.target().asString();
        final long targetRef = route.targetRef();

        // Resolve the address before opening the socket so a failure here cannot leak a channel.
        final InetSocketAddress remoteAddress = hasExtension
            ? resolveRemoteAddressExt(extension, targetName, targetRef)
            : new InetSocketAddress(targetName, (int) targetRef);
        if (!$assertionsDisabled && remoteAddress == null) {
            throw new AssertionError();
        }

        final SocketChannel channel = newSocketChannel();

        final LongSupplier writeFrameCounter = this.supplyWriteFrameCounter.apply(route);
        final LongConsumer writeBytesAccumulator = this.supplyWriteBytesAccumulator.apply(route);
        final LongSupplier readFrameCounter = this.supplyReadFrameCounter.apply(route);
        final LongConsumer readBytesAccumulator = this.supplyReadBytesAccumulator.apply(route);

        final WriteStream stream = new WriteStream(messageConsumer, acceptId, channel, this.poller, this.incrementOverflow, this.bufferPool, this.writeByteBuffer, this.writer, writeFrameCounter, writeBytesAccumulator);
        final MessageConsumer handler = stream::handleStream;
        doConnect(stream, channel, remoteAddress, acceptName, correlationId, messageConsumer, acceptId, stream::setCorrelatedInput, readFrameCounter, readBytesAccumulator);
        return handler;
    }

    /**
     * Decodes the TCP BEGIN extension and turns it into a socket address, provided
     * the address is accepted by the matcher for {@code str} and the port agrees
     * with {@code j}.
     *
     * @param octetsFW the BEGIN extension bytes
     * @param str      route target (CIDR block or host name) the address must match
     * @param j        required remote port, or {@code 0} to accept any port
     * @return the resolved address, or {@code null} when nothing matches or a host
     *         name cannot be resolved
     * @throws RuntimeException if the extension carries an unknown address kind
     */
    private InetSocketAddress resolveRemoteAddressExt(OctetsFW octetsFW, String str, long j) {
        try {
            final TcpBeginExFW beginEx = (TcpBeginExFW) octetsFW.get(this.tcpBeginExRO::wrap);
            final TcpAddressFW address = beginEx.remoteAddress();
            final Predicate<? super InetAddress> matcher = extensionMatcher(str);
            final int port = beginEx.remotePort();

            if (j != 0 && j != port) {
                return null;
            }

            InetAddress matched = null;
            switch (address.kind()) {
                case 1:
                    matched = literalIfMatched(address.ipv4Address(), matcher);
                    break;
                case 2:
                    matched = literalIfMatched(address.ipv6Address(), matcher);
                    break;
                case 3:
                    // Take the first resolved address the matcher accepts, if any.
                    for (InetAddress candidate : InetAddress.getAllByName(address.host().asString())) {
                        if (matcher.test(candidate)) {
                            matched = candidate;
                            break;
                        }
                    }
                    break;
                default:
                    throw new RuntimeException("Unexpected address kind");
            }

            return matched == null ? null : new InetSocketAddress(matched, port);
        } catch (UnknownHostException ignored) {
            // An unresolvable host simply means "no match" for the caller.
            return null;
        }
    }

    /** Builds an address from raw octets and returns it only if the matcher accepts it. */
    private static InetAddress literalIfMatched(OctetsFW octets, Predicate<? super InetAddress> matcher) throws UnknownHostException {
        final byte[] raw = new byte[octets.sizeof()];
        octets.buffer().getBytes(octets.offset(), raw, 0, octets.sizeof());
        final InetAddress literal = InetAddress.getByAddress(raw);
        return matcher.test(literal) ? literal : null;
    }

    /**
     * Returns the cached address matcher for a route target, creating it on first use.
     *
     * @param str route target; treated as a CIDR block when it contains {@code '/'},
     *            otherwise as a host name or literal address
     * @return predicate accepting addresses that belong to the target
     * @throws UnknownHostException if a non-CIDR target cannot be resolved
     */
    private Predicate<? super InetAddress> extensionMatcher(String str) throws UnknownHostException {
        if (str.contains("/")) {
            return this.targetToCidrMatch.computeIfAbsent(str, this::inetMatchesCIDR);
        }

        // Resolve eagerly so an unknown host surfaces as the checked exception
        // rather than as the unchecked rethrow inside inetMatchesInet.
        InetAddress.getByName(str);
        return this.targetToCidrMatch.computeIfAbsent(str, this::inetMatchesInet);
    }

    /**
     * Builds a matcher that accepts addresses whose textual form lies inside the given CIDR block.
     *
     * @param str CIDR notation handed to {@link CIDR}
     * @return predicate delegating to {@code CIDR.isInRange}
     */
    private Predicate<InetAddress> inetMatchesCIDR(String str) {
        final CIDR range = new CIDR(str);
        return candidate -> range.isInRange(candidate.getHostAddress());
    }

    /**
     * Builds a matcher that accepts exactly the address the given host resolves to.
     *
     * @param str host name or literal address
     * @return predicate comparing candidates with the resolved address; if resolution
     *         fails the exception is rethrown unchecked via {@code LangUtil}
     */
    private Predicate<InetAddress> inetMatchesInet(String str) {
        final InetAddress expected;
        try {
            expected = InetAddress.getByName(str);
        } catch (UnknownHostException unresolved) {
            LangUtil.rethrowUnchecked(unresolved);
            // Only here to satisfy the compiler's return-path analysis.
            return candidate -> false;
        }
        return expected::equals;
    }

    /**
     * Opens a non-blocking socket channel with {@code TCP_NODELAY} enabled.
     *
     * <p>If configuring the channel fails after it was opened, the channel is
     * closed before the failure is propagated so the descriptor is not leaked.
     *
     * @return the configured channel
     */
    private SocketChannel newSocketChannel() {
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            return channel;
        } catch (IOException ex) {
            if (channel != null) {
                CloseHelper.quietClose(channel);
            }
            LangUtil.rethrowUnchecked(ex);
            // Only here to satisfy the compiler's return-path analysis.
            return null;
        }
    }

    /**
     * Wraps the shared route flyweight over a route message.
     *
     * @param i            message type id (unused)
     * @param directBuffer buffer holding the route
     * @param i2           offset of the route within the buffer
     * @param i3           length of the route
     * @return the shared {@code routeRO} flyweight positioned over the route
     */
    private RouteFW wrapRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        final int limit = i2 + i3;
        return this.routeRO.wrap(directBuffer, i2, limit);
    }

    /**
     * Starts connecting the channel to the remote address.
     *
     * <p>If the connect completes immediately the connected path runs straight
     * away; otherwise the request is registered with the poller and completed
     * later. An {@link IOException} triggers the failure path and is then
     * rethrown unchecked.
     *
     * @param writeStream          write side of the connection
     * @param socketChannel        channel to connect
     * @param inetSocketAddress    remote address to connect to
     * @param str                  name used to look up the reply target
     * @param j                    correlation id of the accepted stream
     * @param messageConsumer      throttle of the accepted stream
     * @param j2                   id of the accepted stream
     * @param longObjectBiConsumer callback that links the reply stream to the write stream
     * @param longSupplier         read frame counter for the reply stream
     * @param longConsumer         read bytes accumulator for the reply stream
     */
    public void doConnect(WriteStream writeStream, SocketChannel socketChannel, InetSocketAddress inetSocketAddress, String str, long j, MessageConsumer messageConsumer, long j2, LongObjectBiConsumer<MessageConsumer> longObjectBiConsumer, LongSupplier longSupplier, LongConsumer longConsumer) {
        final Request pending = new Request(socketChannel, writeStream, str, j, messageConsumer, j2, longObjectBiConsumer, longSupplier, longConsumer);
        try {
            final boolean connectedImmediately = socketChannel.connect(inetSocketAddress);
            if (!connectedImmediately) {
                // 8 is numerically SelectionKey.OP_CONNECT; the request finishes the connect when polled.
                this.poller.doRegister(socketChannel, 8, pending);
                return;
            }
            handleConnected(pending);
        } catch (IOException connectFailure) {
            handleConnectFailed(pending);
            LangUtil.rethrowUnchecked(connectFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs once a connect has completed: notifies the write stream first, then
     * creates the reply stream for the same request.
     *
     * @param request the request whose channel is now connected
     */
    public void handleConnected(Request request) {
        final WriteStream connectedStream = request.stream;
        connectedStream.doConnected();
        newConnectReplyStream(request);
    }

    /**
     * Creates the reply (read) stream for a connected request.
     *
     * <p>Allocates a reply stream id, links it to the write stream, sends the TCP
     * BEGIN frame with the channel's local and remote addresses, and registers a
     * {@code ReadStream} with the poller and the router's throttle. If the channel
     * addresses cannot be read the channel is closed and the {@link IOException}
     * is rethrown unchecked.
     *
     * @param request the connected request
     */
    private void newConnectReplyStream(Request request) {
        final SocketChannel channel = request.channel;
        final String replyName = request.acceptReplyName;
        final long replyId = this.supplyStreamId.getAsLong();
        final long correlationId = request.correlationId;
        final MessageConsumer acceptThrottle = request.outputThrottle;
        final long acceptId = request.outputStreamdId;
        try {
            final InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
            final InetSocketAddress remoteAddress = (InetSocketAddress) channel.getRemoteAddress();
            final MessageConsumer replyTarget = this.router.supplyTarget(replyName);

            // Hand the reply target to the write stream as an object, not as a primitive.
            request.setCorrelatedInput.accept(replyId, replyTarget);
            this.writer.doTcpBegin(replyTarget, replyId, 0L, correlationId, localAddress, remoteAddress);

            // Register with no ops and no handler first; the read handler is attached below.
            final PollerKey key = this.poller.doRegister(channel, 0, null);
            final ReadStream readStream = new ReadStream(replyTarget, replyId, key, channel, this.readByteBuffer, this.readBuffer, this.writer, request.readFrameCounter, request.readBytesAccumulator);
            readStream.setCorrelatedThrottle(acceptId, acceptThrottle);
            this.router.setThrottle(replyName, replyId, readStream::handleThrottle);
            // 1 is numerically SelectionKey.OP_READ.
            key.handler(1, readStream::handleStream);
        } catch (IOException ex) {
            CloseHelper.quietClose(channel);
            LangUtil.rethrowUnchecked(ex);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs when a connect attempt fails: notifies the request's write stream.
     *
     * @param request the request whose connect failed
     */
    public void handleConnectFailed(Request request) {
        final WriteStream failedStream = request.stream;
        failedStream.doConnectFailed();
    }

    static {
        // Caches whether assertions are disabled for this class; the flag guards
        // the null-address check in newAcceptStream.
        $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
    }
}
