package org.reaktivity.nukleus.tcp.internal.writer;

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongFunction;
import org.agrona.LangUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.tcp.internal.Context;
import org.reaktivity.nukleus.tcp.internal.InternalSystemProperty;
import org.reaktivity.nukleus.tcp.internal.conductor.Conductor;
import org.reaktivity.nukleus.tcp.internal.connector.Connector;
import org.reaktivity.nukleus.tcp.internal.layouts.StreamsLayout;
import org.reaktivity.nukleus.tcp.internal.poller.Poller;
import org.reaktivity.nukleus.tcp.internal.router.Correlation;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/writer/Writer.class */
/**
 * Composite nukleus that creates {@link Source} and {@link Target} instances on demand and keeps
 * the routes registered for this writer's source name.
 *
 * <p>Sources are keyed by partition name, targets by target name and routes by source reference.
 * Every source and target created here is registered with the composite through {@code include}.
 */
public final class Writer extends Nukleus.Composite {
    /** Shared empty list handed out by {@code lookupRoutes} when a reference has no routes. */
    private static final List<Route> EMPTY_ROUTES = Collections.emptyList();

    private final Context context;
    private final Conductor conductor;
    private final Connector connector;
    private final Poller poller;
    private final String name;
    private final String sourceName;
    private final AtomicBuffer writeBuffer;
    private final Map<String, Source> sourcesByPartitionName;
    private final Map<String, Target> targetsByName;
    private final Long2ObjectHashMap<List<Route>> routesByRef;
    private final LongFunction<Correlation> resolveCorrelation;
    private final Long2ObjectHashMap<MessageHandler> streams;

    /**
     * Creates a writer with no sources, targets, routes or streams.
     *
     * @param context      configuration; read here for the maximum message length that sizes the
     *                     write buffer, and later when sources are created
     * @param conductor    receives the routed / unrouted / error responses
     * @param connector    handed to every source created by this writer
     * @param poller       handed to every target created by this writer
     * @param str          used both as this nukleus' name and as the source name of its routes
     * @param longFunction correlation resolver handed to every source created by this writer
     */
    public Writer(Context context, Conductor conductor, Connector connector, Poller poller, String str, LongFunction<Correlation> longFunction) {
        super(new Nukleus[0]);
        this.context = context;
        this.conductor = conductor;
        this.connector = connector;
        this.poller = poller;
        this.name = str;
        this.sourceName = str;
        this.resolveCorrelation = longFunction;
        this.writeBuffer = new UnsafeBuffer(new byte[context.maxMessageLength()]);
        this.sourcesByPartitionName = new HashMap<>();
        this.targetsByName = new HashMap<>();
        this.routesByRef = new Long2ObjectHashMap<>();
        this.streams = new Long2ObjectHashMap<>();
    }

    /**
     * Returns the name supplied at construction.
     *
     * @return this nukleus' name
     */
    @Override // org.reaktivity.nukleus.Nukleus
    public String name() {
        return this.name;
    }

    /**
     * Ensures a source exists for the given partition, creating and including one if absent.
     *
     * @param str partition name keying the source
     */
    public void onReadable(String str) {
        this.sourcesByPartitionName.computeIfAbsent(str, this::newSource);
    }

    /**
     * Forwards a completed connect to the source registered for partition {@code str}, creating
     * the target named {@code str2} if it does not exist yet.
     *
     * <p>No source is created here: if none is registered for {@code str} the map lookup yields
     * {@code null} and the forwarding call fails with a {@link NullPointerException}.
     *
     * @param str           partition name of an already registered source
     * @param j             first identifier forwarded to the source unchanged
     * @param j2            second identifier forwarded to the source unchanged
     * @param str2          name of the target to resolve or create
     * @param j3            identifier forwarded to the source after the channel
     * @param socketChannel connected channel forwarded to the source
     */
    public void onConnected(String str, long j, long j2, String str2, long j3, SocketChannel socketChannel) {
        // Resolve the source first, then the target, matching the original evaluation order.
        Source source = this.sourcesByPartitionName.get(str);
        Target target = this.targetsByName.computeIfAbsent(str2, this::newTarget);
        source.onConnected(j, j2, target, socketChannel, j3);
    }

    /**
     * Asks the source registered for partition {@code str} to reset {@code j}.
     *
     * <p>Fails with a {@link NullPointerException} if no source is registered for {@code str}.
     *
     * @param str partition name of an already registered source
     * @param j   identifier passed to the source's {@code doReset}
     */
    public void onConnectFailed(String str, long j) {
        Source source = this.sourcesByPartitionName.get(str);
        source.doReset(j);
    }

