package org.cacheonix.impl.net.cluster;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import org.cacheonix.CacheonixTestCase;
import org.cacheonix.ShutdownMode;
import org.cacheonix.TestUtils;
import org.cacheonix.impl.clock.Clock;
import org.cacheonix.impl.clock.ClockImpl;
import org.cacheonix.impl.clock.Time;
import org.cacheonix.impl.cluster.node.state.ReplicatedState;
import org.cacheonix.impl.net.ClusterNodeAddress;
import org.cacheonix.impl.net.multicast.sender.PlainMulticastSender;
import org.cacheonix.impl.net.multicast.server.MulticastServer;
import org.cacheonix.impl.net.multicast.server.MulticastServerImpl;
import org.cacheonix.impl.net.processor.Message;
import org.cacheonix.impl.net.processor.Router;
import org.cacheonix.impl.net.processor.UUID;
import org.cacheonix.impl.net.tcp.Receiver;
import org.cacheonix.impl.net.tcp.Sender;
import org.cacheonix.impl.util.IOUtils;
import org.cacheonix.impl.util.exception.ExceptionUtils;
import org.cacheonix.impl.util.logging.Logger;
import org.cacheonix.impl.util.thread.DaemonThreadFactory;

/* loaded from: input_file:org/cacheonix/impl/net/cluster/ClusterProcessorImplTest.class */
public final class ClusterProcessorImplTest extends CacheonixTestCase {
    private static final Logger LOG = Logger.getLogger(ClusterProcessorImplTest.class);
    private static final int BASE_TCP_PORT = 7676;
    private static final int MULTICAST_PORT = 9999;
    private static final int MULTICAST_TTL = 0;
    private static final long HOME_ALONE_TIMEOUT_MILLIS = 2000;
    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS = 30000;
    private static final long NETWORK_TIMEOUT_MILLIS = 30000;
    private static final int PROCESS_COUNT = 3;
    private static final String LOCALHOST = "127.0.0.1";
    private static final String MULTICAST_ADDRESS = "228.0.0.1";
    private static final int SINGLE_PART_PACKET_SIZE = 489;
    private static final long WORST_CASE_LATENCY_MILLIS = 30000;
    private static final long SELECTOR_TIMEOUT_MILLIS = 1000;
    private static final long CLUSTER_ANNOUNCEMENT_TIMEOUT_MILLS = 100;
    private static final long CLUSTER_SURVEY_TIMEOUT_MILLS = 500;
    private static final String TEST_CLUSTER_NAME = "Test Cluster Name";
    private final List<ClusterProcessor> clusterProcessors = new ArrayList(3);
    private final List<ClusterNodeAddress> processes = createProcesses();
    private final List<Receiver> servers = new ArrayList(3);
    private final List<TestMarkerCountingRequestDispatcher> messageHandlers = new ArrayList(3);
    private final List<TestMulticastMessageListener> mcastMessageListeners = new ArrayList(3);
    private final List<Sender> senders = new ArrayList(3);
    private final List<Clock> clocks = new ArrayList(3);
    private final List<MulticastServer> multicastServers = new ArrayList(3);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/cacheonix/impl/net/cluster/ClusterProcessorImplTest$TestMulticastMessageListener.class */
    public static final class TestMulticastMessageListener implements MulticastMessageListener {
        private final int[] messageSequenceNumbers;
        private int messageCount = 0;
        private int resetCount = 0;
        private IOException error = null;

        TestMulticastMessageListener(int i) {
            this.messageSequenceNumbers = new int[i];
            Arrays.fill(this.messageSequenceNumbers, 0);
        }

        public final int getMessageCount() throws IOException {
            if (this.error != null) {
                throw this.error;
            }
            return this.messageCount;
        }

