/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.IntStream;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.OnHeapBytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.util.ThrowingFunction;
import net.openhft.chronicle.network.ConnectionListener;
import net.openhft.chronicle.network.HeaderTcpHandler;
import net.openhft.chronicle.network.NetworkContext;
import net.openhft.chronicle.network.NetworkTestCommon;
import net.openhft.chronicle.network.ServerThreadingStrategy;
import net.openhft.chronicle.network.TCPRegistry;
import net.openhft.chronicle.network.TcpEventHandler;
import net.openhft.chronicle.network.WireTypeSniffingTcpHandler;
import net.openhft.chronicle.network.api.TcpHandler;
import net.openhft.chronicle.network.api.session.WritableSubHandler;
import net.openhft.chronicle.network.cluster.AbstractSubHandler;
import net.openhft.chronicle.network.cluster.Cluster;
import net.openhft.chronicle.network.cluster.ClusterContext;
import net.openhft.chronicle.network.cluster.HostDetails;
import net.openhft.chronicle.network.cluster.VanillaClusteredNetworkContext;
import net.openhft.chronicle.network.cluster.handlers.Registerable;
import net.openhft.chronicle.network.cluster.handlers.RejectedHandlerException;
import net.openhft.chronicle.network.cluster.handlers.UberHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.VanillaWireOutPublisher;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.threads.TimingPauser;
import net.openhft.chronicle.wire.BinaryWire;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import net.openhft.chronicle.wire.YamlLogging;
import org.apache.mina.util.IdentityHashSet;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class UberHandlerTest
extends NetworkTestCommon {
    private static final int SIZE_OF_BIG_PAYLOAD = 204800;
    private static final int ROUNDS_PER_SIZE_CYCLE = 100;
    private static final int NUM_HANDLERS = 4;
    private static final long RUN_TIME_MS = 5000L;
    private static final int FIRST_CID = 12345;
    private static final String TEST_HANDLERS_CSP = "testhandlercsp";
    private static final AtomicBoolean RUNNING = new AtomicBoolean(false);
    private static final AtomicInteger STOPPED = new AtomicInteger(0);
    private static final List<Long> MESSAGES_RECEIVED_CIDS = new ArrayList<Long>();
    private static final AtomicInteger SENDERS_INITIALIZED = new AtomicInteger();
    private static final Map<Long, Integer> COUNTERS_PER_CID = new ConcurrentHashMap<Long, Integer>();
    private static final AtomicBoolean REJECTED_HANDLER_ONREAD_CALLED = new AtomicBoolean(false);
    private static final AtomicBoolean REJECTED_HANDLER_ONWRITE_CALLED = new AtomicBoolean(false);
    private static final AtomicBoolean REJECTING_SUB_HANDLER_SHOULD_REJECT = new AtomicBoolean(false);
    private static final AtomicReference<Map<Object, RegisterableSubHandler>> REGISTRY = new AtomicReference();

    @Before
    public void before() {
        YamlLogging.setAll((boolean)false);
        System.setProperty("TcpEventHandler.tcpBufferSize", "131072");
        COUNTERS_PER_CID.clear();
        RUNNING.set(true);
        STOPPED.set(0);
        MESSAGES_RECEIVED_CIDS.clear();
        SENDERS_INITIALIZED.set(0);
        REJECTING_SUB_HANDLER_SHOULD_REJECT.set(false);
        REJECTED_HANDLER_ONREAD_CALLED.set(false);
        REJECTED_HANDLER_ONWRITE_CALLED.set(false);
        REGISTRY.set(null);
    }

    @After
    public void after() {
        System.clearProperty("TcpEventHandler.tcpBufferSize");
    }

    @Test
    public void testUberHandlerWithMultipleSubHandlersAndHeartbeats() throws IOException, TimeoutException {
        TCPRegistry.createServerSocketChannelFor((String[])new String[]{"initiator", "acceptor"});
        HostDetails initiatorHost = new HostDetails().hostId(2).connectUri("initiator");
        HostDetails acceptorHost = new HostDetails().hostId(1).connectUri("acceptor");
        try (MyClusterContext acceptorCtx = this.clusterContext(acceptorHost, initiatorHost);
             MyClusterContext initiatorCtx = this.clusterContext(initiatorHost, acceptorHost);){
            acceptorCtx.cluster().start(acceptorHost.hostId());
            initiatorCtx.cluster().start(initiatorHost.hostId());
            initiatorCtx.connectionManager(acceptorHost.hostId()).addListener((nc, isConnected) -> {
                if (isConnected) {
                    IntStream.range(0, 4).forEach(seq -> nc.wireOutPublisher().publish(w -> w.writeDocument(true, d -> this.sendHandler(d, 12345 + seq, new PingPongHandler(false)))));
                }
            });
            long startTime = System.currentTimeMillis();
            while (System.currentTimeMillis() - startTime < 5000L && !Thread.currentThread().isInterrupted()) {
                Jvm.pause((long)100L);
            }
            Jvm.startup().on(PingPongHandler.class, "Test complete, stopping handlers countersPerCid=" + COUNTERS_PER_CID);
            this.stopAndWaitTillAllHandlersEnd();
            Assert.assertTrue((boolean)this.pingPongsAllCompletedAtLeastOneRound());
        }
    }

    @Test(expected=IllegalArgumentException.class)
    public void constructorWillThrowIfLocalAndRemoteIdentifiersAreTheSame() {
        BinaryWire wire = new BinaryWire((Bytes)Bytes.allocateElasticOnHeap());
        UberHandler.uberHandler((int)123, (int)123, (WireType)WireType.BINARY).writeMarshallable((WireOut)wire);
    }

    private void stopAndWaitTillAllHandlersEnd() throws TimeoutException {
        RUNNING.set(false);
        TimingPauser pauser = Pauser.balanced();
        while (STOPPED.get() < 8) {
            pauser.pause(10L, TimeUnit.SECONDS);
        }
    }

    private boolean pingPongsAllCompletedAtLeastOneRound() {
        return COUNTERS_PER_CID.size() == 4 && COUNTERS_PER_CID.values().stream().allMatch(val -> val > 0);
    }

    @Test
    public void testHandlerWillCloseWhenHostIdsAreWrong() throws IOException {
        this.expectException("Received a handler for host ID: 98, my host ID is: 1 this is probably a configuration error");
        this.expectException("Closed");
        this.expectException("SubHandler HeartbeatHandler");
        TCPRegistry.createServerSocketChannelFor((String[])new String[]{"initiator", "acceptor"});
        HostDetails initiatorHost = new HostDetails().hostId(99).connectUri("initiator");
        HostDetails acceptorHost = new HostDetails().hostId(1).connectUri("acceptor");
        HostDetails acceptorHostWithInvalidId = new HostDetails().hostId(98).connectUri("acceptor");
        try (MyClusterContext acceptorCtx = this.clusterContext(acceptorHost, initiatorHost);
             MyClusterContext initiatorCtx = this.clusterContext(initiatorHost, acceptorHostWithInvalidId);){
            acceptorCtx.cluster().start(acceptorHost.hostId());
            initiatorCtx.cluster().start(initiatorHost.hostId());
            AtomicBoolean establishedConnection = new AtomicBoolean(false);
            initiatorCtx.connectionManager(acceptorHostWithInvalidId.hostId()).addListener((nc, isConnected) -> {
                if (isConnected) {
                    establishedConnection.set(true);
                }
            });
            Jvm.pause((long)2000L);
            Assert.assertFalse((boolean)establishedConnection.get());
            Assert.assertTrue((boolean)this.exceptions.keySet().stream().anyMatch(k -> k.throwable != null && k.throwable.getMessage().contains("Received a handler for host ID: 98, my host ID is: 1 this is probably a configuration error")));
        }
    }

    @Test
    public void newConnectionListenersAreExecutedOnEventLoopForExistingConnections() throws IOException, TimeoutException {
        TCPRegistry.createServerSocketChannelFor((String[])new String[]{"initiator", "acceptor"});
        HostDetails initiatorHost = new HostDetails().hostId(2).connectUri("initiator");
        HostDetails acceptorHost = new HostDetails().hostId(1).connectUri("acceptor");
        try (MyClusterContext acceptorCtx = this.clusterContext(acceptorHost, initiatorHost);
             MyClusterContext initiatorCtx = this.clusterContext(initiatorHost, acceptorHost);){
            acceptorCtx.cluster().start(acceptorHost.hostId());
            initiatorCtx.cluster().start(initiatorHost.hostId());
            AtomicBoolean establishedConnection = new AtomicBoolean(false);
            AtomicReference networkContext = new AtomicReference();
            initiatorCtx.connectionManager(acceptorHost.hostId()).addListener((nc, isConnected) -> {
                if (isConnected) {
                    establishedConnection.set(true);
                    networkContext.set(nc);
                }
            });
            TimingPauser pauser = Pauser.balanced();
            while (!establishedConnection.get()) {
                pauser.pause(3L, TimeUnit.SECONDS);
            }
            AtomicBoolean executedOnEventLoop = new AtomicBoolean(false);
            initiatorCtx.connectionManager(acceptorHost.hostId()).addListener((nc, isConnected) -> {
                if (isConnected) {
                    Assert.assertSame(networkContext.get(), (Object)nc);
                    executedOnEventLoop.set(EventLoop.inEventLoop());
                }
            });
            pauser.reset();
            while (!executedOnEventLoop.get()) {
                pauser.pause(3L, TimeUnit.SECONDS);
            }
        }
    }

    @Test
    public void testBusyWritingHandlersAreCalledFirstInRoundRobin() throws IOException, TimeoutException {
        TCPRegistry.createServerSocketChannelFor((String[])new String[]{"initiator", "acceptor"});
        HostDetails initiatorHost = new HostDetails().hostId(2).connectUri("initiator");
        HostDetails acceptorHost = new HostDetails().hostId(1).connectUri("acceptor");
        try (MyClusterContext acceptorCtx = this.clusterContext(acceptorHost, initiatorHost);
             MyClusterContext initiatorCtx = this.clusterContext(initiatorHost, acceptorHost);){
            acceptorCtx.cluster().start(acceptorHost.hostId());
            initiatorCtx.cluster().start(initiatorHost.hostId());
            initiatorCtx.connectionManager(acceptorHost.hostId()).addListener((nc, isConnected) -> {
                if (isConnected) {
                    IntStream.range(0, 4).forEach(seq -> nc.wireOutPublisher().publish(w -> w.writeDocument(true, d -> this.sendHandler(d, 12345 + seq, new Receiver()))));
                }
            });
            long startTime = System.currentTimeMillis();
            while (System.currentTimeMillis() - startTime < 5000L && !Thread.currentThread().isInterrupted()) {
                Jvm.pause((long)100L);
            }
            Jvm.startup().on(UberHandlerTest.class, String.format("Test complete, stopping handlers messages received=%,d", MESSAGES_RECEIVED_CIDS.size()));
            this.stopAndWaitTillAllHandlersEnd();
            Assert.assertTrue((boolean)this.messagesWereReceivedInRoundRobinOrder());
        }
    }

    @Test
    public void rejectedOnInitializeHandlersAreRemovedFromReadAndWrite() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            this.expectException("Rejected in onInitialize");
            REJECTING_SUB_HANDLER_SHOULD_REJECT.set(true);
            testHarness.registerSubHandler((WriteMarshallable)new WritableRejectingSubHandler());
            this.expectException("handler == null, check that the Csp/Cid has been sent");
            testHarness.sendMessageToCurrentHandler();
            testHarness.callProcess();
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONREAD_CALLED.get());
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONWRITE_CALLED.get());
        }
    }

    @Test
    public void rejectedOnReadHandlersAreRemoveFromReadAndWrite() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new WritableRejectingSubHandler());
            this.expectException("Rejected in onRead");
            REJECTING_SUB_HANDLER_SHOULD_REJECT.set(true);
            testHarness.sendMessageToCurrentHandler();
            this.expectException("handler == null, check that the Csp/Cid has been sent");
            testHarness.sendMessageToCurrentHandler();
            testHarness.callProcess();
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONWRITE_CALLED.get());
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONREAD_CALLED.get());
        }
    }

    @Test
    public void rejectedOnWriteHandlersAreRemoveFromReadAndWrite() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new WritableRejectingSubHandler());
            this.expectException("Rejected in onWrite");
            REJECTING_SUB_HANDLER_SHOULD_REJECT.set(true);
            testHarness.callProcess();
            this.expectException("handler == null, check that the Csp/Cid has been sent");
            testHarness.sendMessageToCurrentHandler();
            testHarness.callProcess();
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONWRITE_CALLED.get());
            Assert.assertFalse((boolean)REJECTED_HANDLER_ONREAD_CALLED.get());
        }
    }

    @Test
    public void addHandlerRegistersRegisterableHandlers() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new RegisterableSubHandler());
            Assert.assertEquals(((Object)((Object)REGISTRY.get().get("registryKey"))).getClass(), RegisterableSubHandler.class);
        }
    }

    @Test
    public void removeHandlerUnregistersRegisterableHandlers() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new RegisterableSubHandler());
            Assert.assertEquals(((Object)((Object)REGISTRY.get().get("registryKey"))).getClass(), RegisterableSubHandler.class);
            REJECTING_SUB_HANDLER_SHOULD_REJECT.set(true);
            this.expectException("Rejected in onRead");
            testHarness.sendMessageToCurrentHandler();
            Assert.assertTrue((boolean)REGISTRY.get().isEmpty());
        }
    }

    @Test
    public void addHandlerAddsConnectionListenerHandlersToNetworkContext() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new ConnectionListenerSubHandler());
            Assert.assertEquals((long)1L, (long)testHarness.nc().connectionListeners.size());
        }
    }

    @Test
    public void removeHandlerRemovesConnectionListenerHandlersFromNetworkContext() {
        try (UberHandlerTestHarness testHarness = new UberHandlerTestHarness();){
            testHarness.registerSubHandler((WriteMarshallable)new ConnectionListenerSubHandler());
            Assert.assertEquals((long)1L, (long)testHarness.nc().connectionListeners.size());
            REJECTING_SUB_HANDLER_SHOULD_REJECT.set(true);
            this.expectException("Rejected in onRead");
            testHarness.sendMessageToCurrentHandler();
            Assert.assertEquals((long)0L, (long)testHarness.nc().connectionListeners.size());
        }
    }

    private boolean messagesWereReceivedInRoundRobinOrder() {
        long offset = MESSAGES_RECEIVED_CIDS.get(0) - 12345L;
        for (int i = 0; i < MESSAGES_RECEIVED_CIDS.size(); ++i) {
            long expectedCID = 12345L + ((long)i + offset) % 4L;
            if (expectedCID != MESSAGES_RECEIVED_CIDS.get(i)) {
                return false;
            }
            if (i % 4 != 3) continue;
            ++offset;
        }
        return true;
    }

    private void sendHandler(WireOut wireOut, int cid, Marshallable handler) {
        wireOut.writeEventName((WireKey)CoreFields.csp).text(TEST_HANDLERS_CSP).writeEventName((WireKey)CoreFields.cid).int64((long)cid).writeEventName((WireKey)CoreFields.handler).typedMarshallable((WriteMarshallable)handler);
    }

    private void sendMessageToHandler(WireOut wireOut, int cid) {
        wireOut.writeEventName((WireKey)CoreFields.csp).text(TEST_HANDLERS_CSP).writeEventName((WireKey)CoreFields.cid).int64((long)cid);
    }

    @NotNull
    private MyClusterContext clusterContext(HostDetails ... clusterHosts) {
        MyClusterContext ctx = (MyClusterContext)((MyClusterContext)new MyClusterContext().wireType(WireType.BINARY)).localIdentifier((byte)clusterHosts[0].hostId());
        ctx.heartbeatIntervalMs(500L);
        MyCluster cluster = new MyCluster(ctx);
        for (HostDetails details : clusterHosts) {
            cluster.hostDetails.put(String.valueOf(details.hostId()), details);
        }
        return ctx;
    }

    static class UberHandlerTestHarness
    extends AbstractCloseable {
        private final MyClusterContext clusterContext = new MyClusterContext();
        private final MyClusteredNetworkContext nc = new MyClusteredNetworkContext(this.clusterContext);
        private final UberHandler<MyClusteredNetworkContext> uberHandler;
        private final Wire inWire;
        private final Wire outWire;

        public UberHandlerTestHarness() {
            this.nc.wireOutPublisher((WireOutPublisher)new VanillaWireOutPublisher(this.clusterContext.wireType()));
            this.uberHandler = this.createHandler();
            this.uberHandler.nc((NetworkContext)this.nc);
            this.inWire = (Wire)WireType.BINARY.apply((Object)Bytes.allocateElasticOnHeap());
            this.outWire = (Wire)WireType.BINARY.apply((Object)Bytes.allocateElasticOnHeap());
        }

        private UberHandler<MyClusteredNetworkContext> createHandler() {
            Wire wire = (Wire)WireType.BINARY.apply((Object)Bytes.allocateElasticOnHeap());
            UberHandler.uberHandler((int)123, (int)456, (WireType)WireType.BINARY).writeMarshallable((WireOut)wire);
            try (DocumentContext documentContext = wire.readingDocument();){
                UberHandler uberHandler = (UberHandler)wire.read("handler").object(UberHandler.class);
                return uberHandler;
            }
        }

        private void registerSubHandler(WriteMarshallable subHandler) {
            try (DocumentContext documentContext = this.inWire.writingDocument(true);){
                Wire documentWire = documentContext.wire();
                documentWire.write((WireKey)CoreFields.csp).text("12345");
                documentWire.write((WireKey)CoreFields.cid).int64(12345L);
                documentWire.write((WireKey)CoreFields.handler).typedMarshallable(subHandler);
            }
            this.uberHandler.process(this.inWire.bytes(), this.outWire.bytes(), (NetworkContext)this.nc);
        }

        public void callProcess() {
            this.uberHandler.process(this.inWire.bytes(), this.outWire.bytes(), (NetworkContext)this.nc);
        }

        private void sendMessageToCurrentHandler() {
            try (DocumentContext documentContext = this.inWire.writingDocument(false);){
                Wire documentWire = documentContext.wire();
                documentWire.write((CharSequence)"junk").text("to trigger an onRead");
            }
            this.uberHandler.process(this.inWire.bytes(), this.outWire.bytes(), (NetworkContext)this.nc);
        }

        public MyClusteredNetworkContext nc() {
            return this.nc;
        }

        protected void performClose() throws IllegalStateException {
            Closeable.closeQuietly((Object[])new Object[]{this.clusterContext, this.nc, this.uberHandler, this.inWire, this.outWire});
        }
    }

    static abstract class AbstractCompleteFlaggingHandler
    extends AbstractSubHandler<MyClusteredNetworkContext> {
        private boolean flaggedComplete = false;

        AbstractCompleteFlaggingHandler() {
        }

        protected void flagComplete() {
            if (!this.flaggedComplete) {
                STOPPED.incrementAndGet();
                this.flaggedComplete = true;
                Jvm.startup().on(((Object)((Object)this)).getClass(), String.format("Handler with cid=%s finished", this.cid()));
            }
        }
    }

    static class PingPongHandler
    extends AbstractCompleteFlaggingHandler
    implements Marshallable,
    WritableSubHandler<MyClusteredNetworkContext> {
        private static final int LOGGING_INTERVAL = 50;
        private boolean initiator;
        private boolean initiated = false;
        private int round = 0;

        public PingPongHandler(boolean initiator) {
            this.initiator = initiator;
        }

        public void onInitialize(WireOut outWire) throws RejectedExecutionException {
            Jvm.startup().on(PingPongHandler.class, "Initializing PingPongHandler (cid=" + this.cid() + ", initiator=" + this.initiator + ")");
            if (!this.initiator) {
                outWire.writeDocument(true, d -> d.writeEventName((WireKey)CoreFields.csp).text(UberHandlerTest.TEST_HANDLERS_CSP).writeEventName((WireKey)CoreFields.cid).int64(this.cid()).writeEventName((WireKey)CoreFields.handler).typedMarshallable((WriteMarshallable)new PingPongHandler(!this.initiator)));
            }
        }

        public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {
            block19: {
                this.throwExceptionIfClosed();
                if (!RUNNING.get()) {
                    this.flagComplete();
                    return;
                }
                StringBuilder eventName = Wires.acquireStringBuilder();
                ValueIn valueIn = inWire.readEventName(eventName);
                this.sendPingPongCid(outWire);
                try (DocumentContext dc = outWire.writingDocument(false);){
                    if (this.initiator) {
                        COUNTERS_PER_CID.put(this.cid(), this.round);
                    }
                    ++this.round;
                    if (this.round % 50 == 0) {
                        Jvm.startup().on(PingPongHandler.class, "PingPongHandler at round " + this.round + "(cid=" + this.cid() + ", initiator=" + this.initiator + ")");
                    }
                    if ("ping".equals(eventName.toString())) {
                        assert (valueIn.bytes().length == inWire.read("bytesLength").int32());
                        this.writeRandomJunk("pong", dc, this.round);
                        break block19;
                    }
                    if ("pong".equals(eventName.toString())) {
                        assert (valueIn.bytes().length == inWire.read("bytesLength").int32());
                        this.writeRandomJunk("ping", dc, this.round);
                        break block19;
                    }
                    throw new IllegalStateException("Got unknown event: " + eventName);
                }
            }
        }

        private void writeRandomJunk(String eventName, DocumentContext dc, int counter) {
            OnHeapBytes bigJunk = Bytes.allocateElasticOnHeap();
            int payloadSize = (int)((float)this.round / 100.0f * 204800.0f);
            for (int i = 0; i < payloadSize; ++i) {
                bigJunk.writeByte((byte)i);
            }
            dc.wire().write((CharSequence)eventName).bytes((BytesStore)bigJunk).write((CharSequence)"bytesLength").int32(payloadSize).write((CharSequence)"counter").int32(counter);
        }

        public void onWrite(WireOut outWire) {
            this.throwExceptionIfClosed();
            if (!RUNNING.get()) {
                this.flagComplete();
                return;
            }
            if (this.initiator && !this.initiated) {
                this.sendPingPongCid(outWire);
                try (DocumentContext dc = outWire.writingDocument();){
                    this.writeRandomJunk("ping", dc, this.round++);
                }
                this.initiated = true;
            }
        }

        public void writeMarshallable(@NotNull WireOut wire) {
            wire.write((CharSequence)"initiator").bool(Boolean.valueOf(this.initiator));
        }

        public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
            this.initiator = wire.read("initiator").bool();
        }

        private void sendPingPongCid(WireOut outWire) {
            try (DocumentContext dc = outWire.writingDocument(true);){
                dc.wire().write((WireKey)CoreFields.cid).int64(this.cid());
            }
        }
    }

    static class Receiver
    extends AbstractCompleteFlaggingHandler
    implements WritableSubHandler<MyClusteredNetworkContext>,
    Marshallable {
        Receiver() {
        }

        public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {
            MESSAGES_RECEIVED_CIDS.add(this.cid());
        }

        public void onInitialize(WireOut outWire) throws RejectedExecutionException {
            Jvm.startup().on(Receiver.class, "Initializing (cid=" + this.cid() + ")");
            outWire.writeDocument(true, d -> d.writeEventName((WireKey)CoreFields.csp).text(UberHandlerTest.TEST_HANDLERS_CSP).writeEventName((WireKey)CoreFields.cid).int64(this.cid()).writeEventName((WireKey)CoreFields.handler).typedMarshallable((WriteMarshallable)new Sender()));
        }

        public void onWrite(WireOut outWire) {
            if (!RUNNING.get()) {
                this.flagComplete();
            }
        }
    }

    static class Sender
    extends AbstractCompleteFlaggingHandler
    implements WritableSubHandler<MyClusteredNetworkContext>,
    Marshallable {
        public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {
            throw new AssertionError((Object)"The Sender should never receive any messages, the test depends that it is solely a writer");
        }

        public void onInitialize(WireOut outWire) throws RejectedExecutionException {
            Jvm.startup().on(Sender.class, "Initializing (cid=" + this.cid() + ")");
            SENDERS_INITIALIZED.incrementAndGet();
        }

        public void onWrite(WireOut outWire) {
            if (!RUNNING.get()) {
                this.flagComplete();
                return;
            }
            if (SENDERS_INITIALIZED.get() != 4) {
                return;
            }
            try (DocumentContext dc = outWire.writingDocument(true);){
                dc.wire().write((WireKey)CoreFields.cid).int64(this.cid());
            }
            var3_3 = null;
            try (DocumentContext documentContext = outWire.writingDocument();){
                documentContext.wire().write((CharSequence)"hello").text("world");
            }
            catch (Throwable throwable) {
                var3_3 = throwable;
                throw throwable;
            }
        }
    }

    static class MyClusterContext
    extends ClusterContext<MyClusterContext, MyClusteredNetworkContext> {
        MyClusterContext() {
        }

        protected String clusterNamePrefix() {
            return "";
        }

        @NotNull
        public ThrowingFunction<MyClusteredNetworkContext, TcpEventHandler<MyClusteredNetworkContext>, IOException> tcpEventHandlerFactory() {
            return nc -> {
                if (nc.isAcceptor()) {
                    nc.wireOutPublisher((WireOutPublisher)new VanillaWireOutPublisher(this.wireType()));
                }
                TcpEventHandler handler = new TcpEventHandler((NetworkContext)nc);
                Function<MyClusteredNetworkContext, TcpHandler> factory = unused -> new HeaderTcpHandler(handler, o -> (TcpHandler)o);
                WireTypeSniffingTcpHandler sniffer = new WireTypeSniffingTcpHandler(handler, factory);
                handler.tcpHandler((TcpHandler)sniffer);
                return handler;
            };
        }

        protected void defaults() {
            if (this.wireType() == null) {
                this.wireType(WireType.BINARY);
            }
            if (this.wireOutPublisherFactory() == null) {
                this.wireOutPublisherFactory(VanillaWireOutPublisher::new);
            }
            if (this.serverThreadingStrategy() == null) {
                this.serverThreadingStrategy(ServerThreadingStrategy.SINGLE_THREADED);
            }
            if (this.networkContextFactory() == null) {
                this.networkContextFactory(MyClusteredNetworkContext::new);
            }
        }
    }

    static class MyCluster
    extends Cluster<MyClusteredNetworkContext, MyClusterContext> {
        MyCluster(MyClusterContext clusterContext) {
            this.clusterContext(clusterContext);
            clusterContext.cluster(this);
        }
    }

    static class MyClusteredNetworkContext
    extends VanillaClusteredNetworkContext<MyClusteredNetworkContext, MyClusterContext> {
        public Set<ConnectionListener> connectionListeners = new IdentityHashSet();

        public MyClusteredNetworkContext(@NotNull MyClusterContext clusterContext) {
            super((ClusterContext)clusterContext);
        }

        public void addConnectionListener(ConnectionListener connectionListener) {
            this.connectionListeners.add(connectionListener);
        }

        public void removeConnectionListener(ConnectionListener connectionListener) {
            this.connectionListeners.remove(connectionListener);
        }
    }

    private static class RejectingSubHandler
    extends AbstractSubHandler<MyClusteredNetworkContext>
    implements Marshallable {
        protected boolean rejected = false;

        private RejectingSubHandler() {
        }

        public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {
            if (!this.rejected && REJECTING_SUB_HANDLER_SHOULD_REJECT.get()) {
                this.rejected = true;
                throw new RejectedHandlerException("Rejected in onRead");
            }
            if (this.rejected) {
                REJECTED_HANDLER_ONREAD_CALLED.set(true);
            }
        }

        public void onInitialize(WireOut outWire) throws RejectedExecutionException {
            if (!this.rejected && REJECTING_SUB_HANDLER_SHOULD_REJECT.get()) {
                this.rejected = true;
                throw new RejectedHandlerException("Rejected in onInitialize");
            }
        }
    }

    private static class WritableRejectingSubHandler
    extends RejectingSubHandler
    implements WritableSubHandler<MyClusteredNetworkContext> {
        private WritableRejectingSubHandler() {
        }

        public void onWrite(WireOut outWire) {
            if (!this.rejected && REJECTING_SUB_HANDLER_SHOULD_REJECT.get()) {
                this.rejected = true;
                throw new RejectedHandlerException("Rejected in onWrite");
            }
            if (this.rejected) {
                REJECTED_HANDLER_ONWRITE_CALLED.set(true);
            }
        }
    }

    private static class RegisterableSubHandler
    extends RejectingSubHandler
    implements Registerable<RegisterableSubHandler> {
        public static final String REGISTRY_KEY = "registryKey";

        private RegisterableSubHandler() {
        }

        public Object registryKey() {
            return REGISTRY_KEY;
        }

        public void registry(Map<Object, RegisterableSubHandler> registry) {
            REGISTRY.set(registry);
        }
    }

    private static class ConnectionListenerSubHandler
    extends RejectingSubHandler
    implements ConnectionListener {
        private ConnectionListenerSubHandler() {
        }

        public void onConnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
        }

        public void onDisconnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
        }
    }
}

