package org.reaktivity.reaktor.internal.router;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactoryBuilder;
import org.reaktivity.reaktor.internal.Context;
import org.reaktivity.reaktor.internal.State;
import org.reaktivity.reaktor.internal.conductor.Conductor;
import org.reaktivity.reaktor.internal.layouts.RoutesLayout;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.types.OctetsFW;
import org.reaktivity.reaktor.internal.types.StringFW;
import org.reaktivity.reaktor.internal.types.control.Role;
import org.reaktivity.reaktor.internal.types.control.RouteFW;
import org.reaktivity.reaktor.internal.types.control.UnrouteFW;
import org.reaktivity.reaktor.internal.types.state.RouteEntryFW;
import org.reaktivity.reaktor.internal.types.state.RouteTableFW;

/* loaded from: input_file:org/reaktivity/reaktor/internal/router/Router.class */
public final class Router extends Nukleus.Composite implements RouteManager {
    private final RouteFW routeRO;
    private final RouteTableFW routeTableRO;
    private final RouteFW.Builder routeRW;
    private final RouteTableFW.Builder routeTableRW;
    private final Context context;
    private final MutableDirectBuffer writeBuffer;
    private final Map<String, Source> sourcesByName;
    private final Map<String, Target> targetsByName;
    private final AtomicCounter routeRefs;
    private final MutableDirectBuffer routeBuf;
    private final AtomicLong correlations;
    private final GroupBudgetManager groupBudgetManager;
    private final RoutesLayout routesLayout;
    private final MutableDirectBuffer routesBuffer;
    private final int routesBufferCapacity;
    private final Long2ObjectHashMap<MessageConsumer> streams;
    private final Long2ObjectHashMap<MessageConsumer> throttles;
    private Conductor conductor;
    private State state;
    private Function<RouteKind, StreamFactoryBuilder> supplyStreamFactoryBuilder;
    private boolean timestamps;
    private Function<Role, MessagePredicate> supplyRouteHandler;
    private Predicate<RouteKind> allowZeroSourceRef;
    private Predicate<RouteKind> allowZeroTargetRef;
    private Predicate<RouteKind> layoutSource;
    private Predicate<RouteKind> layoutTarget;
    static final /* synthetic */ boolean $assertionsDisabled;

    public Router(Context context) {
        super(new Nukleus[0]);
        this.routeRO = new RouteFW();
        this.routeTableRO = new RouteTableFW();
        this.routeRW = new RouteFW.Builder();
        this.routeTableRW = new RouteTableFW.Builder();
        this.context = context;
        this.writeBuffer = new UnsafeBuffer(new byte[context.maxMessageLength()]);
        this.routeRefs = context.counters().routes();
        this.sourcesByName = new HashMap();
        this.targetsByName = new HashMap();
        this.routeBuf = new UnsafeBuffer(ByteBuffer.allocateDirect(context.maxControlCommandLength()));
        this.correlations = new AtomicLong();
        this.groupBudgetManager = new GroupBudgetManager();
        this.routesLayout = context.routesLayout();
        this.routesBuffer = this.routesLayout.routesBuffer();
        this.routesBufferCapacity = this.routesLayout.capacity();
        this.streams = new Long2ObjectHashMap<>();
        this.throttles = new Long2ObjectHashMap<>();
    }

    public void setConductor(Conductor conductor) {
        this.conductor = conductor;
    }

    public void setState(State state) {
        this.state = state;
    }

    public void setStreamFactoryBuilderSupplier(Function<RouteKind, StreamFactoryBuilder> function) {
        this.supplyStreamFactoryBuilder = function;
    }

    public void setTimestamps(boolean z) {
        this.timestamps = z;
    }

    public void setRouteHandlerSupplier(Function<Role, MessagePredicate> function) {
        this.supplyRouteHandler = function;
    }

    public void setAllowZeroSourceRef(Predicate<RouteKind> predicate) {
        this.allowZeroSourceRef = predicate;
    }

    public void setAllowZeroTargetRef(Predicate<RouteKind> predicate) {
        this.allowZeroTargetRef = predicate;
    }

    public void setLayoutSource(Predicate<RouteKind> predicate) {
        this.layoutSource = predicate;
    }

    public void setLayoutTarget(Predicate<RouteKind> predicate) {
        this.layoutTarget = predicate;
    }

    @Override // org.reaktivity.nukleus.Nukleus
    public String name() {
        return "router";
    }

