package org.reaktivity.nukleus.tcp.internal.writer;

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.tcp.internal.InternalSystemProperty;
import org.reaktivity.nukleus.tcp.internal.TcpNukleus;
import org.reaktivity.nukleus.tcp.internal.connector.Connector;
import org.reaktivity.nukleus.tcp.internal.layouts.StreamsLayout;
import org.reaktivity.nukleus.tcp.internal.router.Correlation;
import org.reaktivity.nukleus.tcp.internal.router.RouteKind;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.tcp.internal.writer.stream.StreamFactory;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/writer/Source.class */
/**
 * Reads frames from a streams ring buffer and dispatches each one to the handler registered for
 * its stream id.
 *
 * <p>Frames for unknown streams are either treated as the start of a new stream (when the message
 * type is {@link #BEGIN_TYPE_ID}) or answered with a RESET written to the throttle ring buffer.
 * WINDOW and RESET frames produced by this class are always written to the throttle ring buffer
 * obtained from the {@link StreamsLayout} passed to the constructor.
 *
 * <p>NOTE(review): no synchronization is visible in this class and the flyweights / write buffer
 * are reused across calls, so it presumably expects to be driven from a single thread - confirm
 * against the caller.
 */
public final class Source implements Nukleus {
    /** UTF-8 bytes of {@code TcpNukleus.NAME}; written as the source name of BEGIN frames built in {@link #onConnected}. */
    private static final DirectBuffer SOURCE_NAME_BUFFER = new UnsafeBuffer(TcpNukleus.NAME.getBytes(StandardCharsets.UTF_8));

    /**
     * Message type id that this class treats as BEGIN: frames with this id for an unknown stream are
     * routed to {@link #handleBegin}. Presumably equal to the BeginFW type id - confirm against BeginFW.
     */
    private static final int BEGIN_TYPE_ID = 1;

    private final String partitionName;
    private final Connector connector;
    private final LongFunction<List<Route>> lookupRoutes;
    private final LongFunction<Correlation> resolveCorrelation;
    private final Function<String, Target> supplyTarget;
    private final StreamsLayout layout;
    /** Scratch buffer in which outgoing BEGIN / WINDOW / RESET frames are built (always from offset 0). */
    private final AtomicBuffer writeBuffer;
    private final RingBuffer streamsBuffer;
    private final RingBuffer throttleBuffer;
    private final StreamFactory streamFactory;
    /** Stream id to handler registry; supplied by the caller of the constructor. */
    private final Long2ObjectHashMap<MessageHandler> streams;

    // Reusable flyweights, re-wrapped on every use.
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();

    /** Cached method reference so {@link #process()} does not create a new handler object per call. */
    private final MessageHandler readHandler = this::handleRead;

    /**
     * Creates a source bound to the given streams layout.
     *
     * <p>Decompiler note: this constructor was package-private in the original class; the access
     * modifier was widened by the decompiler.
     *
     * @param str                 partition name, returned by {@link #name()} and passed to the connector
     * @param connector           used to initiate outbound connections for client BEGIN frames
     * @param longFunction        looks up candidate routes by source reference
     * @param longFunction2       resolves a correlation by correlation id (may return {@code null})
     * @param function            supplies a target by name
     * @param long2ObjectHashMap  stream id to handler registry used for dispatch
     * @param streamsLayout       provides the streams and throttle ring buffers; closed by {@link #close()}
     * @param atomicBuffer        scratch buffer used to build outgoing frames
     * @param i                   forwarded unchanged to the {@link StreamFactory}; meaning not visible here
     * @param longSupplier        forwarded unchanged to the {@link StreamFactory}; meaning not visible here
     */
    public Source(String str, Connector connector, LongFunction<List<Route>> longFunction, LongFunction<Correlation> longFunction2, Function<String, Target> function, Long2ObjectHashMap<MessageHandler> long2ObjectHashMap, StreamsLayout streamsLayout, AtomicBuffer atomicBuffer, int i, LongSupplier longSupplier) {
        this.partitionName = str;
        this.connector = connector;
        this.lookupRoutes = longFunction;
        this.resolveCorrelation = longFunction2;
        this.supplyTarget = function;
        this.layout = streamsLayout;
        this.writeBuffer = atomicBuffer;
        this.streamsBuffer = streamsLayout.streamsBuffer();
        this.throttleBuffer = streamsLayout.throttleBuffer();
        this.streamFactory = new StreamFactory(this, InternalSystemProperty.WINDOW_SIZE.intValue().intValue(), i, longSupplier);
        this.streams = long2ObjectHashMap;
    }