        public int getResetCount() {
            return this.resetCount;
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public final void receive(Message message) {
            if (this.error != null) {
                return;
            }
            TestMessage testMessage = (TestMessage) message;
            int senderID = testMessage.getSenderID();
            int messageNumber = testMessage.getMessageNumber();
            int i = this.messageSequenceNumbers[senderID];
            if (messageNumber == i) {
                this.messageSequenceNumbers[senderID] = messageNumber + 1;
            } else {
                this.error = new IOException("Received message out of order, message source: " + senderID + ", expected message index: " + i + ", actual message index: " + messageNumber);
            }
            this.messageCount++;
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public void notifyClusterNodeJoined(ClusterNodeJoinedEvent clusterNodeJoinedEvent) {
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public void notifyClusterNodeLeft(ClusterNodeLeftEvent clusterNodeLeftEvent) {
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public void notifyClusterNodeBlocked() {
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public void notifyClusterNodeUnblocked() {
        }

        @Override // org.cacheonix.impl.net.cluster.MulticastMessageListener
        public void notifyReset() {
            this.resetCount++;
        }

        public String toString() {
            return "TestMulticastMessageListener{messageSequenceNumbers=" + Arrays.toString(this.messageSequenceNumbers) + ", messageCount=" + this.messageCount + ", resetCount=" + this.resetCount + ", error=" + this.error + '}';
        }
    }

    public void testMessagesGetSentInParallelSent() throws InterruptedException, IOException {
        Thread[] threadArr = new Thread[3];
        DaemonThreadFactory daemonThreadFactory = new DaemonThreadFactory("Sender");
        for (int i = 0; i < 3; i++) {
            Thread newThread = daemonThreadFactory.newThread(new TimedMulticastMessageSender(getClusterProcessor(i), i, 2));
            threadArr[i] = newThread;
            newThread.start();
        }
        for (Thread thread : threadArr) {
            thread.join();
        }
        Thread.sleep(3000L);
        for (int i2 = 0; i2 < 3; i2++) {
            int markerCount = getHandler(i2).getMarkerCount();
            if (LOG.isDebugEnabled()) {
                LOG.debug("tokenCount: " + markerCount + " for process index: " + i2);
            }
        }
        for (int i3 = 0; i3 < 3; i3++) {
            TestMulticastMessageListener mcastMessageListener = getMcastMessageListener(i3);
            assertTrue("Message listener # " + i3 + " should receive min 10 messages, received: " + mcastMessageListener.getMessageCount(), mcastMessageListener.getMessageCount() >= 10);
            LOG.debug("Process " + i3 + " received " + mcastMessageListener.getMessageCount() + " messages");
        }
    }

    public void testNewMemberJoins() throws InterruptedException, IOException {
        ClusterNodeAddress createProcess = createProcess(3);
        this.processes.add(createProcess);
        UUID randomUUID = UUID.randomUUID();
        Router router = new Router(createProcess);
        router.setClusterUUID(randomUUID);
        ClusterProcessor createClusterProcessor = createClusterProcessor(createProcess, InetAddress.getByName("228.0.0.1"), 9999, router, randomUUID);
        createClusterProcessor.getProcessorState().setReplicateState(new ReplicatedState());
        router.register(ClusterProcessorKey.getInstance(), createClusterProcessor);
        router.register(MulticastClientProcessorKey.getInstance(), createClusterProcessor);
        router.register(ReplicatedStateProcessorKey.getInstance(), createClusterProcessor);
        this.clusterProcessors.add(createClusterProcessor);
        TestMarkerCountingRequestDispatcher testMarkerCountingRequestDispatcher = new TestMarkerCountingRequestDispatcher(3, getClusterProcessor(3));
        this.messageHandlers.add(testMarkerCountingRequestDispatcher);
        Clock attachTo = new ClockImpl(1000L).attachTo(new Timer());
        Receiver receiver = new Receiver(attachTo, "127.0.0.1", getAddress(3).getTcpPort(), testMarkerCountingRequestDispatcher, 30000L, 1000L);
        receiver.startup();
        this.servers.add(receiver);
        TestMulticastMessageListener testMulticastMessageListener = new TestMulticastMessageListener(3);
        this.mcastMessageListeners.add(testMulticastMessageListener);
        getClusterProcessor(3).subscribeMulticastMessageListener(testMulticastMessageListener);
        Sender sender = new Sender(getClusterProcessor(3).getAddress(), 30000L, 1000L, attachTo);
        this.senders.add(sender);
        router.setOutput(sender);
        sender.setRouter(router);
        sender.startup();
        if (LOG.isDebugEnabled()) {
            LOG.debug("================================== Begin Join ===================================");
        }
        getClusterProcessor(3).startup();
        MulticastServerImpl multicastServerImpl = new MulticastServerImpl(InetAddress.getByName("228.0.0.1"), 9999, createProcess.getTcpPort());
        multicastServerImpl.addListener(createClusterProcessor);
        this.multicastServers.add(multicastServerImpl);
        multicastServerImpl.startup();
        Thread.sleep(10000L);
        if (LOG.isDebugEnabled()) {
            LOG.debug("================================== Finished waiting ===================================");
        }
        Iterator<ClusterProcessor> it = this.clusterProcessors.iterator();
        while (it.hasNext()) {
            assertEquals("Normal", it.next().getProcessorState().getStateName());
        }
        for (int i = 0; i < this.clusterProcessors.size(); i++) {
            assertEquals(this.clusterProcessors.size(), this.clusterProcessors.get(i).getProcessorState().getClusterView().getSize());
        }
        TestMarkerCountingRequestDispatcher handler = getHandler(3);
        int markerCount = handler.getMarkerCount();
        LOG.debug("tokenCount: " + markerCount + " for joining process index: 3, handler index: " + handler.getConnectionIndex());
        assertTrue(markerCount > 0);
    }

    public void testRingGetsReformedUponFailure() throws InterruptedException {
        Thread.sleep(CLUSTER_ANNOUNCEMENT_TIMEOUT_MILLS);
        getMulticastServer(1).shutdown();
        getTCPServer(1).shutdown();
        this.senders.get(1).shutdown();
        getClusterProcessor(1).shutdown(ShutdownMode.FORCED_SHUTDOWN);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin to wait for 3000 milliseconds for cluster to stabilize");
        }
        Thread.sleep(3000L);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished waiting for 3000 milliseconds for cluster to stabilize");
        }
    }

    public void testMarkerGetsPassed() throws InterruptedException {
        Thread.sleep(1000L);
        for (int i = 0; i < 3; i++) {
            int markerCount = getHandler(i).getMarkerCount();
            if (LOG.isDebugEnabled()) {
                LOG.debug("tokenCount: " + markerCount + " for process index: " + i);
            }
            assertTrue(markerCount > 0);
        }
        ArrayList<Time> arrayList = new ArrayList(3);
        for (int i2 = 0; i2 < 3; i2++) {
            arrayList.add(this.clocks.get(0).currentTime());
        }
        for (Time time : arrayList) {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Time subtract = time.subtract((Time) it.next());
                assertEquals(0L, subtract.getMillis());
                assertTrue(Math.abs(subtract.getCount()) <= 2);
            }
        }
    }

    public void testMessagesGetSent() throws InterruptedException, IOException {
        for (int i = 0; i < 10; i++) {
            getClusterProcessor(1).post(new TestMessage(1, i, TestUtils.makeTestObject(SINGLE_PART_PACKET_SIZE)));
        }
        Thread.sleep(1000L);
        for (int i2 = 0; i2 < 3; i2++) {
            if (i2 != 1) {
                assertEquals("Message listener # " + i2 + " should receive 10 messages", 10, getMcastMessageListener(i2).getMessageCount());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.cacheonix.CacheonixTestCase
    public void setUp() throws Exception {
        super.setUp();
        for (int i = 0; i < 3; i++) {
            InetAddress byName = InetAddress.getByName("228.0.0.1");
            ClusterNodeAddress address = getAddress(i);
            int tcpPort = address.getTcpPort();
            UUID randomUUID = UUID.randomUUID();
            Router router = new Router(address);
            router.setClusterUUID(randomUUID);
            ClusterProcessor createClusterProcessor = createClusterProcessor(address, byName, 9999, router, randomUUID);
            createClusterProcessor.getProcessorState().setReplicateState(new ReplicatedState());
            this.clusterProcessors.add(createClusterProcessor);
            router.register(ClusterProcessorKey.getInstance(), createClusterProcessor);
            router.register(MulticastClientProcessorKey.getInstance(), createClusterProcessor);
            router.register(ReplicatedStateProcessorKey.getInstance(), createClusterProcessor);
            TestMulticastMessageListener testMulticastMessageListener = new TestMulticastMessageListener(3);
            this.mcastMessageListeners.add(testMulticastMessageListener);
            getClusterProcessor(i).subscribeMulticastMessageListener(testMulticastMessageListener);
            Clock attachTo = new ClockImpl(1000L).attachTo(new Timer());
            this.clocks.add(attachTo);
            Sender sender = new Sender(address, 30000L, 1000L, attachTo);
            sender.setRouter(router);
            router.setOutput(sender);
            this.senders.add(sender);
            sender.startup();
            TestMarkerCountingRequestDispatcher testMarkerCountingRequestDispatcher = new TestMarkerCountingRequestDispatcher(i, this.clusterProcessors.get(i));
            this.messageHandlers.add(testMarkerCountingRequestDispatcher);
            Receiver receiver = new Receiver(attachTo, "127.0.0.1", address.getTcpPort(), testMarkerCountingRequestDispatcher, 30000L, 1000L);
            this.servers.add(receiver);
            receiver.startup();
            getClusterProcessor(i).startup();
            MulticastServerImpl multicastServerImpl = new MulticastServerImpl(byName, 9999, tcpPort);
            this.multicastServers.add(multicastServerImpl);
            multicastServerImpl.addListener(createClusterProcessor);
            multicastServerImpl.startup();
        }
        Thread.sleep(HOME_ALONE_TIMEOUT_MILLIS);
    }

    private Receiver getTCPServer(int i) {
        return this.servers.get(i);
    }

    private MulticastServer getMulticastServer(int i) {
        return this.multicastServers.get(i);
    }

    private ClusterNodeAddress getAddress(int i) {
        return this.processes.get(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.cacheonix.CacheonixTestCase
    public void tearDown() throws Exception {
        shutdownClusterServices();
        shutdownMulticastServers();
        shutdownTCPServers();
        Iterator<Sender> it = this.senders.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        super.tearDown();
    }

    private void shutdownTCPServers() {
        for (int i = 0; i < this.servers.size(); i++) {
            Receiver tCPServer = getTCPServer(i);
            if (!tCPServer.isShutDown()) {
                tCPServer.shutdown();
            }
        }
    }

    private void shutdownMulticastServers() {
        for (int i = 0; i < this.servers.size(); i++) {
            IOUtils.shutdownHard(getMulticastServer(i));
        }
    }

    private void shutdownClusterServices() {
        for (int i = 0; i < this.clusterProcessors.size(); i++) {
            ClusterProcessor clusterProcessor = getClusterProcessor(i);
            if (!clusterProcessor.isShutdown()) {
                clusterProcessor.shutdown();
            }
        }
    }

    private static List<ClusterNodeAddress> createProcesses() {
        ArrayList arrayList = new ArrayList(3);
        for (int i = 0; i < 3; i++) {
            try {
                arrayList.add(createProcess(i));
            } catch (IOException e) {
                throw ExceptionUtils.createIllegalStateException(e);
            }
        }
        return arrayList;
    }

    private static ClusterNodeAddress createProcess(int i) throws IOException {
        return ClusterNodeAddress.createAddress("127.0.0.1", 7676 + i);
    }

    private TestMarkerCountingRequestDispatcher getHandler(int i) {
        return this.messageHandlers.get(i);
    }

    private ClusterProcessor getClusterProcessor(int i) {
        return this.clusterProcessors.get(i);
    }

    private TestMulticastMessageListener getMcastMessageListener(int i) {
        return this.mcastMessageListeners.get(i);
    }

    private ClusterProcessor createClusterProcessor(ClusterNodeAddress clusterNodeAddress, InetAddress inetAddress, int i, Router router, UUID uuid) throws IOException {
        return new ClusterProcessorImpl(TEST_CLUSTER_NAME, getClock(), getTimer(), router, new PlainMulticastSender(inetAddress, i, 0), clusterNodeAddress, HOME_ALONE_TIMEOUT_MILLIS, 30000L, 30000L, CLUSTER_SURVEY_TIMEOUT_MILLS, CLUSTER_ANNOUNCEMENT_TIMEOUT_MILLS, uuid);
    }

    public String toString() {
        return "ClusterProcessorImplTest{clusterProcessors=" + this.clusterProcessors + ", processes=" + this.processes + ", servers=" + this.servers + ", messageHandlers=" + this.messageHandlers + ", mcastMessageListeners=" + this.mcastMessageListeners + "} " + super.toString();
    }
}
