package io.vertx.test.core;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.spi.cluster.jgroups.JGroupsClusterManager;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.jgroups.JChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/test/core/JGroupsSimpleClusterManagerWithCustomJChannelTest.class */
public class JGroupsSimpleClusterManagerWithCustomJChannelTest extends AsyncTestBase {
    private JChannel channel1;
    private JChannel channel2;

    @Before
    public void setUp() throws Exception {
        super.setUp();
        InputStream configStream = JGroupsClusterManager.getConfigStream();
        InputStream configStream2 = JGroupsClusterManager.getConfigStream();
        this.channel1 = new JChannel(configStream);
        this.channel2 = new JChannel(configStream2);
        configStream.close();
        configStream2.close();
    }

    @After
    public void tearDown() throws Exception {
        super.tearDown();
        this.channel1.close();
        this.channel2.close();
    }

    @Test
    public void testEventBusP2P() throws Exception {
        JGroupsClusterManager jGroupsClusterManager = new JGroupsClusterManager(this.channel1);
        JGroupsClusterManager jGroupsClusterManager2 = new JGroupsClusterManager(this.channel2);
        VertxOptions clusterHost = new VertxOptions().setClusterManager(jGroupsClusterManager).setClustered(true).setClusterHost("127.0.0.1");
        VertxOptions clusterHost2 = new VertxOptions().setClusterManager(jGroupsClusterManager2).setClustered(true).setClusterHost("127.0.0.1");
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        Vertx.clusteredVertx(clusterHost, asyncResult -> {
            assertTrue(asyncResult.succeeded());
            assertNotNull(jGroupsClusterManager.getNodeID());
            ((Vertx) asyncResult.result()).eventBus().consumer("news", message -> {
                assertNotNull(message);
                assertTrue(message.body().equals("hello"));
                testComplete();
            });
            atomicReference.set(asyncResult.result());
        });
        waitUntil(() -> {
            return atomicReference.get() != null;
        });
        Vertx.clusteredVertx(clusterHost2, asyncResult2 -> {
            assertTrue(asyncResult2.succeeded());
            assertNotNull(jGroupsClusterManager2.getNodeID());
            atomicReference2.set(asyncResult2.result());
            ((Vertx) asyncResult2.result()).eventBus().send("news", "hello");
        });
        await();
        ((Vertx) atomicReference.get()).close();
        ((Vertx) atomicReference2.get()).close();
    }

    @Test
    public void testEventBusPubSub() throws Exception {
        JGroupsClusterManager jGroupsClusterManager = new JGroupsClusterManager(this.channel1);
        JGroupsClusterManager jGroupsClusterManager2 = new JGroupsClusterManager(this.channel2);
        VertxOptions clusterHost = new VertxOptions().setClusterManager(jGroupsClusterManager).setClustered(true).setClusterHost("127.0.0.1");
        VertxOptions clusterHost2 = new VertxOptions().setClusterManager(jGroupsClusterManager2).setClustered(true).setClusterHost("127.0.0.1");
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        AtomicInteger atomicInteger = new AtomicInteger();
        Vertx.clusteredVertx(clusterHost, asyncResult -> {
            assertTrue(asyncResult.succeeded());
            assertNotNull(jGroupsClusterManager.getNodeID());
            ((Vertx) asyncResult.result()).eventBus().consumer("news", message -> {
                assertNotNull(message);
                assertTrue(message.body().equals("hello"));
                atomicInteger.incrementAndGet();
            });
            atomicReference.set(asyncResult.result());
        });
        waitUntil(() -> {
            return atomicReference.get() != null;
        });
        Vertx.clusteredVertx(clusterHost2, asyncResult2 -> {
            assertTrue(asyncResult2.succeeded());
            assertNotNull(jGroupsClusterManager2.getNodeID());
            atomicReference2.set(asyncResult2.result());
            ((Vertx) asyncResult2.result()).eventBus().publish("news", "hello");
        });
        waitUntil(() -> {
            return atomicInteger.get() == 1;
        });
        ((Vertx) atomicReference.get()).close();
        ((Vertx) atomicReference2.get()).close();
    }

    @Test
    public void testEventBusWithReply() throws Exception {
        JGroupsClusterManager jGroupsClusterManager = new JGroupsClusterManager(this.channel1);
        JGroupsClusterManager jGroupsClusterManager2 = new JGroupsClusterManager(this.channel2);
        VertxOptions clusterHost = new VertxOptions().setClusterManager(jGroupsClusterManager).setClustered(true).setClusterHost("127.0.0.1");
        VertxOptions clusterHost2 = new VertxOptions().setClusterManager(jGroupsClusterManager2).setClustered(true).setClusterHost("127.0.0.1");
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        Vertx.clusteredVertx(clusterHost, asyncResult -> {
            assertTrue(asyncResult.succeeded());
            assertNotNull(jGroupsClusterManager.getNodeID());
            ((Vertx) asyncResult.result()).eventBus().consumer("news", message -> {
                assertNotNull(message);
                assertTrue(message.body().equals("ping"));
                message.reply("pong");
            });
            atomicReference.set(asyncResult.result());
        });
        waitUntil(() -> {
            return atomicReference.get() != null;
        });
        Vertx.clusteredVertx(clusterHost2, asyncResult2 -> {
            assertTrue(asyncResult2.succeeded());
            assertNotNull(jGroupsClusterManager2.getNodeID());
            atomicReference2.set(asyncResult2.result());
            ((Vertx) asyncResult2.result()).eventBus().send("news", "ping", asyncResult2 -> {
                if (asyncResult2.succeeded()) {
                    assertTrue(((Message) asyncResult2.result()).body().equals("pong"));
                    testComplete();
                }
            });
        });
        await();
        ((Vertx) atomicReference.get()).close();
        ((Vertx) atomicReference2.get()).close();
    }

    @Test
    public void testSharedData() throws Exception {
        JGroupsClusterManager jGroupsClusterManager = new JGroupsClusterManager(this.channel1);
        JGroupsClusterManager jGroupsClusterManager2 = new JGroupsClusterManager(this.channel2);
        VertxOptions clusterHost = new VertxOptions().setClusterManager(jGroupsClusterManager).setClustered(true).setClusterHost("127.0.0.1");
        VertxOptions clusterHost2 = new VertxOptions().setClusterManager(jGroupsClusterManager2).setClustered(true).setClusterHost("127.0.0.1");
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        Vertx.clusteredVertx(clusterHost, asyncResult -> {
            assertTrue(asyncResult.succeeded());
            assertNotNull(jGroupsClusterManager.getNodeID());
            ((Vertx) asyncResult.result()).sharedData().getClusterWideMap("mymap1", asyncResult -> {
                ((AsyncMap) asyncResult.result()).put("news", "hello", asyncResult -> {
                    atomicReference.set(asyncResult.result());
                });
            });
        });
        waitUntil(() -> {
            return atomicReference.get() != null;
        });
        Vertx.clusteredVertx(clusterHost2, asyncResult2 -> {
            assertTrue(asyncResult2.succeeded());
            assertNotNull(jGroupsClusterManager2.getNodeID());
            atomicReference2.set(asyncResult2.result());
            ((Vertx) asyncResult2.result()).sharedData().getClusterWideMap("mymap1", asyncResult2 -> {
                ((AsyncMap) asyncResult2.result()).get("news", asyncResult2 -> {
                    assertEquals("hello", asyncResult2.result());
                    testComplete();
                });
            });
        });
        await();
        ((Vertx) atomicReference.get()).close();
        ((Vertx) atomicReference2.get()).close();
    }
}