    /**
     * Reads pending messages from the streams ring buffer, dispatching each through {@link #handleRead}.
     *
     * @return whatever {@code RingBuffer.read} returns for this pass
     */
    public int process() {
        return this.streamsBuffer.read(this.readHandler);
    }

    /**
     * Closes the underlying streams layout.
     *
     * @throws Exception if closing the layout fails
     */
    public void close() throws Exception {
        this.layout.close();
    }

    /**
     * @return the partition name given at construction
     */
    public String name() {
        return this.partitionName;
    }

    /**
     * @return a string of the form {@code Source[name=<partitionName>]}
     */
    @Override
    public String toString() {
        return String.format("%s[name=%s]", getClass().getSimpleName(), this.partitionName);
    }

    /**
     * Dispatches one frame read from the streams ring buffer.
     *
     * <p>A frame whose stream id has a registered handler goes to that handler. Otherwise a BEGIN
     * frame starts a new stream and any other frame type is answered with a RESET.
     */
    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        this.frameRO.wrap((DirectBuffer) buffer, index, index + length);
        long streamId = this.frameRO.streamId();
        MessageHandler handler = (MessageHandler) this.streams.get(streamId);
        if (handler != null) {
            handler.onMessage(msgTypeId, buffer, index, length);
        } else if (msgTypeId == BEGIN_TYPE_ID) {
            handleBegin(buffer, index, length);
        } else {
            // Frame for a stream we know nothing about and which is not opening one.
            doReset(streamId);
        }
    }

    /**
     * Handles a BEGIN frame for a stream with no registered handler.
     *
     * <p>A zero source reference is treated as a reply on a previously correlated connection;
     * otherwise the source reference must match {@code RouteKind.OUTPUT_NEW}, else the stream is reset.
     */
    private void handleBegin(MutableDirectBuffer buffer, int index, int length) {
        this.beginRO.wrap((DirectBuffer) buffer, index, index + length);
        long streamId = this.beginRO.streamId();
        long sourceRef = this.beginRO.sourceRef();
        long correlationId = this.beginRO.correlationId();
        if (sourceRef == 0) {
            handleBeginServerReply(buffer, index, length, streamId, correlationId);
            return;
        }
        switch (RouteKind.match(sourceRef)) {
            case OUTPUT_NEW:
                handleBeginClient(streamId, sourceRef, correlationId);
                return;
            default:
                doReset(streamId);
                return;
        }
    }

    /**
     * Starts a reply stream on the channel recorded in the correlation for {@code correlationId}.
     *
     * <p>If no correlation is found the stream is reset. Otherwise a new stream handler is created
     * for the correlated channel, registered, and handed the BEGIN frame.
     */
    private void handleBeginServerReply(MutableDirectBuffer buffer, int index, int length, long streamId, long correlationId) {
        Correlation correlation = this.resolveCorrelation.apply(correlationId);
        if (correlation == null) {
            doReset(streamId);
            return;
        }
        SocketChannel channel = correlation.channel();
        Target target = this.supplyTarget.apply(correlation.source());
        MessageHandler newStream = this.streamFactory.newStream(streamId, target, channel);
        this.streams.put(streamId, newStream);
        newStream.onMessage(BEGIN_TYPE_ID, buffer, index, length);
    }

    /**
     * Starts an outbound connection for a client BEGIN frame.
     *
     * <p>The first route returned for {@code sourceRef} is used; with no route the stream is reset.
     * A stream handler is registered for {@code streamId} before the connect is requested, so frames
     * arriving in the meantime are dispatched to it rather than reset.
     */
    private void handleBeginClient(long streamId, long sourceRef, long correlationId) {
        Optional<Route> firstRoute = this.lookupRoutes.apply(sourceRef).stream().findFirst();
        if (!firstRoute.isPresent()) {
            doReset(streamId);
            return;
        }
        Route route = firstRoute.get();
        Target target = route.target();
        long targetRef = route.targetRef();
        SocketChannel channel = newSocketChannel();
        this.streams.put(streamId, this.streamFactory.newStream(streamId, target, channel));
        this.connector.doConnect(this.partitionName, sourceRef, streamId, correlationId, target.name(), targetRef, channel, route.address());
    }

    /**
     * Opens a new socket channel in non-blocking mode.
     *
     * <p>If configuring the channel fails after it was opened, the channel is closed before the
     * failure is propagated so the underlying handle is not leaked. A failure while closing is
     * attached to the original exception as suppressed.
     *
     * @return the opened, non-blocking channel
     */
    private SocketChannel newSocketChannel() {
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            return channel;
        } catch (IOException e) {
            if (channel != null) {
                // open() succeeded but configureBlocking() failed: release the channel.
                try {
                    channel.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            LangUtil.rethrowUnchecked(e);
            // rethrowUnchecked is expected to always throw; this return only satisfies the compiler.
            return null;
        }
    }

    /**
     * Registers a new stream handler for a connected channel and delivers a locally built BEGIN frame to it.
     *
     * <p>The BEGIN frame is built in the shared write buffer and passed directly to the new handler;
     * it is not written to any ring buffer by this method.
     *
     * @param j             stream id to register and to put in the BEGIN frame
     * @param j2            source reference to put in the BEGIN frame
     * @param target        target passed to the stream factory
     * @param socketChannel connected channel passed to the stream factory
     * @param j3            correlation id to put in the BEGIN frame
     */
    public void onConnected(long j, long j2, Target target, SocketChannel socketChannel, long j3) {
        MessageHandler newStream = this.streamFactory.newStream(j, target, socketChannel);
        this.streams.put(j, newStream);
        BeginFW begin = this.beginRW.wrap2((MutableDirectBuffer) this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(j)
                .source(SOURCE_NAME_BUFFER, 0, SOURCE_NAME_BUFFER.capacity())
                .sourceRef(j2)
                .correlationId(j3)
                .build();
        newStream.onMessage(begin.typeId(), this.writeBuffer, begin.offset(), begin.sizeof());
    }

    /**
     * Writes a WINDOW frame for the given stream to the throttle ring buffer.
     *
     * @param j stream id
     * @param i window update value
     */
    public void doWindow(long j, int i) {
        WindowFW window = this.windowRW.wrap2((MutableDirectBuffer) this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(j)
                .update(i)
                .build();
        this.throttleBuffer.write(window.typeId(), window.buffer(), window.offset(), window.sizeof());
    }

    /**
     * Writes a RESET frame for the given stream to the throttle ring buffer.
     *
     * @param j stream id
     */
    public void doReset(long j) {
        ResetFW reset = this.resetRW.wrap2((MutableDirectBuffer) this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(j)
                .build();
        this.throttleBuffer.write(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    /**
     * Registers {@code messageHandler} for the given stream id, replacing any existing handler.
     *
     * @param j              stream id
     * @param messageHandler handler that will receive subsequent frames for the stream
     */
    public void replaceStream(long j, MessageHandler messageHandler) {
        this.streams.put(j, messageHandler);
    }

    /**
     * Removes the handler registered for the given stream id, if any.
     *
     * @param j stream id
     */
    public void removeStream(long j) {
        this.streams.remove(j);
    }
}