    /**
     * Adds a route from this writer's source name and reference {@code j2} to the target named
     * {@code str}, then reports the outcome to the conductor.
     *
     * <p>On success the conductor receives a routed response; if anything throws, the conductor
     * receives an error response and the exception is rethrown unchecked.
     *
     * @param j                 identifier echoed back to the conductor in the response
     * @param j2                source reference under which the route is stored
     * @param str               name of the target, created if it does not exist yet
     * @param j3                target reference recorded on the route
     * @param inetSocketAddress address recorded on the route
     */
    public void doRoute(long j, long j2, String str, long j3, InetSocketAddress inetSocketAddress) {
        try {
            // The route list is resolved before the target, as in the original expression.
            List<Route> routes = this.routesByRef.computeIfAbsent(j2, this::newRoutes);
            Target target = this.targetsByName.computeIfAbsent(str, this::newTarget);
            routes.add(new Route(this.sourceName, j2, target, j3, inetSocketAddress));
            this.conductor.onRoutedResponse(j, j2);
        } catch (Exception e) {
            this.conductor.onErrorResponse(j);
            LangUtil.rethrowUnchecked(e);
        }
    }

    /**
     * Removes every route stored under {@code j2} that matches this writer's source name and all
     * of the supplied attributes, then reports the outcome to the conductor.
     *
     * <p>The conductor receives an unrouted response when at least one route was removed and an
     * error response otherwise, including when no routes exist for {@code j2}.
     *
     * @param j                 identifier echoed back to the conductor in the response
     * @param j2                source reference whose routes are examined
     * @param str               target name the route must match
     * @param j3                target reference the route must match
     * @param inetSocketAddress address the route must match
     */
    public void doUnroute(long j, long j2, String str, long j3, InetSocketAddress inetSocketAddress) {
        boolean removed = lookupRoutes(j2).removeIf(
                Route.sourceMatches(this.sourceName)
                        .and(Route.sourceRefMatches(j2))
                        .and(Route.targetMatches(str))
                        .and(Route.targetRefMatches(j3))
                        .and(Route.addressMatches(inetSocketAddress)));
        if (removed) {
            this.conductor.onUnroutedResponse(j);
        } else {
            this.conductor.onErrorResponse(j);
        }
    }

    /**
     * Appends {@code SimpleClassName[name=...]} to the given builder.
     *
     * @param sb builder receiving the description
     */
    @Override // org.reaktivity.nukleus.Nukleus.Composite
    protected void toString(StringBuilder sb) {
        sb.append(String.format("%s[name=%s]", getClass().getSimpleName(), this.name));
    }

    /** Creates a target bound to this writer's poller and includes it in the composite. */
    private Target newTarget(String targetName) {
        return (Target) include(new Target(this.poller, targetName));
    }

    /** Supplies a fresh mutable route list; the reference is required by the mapping signature only. */
    private List<Route> newRoutes(long ref) {
        return new ArrayList<>();
    }

    /** Returns the routes stored under {@code ref}, or the shared empty list when there are none. */
    private List<Route> lookupRoutes(long ref) {
        return this.routesByRef.getOrDefault(ref, EMPTY_ROUTES);
    }

    /**
     * Builds a source for the given partition over a read-only streams layout and includes it in
     * the composite.
     *
     * <p>The limit on streams with pending writes comes from the
     * {@code MAXIMUM_STREAMS_WITH_PENDING_WRITES} property; the fallback supplied to it is the
     * configured maximum stream count when that is below 1001, otherwise one tenth of it.
     */
    private Source newSource(String partitionName) {
        StreamsLayout layout = new StreamsLayout.Builder()
                .path(this.context.captureStreamsPath().apply(partitionName))
                .streamsCapacity(this.context.streamsBufferCapacity())
                .throttleCapacity(this.context.throttleBufferCapacity())
                .readonly(true)
                .build();

        int maximumStreamsWithPendingWrites = InternalSystemProperty.MAXIMUM_STREAMS_WITH_PENDING_WRITES.intValue(
                () -> this.context.maximumStreamsCount() < 1001
                        ? this.context.maximumStreamsCount()
                        : this.context.maximumStreamsCount() / 10).intValue();

        // Evaluating the bound method reference below already rejects a null counter,
        // so no separate null check is needed.
        AtomicCounter overflows = this.context.counters().overflows();

        return (Source) include(new Source(
                partitionName,
                this.connector,
                this::lookupRoutes,
                this.resolveCorrelation,
                targetName -> this.targetsByName.computeIfAbsent(targetName, this::newTarget),
                this.streams,
                layout,
                this.writeBuffer,
                maximumStreamsWithPendingWrites,
                overflows::increment));
    }
}
