/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.internal.agent;

import java.util.BitSet;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.hints.ThreadHints;
import org.reaktivity.nukleus.AgentBuilder;
import org.reaktivity.nukleus.Elektron;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.buffer.CountingBufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.stream.StreamFactoryBuilder;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.Counters;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.buffer.DefaultBufferPool;
import org.reaktivity.reaktor.internal.layouts.MetricsLayout;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.router.GroupBudgetManager;
import org.reaktivity.reaktor.internal.router.Resolver;
import org.reaktivity.reaktor.internal.router.RouteId;
import org.reaktivity.reaktor.internal.router.StreamId;
import org.reaktivity.reaktor.internal.router.Target;
import org.reaktivity.reaktor.internal.router.WriteCounters;
import org.reaktivity.reaktor.internal.types.stream.AbortFW;
import org.reaktivity.reaktor.internal.types.stream.BeginFW;
import org.reaktivity.reaktor.internal.types.stream.FrameFW;
import org.reaktivity.reaktor.internal.types.stream.ResetFW;
import org.reaktivity.reaktor.internal.types.stream.SignalFW;

public class ElektronAgent
implements Agent {
    private static final Pattern ADDRESS_PATTERN = Pattern.compile("^([^#]+)(:?#.*)$");
    private final ThreadLocal<SignalFW.Builder> signalRW = ThreadLocal.withInitial(ElektronAgent::newSignalRW);
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final int localIndex;
    private final ReaktorConfiguration config;
    private final LabelManager labels;
    private final ExecutorService executor;
    private final Function<String, BitSet> affinityMask;
    private final String elektronName;
    private final Counters counters;
    private final boolean timestamps;
    private final MetricsLayout metricsLayout;
    private final StreamsLayout streamsLayout;
    private final RingBuffer streamsBuffer;
    private final MutableDirectBuffer writeBuffer;
    private final Int2ObjectHashMap<MessageConsumer>[] streams;
    private final Int2ObjectHashMap<MessageConsumer>[] throttles;
    private final Long2ObjectHashMap<ReadCounters> countersByRouteId;
    private final Int2ObjectHashMap<MessageConsumer> writersByIndex;
    private final Int2ObjectHashMap<Target> targetsByIndex;
    private final Map<String, ElektronRef> elektronByName;
    private final BufferPool bufferPool;
    private final long mask;
    private final MessageHandler readHandler;
    private final int readLimit;
    private final GroupBudgetManager groupBudgetManager;
    private final LongFunction<? extends ReadCounters> newReadCounters;
    private final IntFunction<MessageConsumer> supplyWriter;
    private final IntFunction<Target> newTarget;
    private final LongFunction<WriteCounters> newWriteCounters;
    private final LongFunction<Affinity> resolveAffinity;
    private final RouteManager resolver;
    private final Int2ObjectHashMap<StreamFactory> streamFactoriesByAddressId;
    private final Long2ObjectHashMap<Affinity> affinityByRemoteId;
    private final Supplier<DirectBuffer> routesBufferRef;
    private long streamId;
    private long traceId;
    private long groupId;
    private volatile Agent[] agents;

    public ElektronAgent(int index, int count, ReaktorConfiguration config, LabelManager labels, ExecutorService executor, Function<String, BitSet> affinityMask, Supplier<DirectBuffer> routesBufferRef, Supplier<AgentBuilder> supplyAgentBuilder) {
        long mask;
        this.localIndex = index;
        this.config = config;
        this.labels = labels;
        this.executor = executor;
        this.affinityMask = affinityMask;
        this.routesBufferRef = routesBufferRef;
        MetricsLayout metricsLayout = new MetricsLayout.Builder().path(config.directory().resolve(String.format("metrics%d", index))).labelsBufferCapacity(config.counterLabelsBufferCapacity()).valuesBufferCapacity(config.counterValuesBufferCapacity()).readonly(false).build();
        StreamsLayout streamsLayout = new StreamsLayout.Builder().path(config.directory().resolve(String.format("data%d", index))).streamsCapacity(config.streamsBufferCapacity()).readonly(false).build();
        this.elektronName = String.format("reaktor/data#%d", index);
        this.metricsLayout = metricsLayout;
        this.streamsLayout = streamsLayout;
        this.groupBudgetManager = new GroupBudgetManager();
        CountersManager countersManager = new CountersManager(metricsLayout.labelsBuffer(), metricsLayout.valuesBuffer());
        this.counters = new Counters(countersManager);
        this.timestamps = config.timestamps();
        this.readLimit = config.maximumMessagesPerRead();
        this.streamsBuffer = streamsLayout.streamsBuffer();
        this.writeBuffer = new UnsafeBuffer(new byte[this.streamsBuffer.maxMsgLength()]);
        this.streams = this.initDispatcher();
        this.throttles = this.initDispatcher();
        this.countersByRouteId = new Long2ObjectHashMap();
        this.streamFactoriesByAddressId = new Int2ObjectHashMap();
        this.readHandler = this::handleRead;
        this.newReadCounters = this::newReadCounters;
        this.supplyWriter = this::supplyWriter;
        this.newTarget = this::newTarget;
        this.newWriteCounters = this::newWriteCounters;
        this.resolveAffinity = this::resolveAffinity;
        this.elektronByName = new ConcurrentHashMap<String, ElektronRef>();
        this.affinityByRemoteId = new Long2ObjectHashMap();
        this.targetsByIndex = new Int2ObjectHashMap();
        this.writersByIndex = new Int2ObjectHashMap();
        this.agents = new Agent[0];
        this.resolver = new ResolverRef(this::newResolver);
        int bufferPoolCapacity = config.bufferPoolCapacity();
        int bufferSlotCapacity = config.bufferSlotCapacity();
        DefaultBufferPool bufferPool = new DefaultBufferPool(bufferPoolCapacity, bufferSlotCapacity);
        int reserved = 8;
        int bits = 56;
        long initial = (long)index << 56;
        this.mask = mask = initial | 0xFFFFFFFFFFFFFFL;
        this.bufferPool = bufferPool;
        this.streamId = initial;
        this.traceId = initial;
        this.groupId = initial;
        if (supplyAgentBuilder != null) {
            AgentBuilder agentBuilder = supplyAgentBuilder.get();
            Agent agent = agentBuilder.setRouteManager(this.resolver).setExecutor(this::executeAndSignal).setWriteBuffer(this.writeBuffer).setAddressIdSupplier(labels::supplyLabelId).setStreamFactorySupplier(this::supplyStreamFactory).setThrottleSupplier(this::supplyThrottle).setThrottleRemover(this::removeThrottle).setInitialIdSupplier(this::supplyInitialId).setReplyIdSupplier(this::supplyReplyId).setTraceIdSupplier(this::supplyTrace).setGroupIdSupplier(this::supplyGroupId).setGroupBudgetClaimer(this.groupBudgetManager::claim).setGroupBudgetReleaser(this.groupBudgetManager::release).setBufferPool(bufferPool).build();
            this.agents = ArrayUtil.add(this.agents, agent);
        }
    }

    private Resolver newResolver() {
        return new Resolver(this.routesBufferRef, this.throttles, this::supplyInitialWriter);
    }

    @Override
    public String roleName() {
        return this.elektronName;
    }

    @Override
    public int doWork() throws Exception {
        int workDone = 0;
        for (Agent agent : this.agents) {
            workDone += agent.doWork();
        }
        try {
            workDone += this.streamsBuffer.read(this.readHandler, this.readLimit);
        }
        catch (Throwable ex) {
            ex.addSuppressed(new Exception(String.format("[%s]\t[0x%016x] %s", this.elektronName, this.streamId, this.streamsLayout)));
            LangUtil.rethrowUnchecked(ex);
        }
        return workDone;
    }

    public long counter(String name) {
        LongSupplier counter = this.counters.readonlyCounter(name);
        return counter != null ? counter.getAsLong() : 0L;
    }

    @Override
    public void onClose() {
        while (this.config.drainOnClose() && this.streamsBuffer.consumerPosition() < this.streamsBuffer.producerPosition()) {
            ThreadHints.onSpinWait();
        }
        for (Agent agent : this.agents) {
            agent.onClose();
        }
        for (int senderIndex = 0; senderIndex < this.streams.length; ++senderIndex) {
            int senderIndex0 = senderIndex;
            this.streams[senderIndex].forEach((id, handler) -> this.doSyntheticAbort(StreamId.streamId(this.localIndex, senderIndex0, id), (MessageConsumer)handler));
        }
        this.targetsByIndex.forEach((k, v) -> v.detach());
        this.targetsByIndex.forEach((k, v) -> CloseHelper.quietClose(v));
        CloseHelper.quietClose(this.streamsLayout);
        CloseHelper.quietClose(this.metricsLayout);
        if (this.bufferPool.acquiredSlots() != 0) {
            throw new IllegalStateException("Buffer pool has unreleased slots: " + this.bufferPool.acquiredSlots());
        }
    }

    public String toString() {
        return this.elektronName;
    }

    public void onRouteable(long routeId, Nukleus nukleus) {
        String nukleusName = nukleus.name();
        int localAddressId = RouteId.localId(routeId);
        String localAddress = this.labels.lookupLabel(localAddressId);
        BitSet affinity = this.affinityMask.apply(localAddress);
        if (affinity.get(this.localIndex)) {
            this.elektronByName.computeIfAbsent(nukleusName, name -> new ElektronRef((String)name, nukleus.supplyElektron()));
        }
    }

    public void onRouted(Nukleus nukleus, RouteKind routeKind, long routeId) {
        String nukleusName = nukleus.name();
        int localAddressId = RouteId.localId(routeId);
        String localAddress = this.labels.lookupLabel(localAddressId);
        BitSet affinity = this.affinityMask.apply(localAddress);
        if (affinity.get(this.localIndex)) {
            this.elektronByName.computeIfPresent(nukleusName, (a, r) -> r.assign(routeKind, localAddressId));
        }
    }

    public void onUnrouted(Nukleus nukleus, RouteKind routeKind, long routeId) {
        String nukleusName = nukleus.name();
        int localAddressId = RouteId.localId(routeId);
        String localAddress = this.labels.lookupLabel(localAddressId);
        BitSet affinity = this.affinityMask.apply(localAddress);
        if (affinity.get(this.localIndex)) {
            this.elektronByName.computeIfPresent(nukleusName, (a, r) -> r.unassign(routeKind, localAddressId));
        }
    }

    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        FrameFW frame = this.frameRO.wrap(buffer, index, index + length);
        long streamId = frame.streamId();
        long routeId = frame.routeId();
        if (StreamId.isInitial(streamId)) {
            this.handleReadInitial(routeId, streamId, msgTypeId, buffer, index, length);
        } else {
            this.handleReadReply(routeId, streamId, msgTypeId, buffer, index, length);
        }
    }

    private void handleReadInitial(long routeId, long streamId, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        int instanceId = StreamId.instanceId(streamId);
        if ((msgTypeId & 0x40000000) == 0) {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.streams[StreamId.streamIndex(streamId)];
            MessageConsumer handler = dispatcher.get(instanceId);
            if (handler != null) {
                switch (msgTypeId) {
                    case 1: {
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 2: {
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 3: {
                        handler.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 4: {
                        handler.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 5: {
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    default: {
                        this.doReset(routeId, streamId);
                        break;
                    }
                }
            } else if (msgTypeId == 1) {
                MessageConsumer newHandler = this.handleBeginInitial(msgTypeId, buffer, index, length);
                if (newHandler != null) {
                    newHandler.accept(msgTypeId, buffer, index, length);
                } else {
                    this.doReset(routeId, streamId);
                }
            }
        } else {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
            MessageConsumer throttle = dispatcher.get(instanceId);
            if (throttle != null) {
                ReadCounters counters = this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                switch (msgTypeId) {
                    case 0x40000002: {
                        counters.windows.increment();
                        throttle.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 0x40000001: {
                        counters.resets.increment();
                        throttle.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                }
            }
        }
    }

    private void handleReadReply(long routeId, long streamId, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        int instanceId = StreamId.instanceId(streamId);
        if ((msgTypeId & 0x40000000) == 0) {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.streams[StreamId.streamIndex(streamId)];
            MessageConsumer handler = dispatcher.get(instanceId);
            if (handler != null) {
                ReadCounters counters = this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                switch (msgTypeId) {
                    case 1: {
                        counters.opens.increment();
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 2: {
                        counters.frames.increment();
                        counters.bytes.getAndAdd(buffer.getInt(index + 53));
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 3: {
                        counters.closes.increment();
                        handler.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 4: {
                        counters.aborts.increment();
                        handler.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 5: {
                        handler.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    default: {
                        this.doReset(routeId, streamId);
                        break;
                    }
                }
            } else if (msgTypeId == 1) {
                MessageConsumer newHandler = this.handleBeginReply(msgTypeId, buffer, index, length);
                if (newHandler != null) {
                    ReadCounters counters = this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                    counters.opens.increment();
                    newHandler.accept(msgTypeId, buffer, index, length);
                } else {
                    this.doReset(routeId, streamId);
                }
            }
        } else {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
            MessageConsumer throttle = dispatcher.get(instanceId);
            if (throttle != null) {
                switch (msgTypeId) {
                    case 0x40000002: {
                        throttle.accept(msgTypeId, buffer, index, length);
                        break;
                    }
                    case 0x40000001: {
                        throttle.accept(msgTypeId, buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                }
            }
        }
    }

    private MessageConsumer handleBeginInitial(int msgTypeId, DirectBuffer buffer, int index, int length) {
        MessageConsumer replyTo;
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        int addressId = RouteId.remoteId(routeId);
        MessageConsumer newStream = null;
        StreamFactory streamFactory = this.streamFactoriesByAddressId.get(addressId);
        if (streamFactory != null && (newStream = streamFactory.newStream(msgTypeId, buffer, index, length, replyTo = this.supplyReplyTo(streamId))) != null) {
            this.streams[StreamId.streamIndex(streamId)].put(StreamId.instanceId(streamId), newStream);
        }
        return newStream;
    }

    private MessageConsumer handleBeginReply(int msgTypeId, DirectBuffer buffer, int index, int length) {
        MessageConsumer replyTo;
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        int labelId = RouteId.localId(routeId);
        MessageConsumer newStream = null;
        StreamFactory streamFactory = this.streamFactoriesByAddressId.get(labelId);
        if (streamFactory != null && (newStream = streamFactory.newStream(msgTypeId, buffer, index, length, replyTo = this.supplyReplyTo(streamId))) != null) {
            this.streams[StreamId.streamIndex(streamId)].put(StreamId.instanceId(streamId), newStream);
        }
        return newStream;
    }

    private void doReset(long routeId, long streamId) {
        ResetFW reset = this.resetRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(routeId).streamId(streamId).build();
        MessageConsumer replyTo = this.supplyReplyTo(streamId);
        replyTo.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    private void doSyntheticAbort(long streamId, MessageConsumer stream) {
        long syntheticAbortRouteId = 0L;
        AbortFW abort = this.abortRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(0L).streamId(streamId).build();
        stream.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
    }

    private MessageConsumer supplyReplyTo(long streamId) {
        int index = StreamId.streamIndex(streamId);
        return this.writersByIndex.computeIfAbsent(index, this.supplyWriter);
    }

    private MessageConsumer supplyInitialWriter(long streamId) {
        int index = StreamId.remoteIndex(streamId);
        return this.writersByIndex.computeIfAbsent(index, this.supplyWriter);
    }

    private MessageConsumer supplyWriter(int index) {
        return this.supplyTarget(index).writeHandler();
    }

    private Target supplyTarget(int index) {
        return this.targetsByIndex.computeIfAbsent(index, this.newTarget);
    }

    private Target newTarget(int index) {
        return new Target(this.config, index, this.writeBuffer, this.streams, this.throttles, this.newWriteCounters);
    }

    private ReadCounters newReadCounters(long routeId) {
        int localId = RouteId.localId(routeId);
        String nukleus = this.nukleus(localId);
        return new ReadCounters(this.counters, nukleus, routeId);
    }

    private WriteCounters newWriteCounters(long routeId) {
        int localId = RouteId.localId(routeId);
        String nukleus = this.nukleus(localId);
        return new WriteCounters(this.counters, nukleus, routeId);
    }

    private String nukleus(int localId) {
        String localAddress = this.labels.lookupLabel(localId);
        Matcher matcher = ADDRESS_PATTERN.matcher(localAddress);
        matcher.matches();
        return matcher.group(1);
    }

    private StreamFactory newStreamFactory(Function<String, LongSupplier> supplyCounter, Function<String, LongConsumer> supplyAccumulator, Supplier<BufferPool> supplyCountingBufferPool, StreamFactoryBuilder streamFactoryBuilder) {
        return streamFactoryBuilder.setRouteManager(this.resolver).setExecutor(this::executeAndSignal).setWriteBuffer(this.writeBuffer).setInitialIdSupplier(this::supplyInitialId).setReplyIdSupplier(this::supplyReplyId).setTraceSupplier(this::supplyTrace).setGroupIdSupplier(this::supplyGroupId).setGroupBudgetClaimer(this.groupBudgetManager::claim).setGroupBudgetReleaser(this.groupBudgetManager::release).setCounterSupplier(supplyCounter).setAccumulatorSupplier(supplyAccumulator).setBufferPoolSupplier(supplyCountingBufferPool).build();
    }

    private StreamFactory supplyStreamFactory(int addressId) {
        return this.streamFactoriesByAddressId.get(addressId);
    }

    private MessageConsumer supplyThrottle(long streamId) {
        int instanceId = StreamId.instanceId(streamId);
        Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
        return dispatcher.get(instanceId);
    }

    private void removeThrottle(long streamId) {
        int instanceId = StreamId.instanceId(streamId);
        Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
        dispatcher.remove(instanceId);
    }

    private long supplyInitialId(long routeId) {
        int remoteId = RouteId.remoteId(routeId);
        int remoteIndex = this.resolveRemoteIndex(remoteId);
        this.streamId += 2L;
        this.streamId &= this.mask;
        return (long)remoteIndex << 48 & 0xFF000000000000L | this.streamId & 0xFF00FFFFFFFFFFFFL | 1L;
    }

    private long supplyReplyId(long initialId) {
        assert (StreamId.isInitial(initialId));
        return initialId & 0xFFFFFFFFFFFFFFFEL;
    }

    private long supplyGroupId() {
        ++this.groupId;
        this.groupId &= this.mask;
        return this.groupId;
    }

    private long supplyTrace() {
        ++this.traceId;
        this.traceId &= this.mask;
        return this.traceId;
    }

    private Future<?> executeAndSignal(Runnable task, long routeId, long streamId, long signalId) {
        if (this.executor != null) {
            return this.executor.submit(() -> this.invokeAndSignal(task, routeId, streamId, signalId));
        }
        this.invokeAndSignal(task, routeId, streamId, signalId);
        return new Future<Void>(){

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public Void get() throws InterruptedException, ExecutionException {
                return null;
            }

            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invokeAndSignal(Runnable task, long routeId, long streamId, long signalId) {
        long l;
        try {
            task.run();
            l = this.timestamps ? System.nanoTime() : 0L;
        }
        catch (Throwable throwable) {
            long timestamp = this.timestamps ? System.nanoTime() : 0L;
            SignalFW signal = this.signalRW.get().rewrap().routeId(routeId).streamId(streamId).timestamp(timestamp).signalId(signalId).build();
            this.streamsBuffer.write(signal.typeId(), signal.buffer(), signal.offset(), signal.sizeof());
            throw throwable;
        }
        long timestamp = l;
        SignalFW signal = this.signalRW.get().rewrap().routeId(routeId).streamId(streamId).timestamp(timestamp).signalId(signalId).build();
        this.streamsBuffer.write(signal.typeId(), signal.buffer(), signal.offset(), signal.sizeof());
    }

    private int resolveRemoteIndex(int remoteId) {
        Affinity affinity = this.supplyAffinity(remoteId);
        BitSet mask = affinity.mask;
        int remoteIndex = affinity.nextIndex;
        assert (mask.cardinality() != 0);
        if (remoteIndex != this.localIndex) {
            int nextIndex = affinity.mask.nextSetBit(remoteIndex + 1);
            if (nextIndex == -1) {
                nextIndex = affinity.mask.nextSetBit(0);
            }
            affinity.nextIndex = nextIndex;
        }
        return remoteIndex;
    }

    private Affinity supplyAffinity(int remoteId) {
        return this.affinityByRemoteId.computeIfAbsent(remoteId, this.resolveAffinity);
    }

    public Affinity resolveAffinity(long remoteIdAsLong) {
        int remoteId = (int)(remoteIdAsLong & 0xFFFFFFFFL);
        String remoteAddress = this.labels.lookupLabel(remoteId);
        BitSet mask = this.affinityMask.apply(remoteAddress);
        if (mask.cardinality() == 0) {
            throw new IllegalStateException(String.format("affinity mask must specify at least one bit: %s %d", remoteAddress, mask));
        }
        Affinity affinity = new Affinity();
        affinity.mask = mask;
        affinity.nextIndex = mask.get(this.localIndex) ? this.localIndex : mask.nextSetBit(0);
        return affinity;
    }

    private static SignalFW.Builder newSignalRW() {
        UnsafeBuffer buffer = new UnsafeBuffer(new byte[512]);
        return new SignalFW.Builder().wrap(buffer, 0, buffer.capacity());
    }

    private Int2ObjectHashMap<MessageConsumer>[] initDispatcher() {
        Int2ObjectHashMap[] dispatcher = new Int2ObjectHashMap[64];
        for (int i = 0; i < dispatcher.length; ++i) {
            dispatcher[i] = new Int2ObjectHashMap();
        }
        return dispatcher;
    }

    static /* synthetic */ Agent[] access$1002(ElektronAgent x0, Agent[] x1) {
        x0.agents = x1;
        return x1;
    }

    private static class Affinity {
        BitSet mask;
        int nextIndex;

        private Affinity() {
        }
    }

    private final class ElektronRef {
        private final Elektron elektron;
        private final Map<RouteKind, StreamFactory> streamFactories;
        private int count;

        private ElektronRef(String nukleusName, Elektron elekron) {
            this.elektron = Objects.requireNonNull(elekron);
            EnumMap<RouteKind, StreamFactory> streamFactories = new EnumMap<RouteKind, StreamFactory>(RouteKind.class);
            HashMap countersByName = new HashMap();
            Function<String, AtomicCounter> newCounter = ElektronAgent.this.counters::counter;
            Function<String, LongSupplier> supplyCounter = name -> () -> ((AtomicCounter)countersByName.computeIfAbsent(name, newCounter)).increment() + 1L;
            Function<String, LongConsumer> supplyAccumulator = name -> inc -> ElektronAgent.this.counters.counter((String)name).getAndAdd(inc);
            AtomicCounter acquires = ElektronAgent.this.counters.counter(String.format("%s.acquires", nukleusName));
            AtomicCounter releases = ElektronAgent.this.counters.counter(String.format("%s.releases", nukleusName));
            CountingBufferPool countingPool = new CountingBufferPool(ElektronAgent.this.bufferPool, acquires::increment, releases::increment);
            Supplier<BufferPool> supplyCountingBufferPool = () -> countingPool;
            for (RouteKind routeKind : EnumSet.allOf(RouteKind.class)) {
                StreamFactoryBuilder streamFactoryBuilder = this.elektron.streamFactoryBuilder(routeKind);
                if (streamFactoryBuilder == null) continue;
                StreamFactory streamFactory = ElektronAgent.this.newStreamFactory(supplyCounter, supplyAccumulator, supplyCountingBufferPool, streamFactoryBuilder);
                streamFactories.put(routeKind, streamFactory);
            }
            this.streamFactories = streamFactories;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ElektronRef assign(RouteKind routeKind, int labelId) {
            ElektronRef elektronRef = this;
            synchronized (elektronRef) {
                StreamFactory streamFactory;
                Agent agent;
                if (this.count == 0 && (agent = this.elektron.agent()) != null) {
                    ElektronAgent.access$1002(ElektronAgent.this, ArrayUtil.add(ElektronAgent.this.agents, agent));
                }
                if ((streamFactory = this.streamFactories.get((Object)routeKind)) != null) {
                    ElektronAgent.this.streamFactoriesByAddressId.put(labelId, streamFactory);
                }
                ++this.count;
            }
            return this;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ElektronRef unassign(RouteKind routeKind, int labelId) {
            ElektronRef elektronRef = this;
            synchronized (elektronRef) {
                --this.count;
                if (this.count == 0) {
                    StreamFactory streamFactory = (StreamFactory)ElektronAgent.this.streamFactoriesByAddressId.remove(labelId);
                    assert (streamFactory == this.streamFactories.get((Object)routeKind));
                    final Agent agent = this.elektron.agent();
                    if (agent != null) {
                        ElektronAgent.access$1002(ElektronAgent.this, ArrayUtil.remove(ElektronAgent.this.agents, agent));
                        Agent closeAgent = new Agent(){

                            @Override
                            public int doWork() throws Exception {
                                CloseHelper.quietClose(agent::onClose);
                                ElektronAgent.access$1002(ElektronAgent.this, ArrayUtil.remove(ElektronAgent.this.agents, this));
                                return 1;
                            }

                            @Override
                            public String roleName() {
                                return String.format("%s (deferred close)", agent.roleName());
                            }
                        };
                        ElektronAgent.access$1002(ElektronAgent.this, ArrayUtil.add(ElektronAgent.this.agents, closeAgent));
                    }
                }
            }
            return this;
        }
    }

    private static final class ReadCounters {
        private final AtomicCounter opens;
        private final AtomicCounter closes;
        private final AtomicCounter aborts;
        private final AtomicCounter windows;
        private final AtomicCounter resets;
        private final AtomicCounter bytes;
        private final AtomicCounter frames;

        ReadCounters(Counters counters, String nukleus, long routeId) {
            this.opens = counters.counter(String.format("%s.%d.opens.read", nukleus, routeId));
            this.closes = counters.counter(String.format("%s.%d.closes.read", nukleus, routeId));
            this.aborts = counters.counter(String.format("%s.%d.aborts.read", nukleus, routeId));
            this.windows = counters.counter(String.format("%s.%d.windows.read", nukleus, routeId));
            this.resets = counters.counter(String.format("%s.%d.resets.read", nukleus, routeId));
            this.bytes = counters.counter(String.format("%s.%d.bytes.read", nukleus, routeId));
            this.frames = counters.counter(String.format("%s.%d.frames.read", nukleus, routeId));
        }
    }

    private static class ResolverRef
    implements RouteManager {
        private final ThreadLocal<Resolver> resolver;

        ResolverRef(Supplier<Resolver> supplyResolver) {
            this.resolver = ThreadLocal.withInitial(supplyResolver);
        }

        @Override
        public <R> R resolveExternal(long authorization, MessagePredicate filter, MessageFunction<R> mapper) {
            return this.resolver.get().resolveExternal(authorization, filter, mapper);
        }

        @Override
        public <R> R resolve(long routeId, long authorization, MessagePredicate filter, MessageFunction<R> mapper) {
            return this.resolver.get().resolve(routeId, authorization, filter, mapper);
        }

        @Override
        public void forEach(MessageConsumer consumer) {
            this.resolver.get().forEach(consumer);
        }

        @Override
        public MessageConsumer supplyReceiver(long streamId) {
            return this.resolver.get().supplyReceiver(streamId);
        }

        @Override
        public void setThrottle(long streamId, MessageConsumer throttle) {
            this.resolver.get().setThrottle(streamId, throttle);
        }
    }
}