    @Override // org.reaktivity.nukleus.route.RouteManager
    public MessageConsumer supplyTarget(String str) {
        return supplyTargetInternal(str).writeHandler();
    }

    @Override // org.reaktivity.nukleus.route.RouteManager
    public void setThrottle(String str, long j, MessageConsumer messageConsumer) {
        supplyTargetInternal(str).setThrottle(j, messageConsumer);
    }

    public void doRoute(RouteFW routeFW) {
        long correlationId = routeFW.correlationId();
        try {
            Role role = routeFW.role().get();
            MessagePredicate apply = this.supplyRouteHandler.apply(role);
            boolean z = !this.allowZeroSourceRef.test(RouteKind.valueOf(role.ordinal()));
            boolean z2 = !this.allowZeroTargetRef.test(RouteKind.valueOf(role.ordinal()));
            if (z || z2) {
                routeFW = generateTargetRefIfNecessary(generateSourceRefIfNecessary(routeFW, z), z2);
                long sourceRef = routeFW.sourceRef();
                MessagePredicate messagePredicate = (i, directBuffer, i2, i3) -> {
                    return ReferenceKind.resolve(sourceRef).ordinal() == role.ordinal();
                };
                apply = apply == null ? messagePredicate : messagePredicate.and(apply);
            }
            if (apply == null) {
                apply = (i4, directBuffer2, i5, i6) -> {
                    return true;
                };
            }
            RouteFW generateRouteId = generateRouteId(routeFW);
            if (doRouteInternal(generateRouteId, apply)) {
                this.conductor.onRouted(correlationId, generateRouteId.sourceRef(), generateRouteId.targetRef());
            } else {
                this.conductor.onError(correlationId);
            }
        } catch (Exception e) {
            this.conductor.onError(correlationId);
            LangUtil.rethrowUnchecked(e);
        }
    }

    public void doUnroute(UnrouteFW unrouteFW) {
        long correlationId = unrouteFW.correlationId();
        try {
            MessagePredicate apply = this.supplyRouteHandler.apply(unrouteFW.role().get());
            if (apply == null) {
                apply = (i, directBuffer, i2, i3) -> {
                    return true;
                };
            }
            if (doUnrouteInternal(unrouteFW, apply)) {
                this.conductor.onUnrouted(correlationId);
            } else {
                this.conductor.onError(correlationId);
            }
        } catch (Exception e) {
            this.conductor.onError(correlationId);
            LangUtil.rethrowUnchecked(e);
        }
    }

    @Override // org.reaktivity.nukleus.route.RouteManager
    public <R> R resolve(long j, MessagePredicate messagePredicate, MessageFunction<R> messageFunction) {
        RouteTableFW wrap = this.routeTableRO.wrap((DirectBuffer) this.routesBuffer, 0, this.routesBufferCapacity);
        if (!$assertionsDisabled && wrap.writeLockReleases() != wrap.writeLockAcquires()) {
            throw new AssertionError();
        }
        RouteEntryFW matchFirst = wrap.routeEntries().matchFirst(routeEntryFW -> {
            OctetsFW route = routeEntryFW.route();
            RouteFW wrap2 = this.routeRO.wrap(route.buffer(), route.offset(), route.limit());
            return (j & wrap2.authorization()) == wrap2.authorization() && messagePredicate.test(wrap2.typeId(), wrap2.buffer(), wrap2.offset(), wrap2.sizeof());
        });
        R r = null;
        if (matchFirst != null) {
            OctetsFW route = matchFirst.route();
            RouteFW wrap2 = this.routeRO.wrap(route.buffer(), route.offset(), route.limit());
            r = messageFunction.apply(wrap2.typeId(), wrap2.buffer(), wrap2.offset(), wrap2.sizeof());
        }
        return r;
    }

    @Override // org.reaktivity.nukleus.route.RouteManager
    public void forEach(MessageConsumer messageConsumer) {
        RouteTableFW wrap = this.routeTableRO.wrap((DirectBuffer) this.routesBuffer, 0, this.routesBufferCapacity);
        if (!$assertionsDisabled && wrap.writeLockReleases() != wrap.writeLockAcquires()) {
            throw new AssertionError();
        }
        wrap.routeEntries().forEach(routeEntryFW -> {
            OctetsFW route = routeEntryFW.route();
            RouteFW wrap2 = this.routeRO.wrap(route.buffer(), route.offset(), route.limit());
            messageConsumer.accept(wrap2.typeId(), wrap2.buffer(), wrap2.offset(), wrap2.sizeof());
        });
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [org.reaktivity.reaktor.internal.types.state.RouteTableFW$Builder] */
    private boolean doRouteInternal(RouteFW routeFW, MessagePredicate messagePredicate) {
        RouteTableFW wrap = this.routeTableRO.wrap((DirectBuffer) this.routesBuffer, 0, this.routesBufferCapacity);
        if (!$assertionsDisabled && wrap.writeLockReleases() != wrap.writeLockAcquires()) {
            throw new AssertionError();
        }
        boolean test = messagePredicate.test(routeFW.typeId(), routeFW.buffer(), routeFW.offset(), routeFW.sizeof());
        if (test) {
            int lock = this.routesLayout.lock();
            this.routeTableRW.wrap2(this.routesBuffer, 0, this.routesBufferCapacity).writeLockAcquires(lock).writeLockReleases(lock - 1).routeEntries(builder -> {
                wrap.routeEntries().forEach(routeEntryFW -> {
                    builder.item(builder -> {
                        builder.route(routeEntryFW.route());
                    });
                });
                builder.item(builder -> {
                    builder.route(routeFW.buffer(), routeFW.offset(), routeFW.sizeof());
                });
            }).build();
            RouteKind routeKind = ReferenceKind.sourceKind(routeFW.role().get()).toRouteKind();
            if (this.layoutSource.test(routeKind)) {
                supplySource(routeFW.source().asString());
            }
            if (this.layoutTarget.test(routeKind)) {
                supplySource(routeFW.target().asString());
            }
            this.routesLayout.unlock();
        }
        return test;
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [org.reaktivity.reaktor.internal.types.state.RouteTableFW$Builder] */
    private boolean doUnrouteInternal(UnrouteFW unrouteFW, MessagePredicate messagePredicate) {
        RouteTableFW wrap = this.routeTableRO.wrap((DirectBuffer) this.routesBuffer, 0, this.routesBufferCapacity);
        if (!$assertionsDisabled && wrap.writeLockReleases() != wrap.writeLockAcquires()) {
            throw new AssertionError();
        }
        int lock = this.routesLayout.lock();
        int sizeof = wrap.sizeof();
        int sizeof2 = this.routeTableRW.wrap2(this.routesBuffer, 0, this.routesBufferCapacity).writeLockAcquires(lock).writeLockReleases(lock - 1).routeEntries(builder -> {
            wrap.routeEntries().forEach(routeEntryFW -> {
                OctetsFW route = routeEntryFW.route();
                RouteFW wrap2 = this.routeRO.wrap(route.buffer(), route.offset(), route.limit());
                if (routeMatchesUnroute(messagePredicate, wrap2, unrouteFW)) {
                    return;
                }
                builder.item(builder -> {
                    builder.route(wrap2.buffer(), wrap2.offset(), wrap2.sizeof());
                });
            });
        }).build().sizeof();
        this.routesLayout.unlock();
        return sizeof > sizeof2;
    }

    @Override // org.reaktivity.nukleus.Nukleus.Composite, org.reaktivity.nukleus.Nukleus, java.lang.AutoCloseable
    public void close() throws Exception {
        this.sourcesByName.forEach((str, source) -> {
            source.detach();
        });
        this.targetsByName.forEach((str2, target) -> {
            target.detach();
        });
        this.targetsByName.forEach((str3, target2) -> {
            CloseHelper.quietClose(target2);
        });
        super.close();
    }

    private Target supplyTargetInternal(String str) {
        return this.targetsByName.computeIfAbsent(str, this::newTarget);
    }

    private Target newTarget(String str) {
        return new Target(str, new StreamsLayout.Builder().path(this.context.targetStreamsPath().apply(str)).streamsCapacity(this.context.streamsBufferCapacity()).readonly(true).build(), this.writeBuffer, this.context.counters(), this.timestamps, this.context.maximumMessagesPerRead(), this.streams, this.throttles);
    }

    private Source supplySource(String str) {
        return this.sourcesByName.computeIfAbsent(str, this::newSource);
    }

    private Source newSource(String str) {
        Context context = this.context;
        MutableDirectBuffer mutableDirectBuffer = this.writeBuffer;
        State state = this.state;
        GroupBudgetManager groupBudgetManager = this.groupBudgetManager;
        Objects.requireNonNull(groupBudgetManager);
        LongFunction longFunction = groupBudgetManager::claim;
        GroupBudgetManager groupBudgetManager2 = this.groupBudgetManager;
        Objects.requireNonNull(groupBudgetManager2);
        return (Source) include(new Source(context, mutableDirectBuffer, this, str, state, longFunction, groupBudgetManager2::release, this.supplyStreamFactoryBuilder, this.correlations, this::supplyTarget, this.streams, this.throttles));
    }

    /* JADX WARN: Type inference failed for: r0v24, types: [org.reaktivity.reaktor.internal.types.control.RouteFW$Builder] */
    private RouteFW generateSourceRefIfNecessary(RouteFW routeFW, boolean z) {
        if (z && routeFW.sourceRef() == 0) {
            Role role = routeFW.role().get();
            long nextRef = ReferenceKind.sourceKind(role).nextRef(this.routeRefs);
            StringFW source = routeFW.source();
            StringFW target = routeFW.target();
            long targetRef = routeFW.targetRef();
            long authorization = routeFW.authorization();
            OctetsFW extension = routeFW.extension();
            routeFW = this.routeRW.wrap2(this.routeBuf, 0, this.routeBuf.capacity()).correlationId(routeFW.correlationId()).role(builder -> {
                builder.set(role);
            }).source(source).sourceRef(nextRef).target(target).targetRef(targetRef).authorization(authorization).extension(builder2 -> {
                builder2.set(extension);
            }).build();
        }
        return routeFW;
    }

    /* JADX WARN: Type inference failed for: r0v24, types: [org.reaktivity.reaktor.internal.types.control.RouteFW$Builder] */
    private RouteFW generateTargetRefIfNecessary(RouteFW routeFW, boolean z) {
        if (z && routeFW.targetRef() == 0) {
            Role role = routeFW.role().get();
            ReferenceKind targetKind = ReferenceKind.targetKind(role);
            long sourceRef = routeFW.sourceRef();
            StringFW source = routeFW.source();
            StringFW target = routeFW.target();
            long nextRef = targetKind.nextRef(this.routeRefs);
            long authorization = routeFW.authorization();
            OctetsFW extension = routeFW.extension();
            routeFW = this.routeRW.wrap2(this.routeBuf, 0, this.routeBuf.capacity()).correlationId(routeFW.correlationId()).role(builder -> {
                builder.set(role);
            }).source(source).sourceRef(sourceRef).target(target).targetRef(nextRef).authorization(authorization).extension(builder2 -> {
                builder2.set(extension);
            }).build();
        }
        return routeFW;
    }

    /* JADX WARN: Type inference failed for: r0v20, types: [org.reaktivity.reaktor.internal.types.control.RouteFW$Builder] */
    private RouteFW generateRouteId(RouteFW routeFW) {
        Role role = routeFW.role().get();
        StringFW source = routeFW.source();
        long sourceRef = routeFW.sourceRef();
        StringFW target = routeFW.target();
        long targetRef = routeFW.targetRef();
        long authorization = routeFW.authorization();
        OctetsFW extension = routeFW.extension();
        return this.routeRW.wrap2(this.routeBuf, 0, this.routeBuf.capacity()).correlationId(0 | (role.ordinal() << 28) | (this.state.supplyRouteId() & 268435455)).role(builder -> {
            builder.set(role);
        }).source(source).sourceRef(sourceRef).target(target).targetRef(targetRef).authorization(authorization).extension(builder2 -> {
            builder2.set(extension);
        }).build();
    }

    private static boolean routeMatchesUnroute(MessagePredicate messagePredicate, RouteFW routeFW, UnrouteFW unrouteFW) {
        return routeFW.role().get() == unrouteFW.role().get() && unrouteFW.source().asString().equals(routeFW.source().asString()) && unrouteFW.sourceRef() == routeFW.sourceRef() && unrouteFW.target().asString().equals(routeFW.target().asString()) && unrouteFW.targetRef() == routeFW.targetRef() && unrouteFW.authorization() == routeFW.authorization() && unrouteFW.extension().equals(routeFW.extension()) && messagePredicate.test(2, routeFW.buffer(), routeFW.offset(), routeFW.sizeof());
    }

    static {
        $assertionsDisabled = !Router.class.desiredAssertionStatus();
    }
}
