package io.vertx.test.core;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.jgroups.JGroupsClusterManager;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;

/**
 * Clustered event bus test that uses {@link JGroupsClusterManager} as the cluster manager.
 *
 * <p>In addition to whatever {@link ClusteredEventBusTest} provides, this class adds a scenario
 * in which a node holding a consumer is killed and a later-added node registers a consumer on
 * the same address, plus helpers to add nodes to and kill nodes of the running test cluster.
 */
public class JGroupsClusteredEventbusTest extends ClusteredEventBusTest {

    @Rule
    public JGroupsCleanupRule testingJGroups = new JGroupsCleanupRule(true);

    /** Address of the consumer that forwards every request to {@link #ADDRESS1}. */
    public static final String WRAPPED_CHANNEL = "wrapper-channel";

    private static final String ADDRESS1 = "some-address1";
    private static final Logger log = LoggerFactory.getLogger(JGroupsClusteredEventbusTest.class);

    // Indexes into the vertices array; node 2 only exists after addNodes(1) has been called.
    private static final int VERTX_NODE_0 = 0;
    private static final int VERTX_NODE_1 = 1;
    private static final int VERTX_NODE_2 = 2;

    /**
     * Supplies the cluster manager used for the nodes started by this test.
     *
     * @return a new {@link JGroupsClusterManager} instance on every call
     */
    protected ClusterManager getClusterManager() {
        return new JGroupsClusterManager();
    }

    /**
     * Checks that messages sent to {@link #ADDRESS1} still get a successful reply after the node
     * that first consumed that address has been killed and a new node has taken over.
     *
     * <ol>
     *   <li>Node 1 registers a wrapper consumer that forwards {@code "foo" + n} to
     *       {@link #ADDRESS1} and replies {@code "ok"} once the forwarded message was answered.</li>
     *   <li>Node 0 consumes {@link #ADDRESS1}; request {@code 0} is sent through the wrapper.</li>
     *   <li>Node 0 is killed and a third node is added, which consumes {@link #ADDRESS1}.</li>
     *   <li>Node 1 sends {@code "foo1"} directly, then node 2 sends {@code 2} through the wrapper;
     *       both must be answered successfully.</li>
     * </ol>
     */
    @Test
    public void testSubsRemovedForKilledNode2() throws Exception {
        startNodes(2);
        // Shared by both ADDRESS1 consumers: the n-th message overall must carry body "foo" + n.
        AtomicInteger received = new AtomicInteger();

        vertices[VERTX_NODE_1].eventBus().consumer(WRAPPED_CHANNEL, request -> {
            log.info("Wrapper channel received request");
            int index = (Integer) request.body();
            vertices[VERTX_NODE_1].eventBus().send(ADDRESS1, "foo" + index, reply -> {
                // A failed AsyncResult carries no result; assert explicitly instead of
                // running into a NullPointerException on reply.result() below.
                assertTrue("Forwarded request to " + ADDRESS1 + " should be answered", reply.succeeded());
                Message<?> answer = reply.result();
                assertEquals("ok", answer.body().toString().substring(0, 2));
                request.reply("ok");
            });
        });

        // Stage 1: node 0 owns ADDRESS1 and answers the first wrapped request.
        CountDownLatch firstReply = new CountDownLatch(1);
        vertices[VERTX_NODE_0].eventBus().consumer(ADDRESS1, message -> {
            int count = received.getAndIncrement();
            assertEquals("foo" + count, message.body());
            log.info(">> receive and reply in consumer before kill");
            message.reply("ok" + count);
            if (count > 1) {
                fail("too many messages");
            }
        }).completionHandler(onSuccess(registered ->
            sendViaWrapperChannel(VERTX_NODE_1, 0, firstReply)));
        awaitLatch(firstReply);

        // Stage 2: kill the consumer's node and bring up a replacement.
        kill(VERTX_NODE_0);
        // NOTE(review): fixed sleeps, presumably to let the cluster notice the killed node and
        // the newly added one before continuing - confirm whether a condition can be awaited.
        Thread.sleep(2000L);
        addNodes(1);
        Thread.sleep(500L);

        // Stage 3: the new node consumes ADDRESS1; a direct send from node 1 must succeed.
        CountDownLatch directReply = new CountDownLatch(1);
        vertices[VERTX_NODE_2].eventBus().consumer(ADDRESS1, message -> {
            int count = received.getAndIncrement();
            assertEquals("foo" + count, message.body());
            message.reply("ok" + count);
            if (count == 0) {
                fail("should not get first messages");
            }
        }).completionHandler(onSuccess(registered -> {
            vertices[VERTX_NODE_1].runOnContext(v -> {
                vertices[VERTX_NODE_1].eventBus().send(ADDRESS1, "foo1", reply -> {
                    log.info(">>> direct request is " + reply.succeeded());
                    if (reply.failed()) {
                        fail("I should receive success reply");
                    }
                    directReply.countDown();
                });
            });
        }));
        awaitLatch(directReply);

        // Stage 4: a wrapped request issued from the new node must succeed as well.
        CountDownLatch wrapperReply = new CountDownLatch(1);
        sendViaWrapperChannel(VERTX_NODE_2, 2, wrapperReply);
        awaitLatch(wrapperReply);
        complete();
    }

    /**
     * Sends {@code index} to {@link #WRAPPED_CHANNEL} from the context of the given node and
     * counts {@code replied} down only when the reply is successful.
     */
    private void sendViaWrapperChannel(int node, int index, CountDownLatch replied) {
        vertices[node].runOnContext(v -> {
            log.info("Send request through the wrapper");
            vertices[node].eventBus().send(WRAPPED_CHANNEL, index, reply -> {
                log.info("Received request through the wrapper");
                if (reply.succeeded()) {
                    replied.countDown();
                }
            });
        });
    }

    /**
     * Replaces the vertices array with a copy that no longer contains the entry at index
     * {@code i}; the removed instance itself is not closed or killed here.
     */
    private void removeNode(int i) {
        Vertx[] remaining = new Vertx[vertices.length - 1];
        System.arraycopy(vertices, 0, remaining, 0, i);
        System.arraycopy(vertices, i + 1, remaining, i, vertices.length - i - 1);
        vertices = remaining;
    }

    /**
     * Adds {@code i} clustered nodes using the options returned by {@code getOptions()}.
     *
     * @param i number of nodes to add
     */
    protected void addNodes(int i) {
        log.info("Before add node");
        addNodes(i, getOptions());
    }

    /**
     * Starts {@code i} additional clustered nodes and appends them to the vertices array,
     * blocking for up to two minutes until all of them have started.
     *
     * <p>The passed options object is modified: cluster host, cluster port, clustered flag and
     * cluster manager are set on it before each node is created.
     *
     * @param i            number of nodes to add
     * @param vertxOptions base options for the new nodes
     */
    protected void addNodes(int i, VertxOptions vertxOptions) {
        CountDownLatch started = new CountDownLatch(i);
        Vertx[] existing = vertices;
        int total = existing.length + i;
        Vertx[] grown = new Vertx[total];
        System.arraycopy(existing, 0, grown, 0, existing.length);
        // Publish the larger array before starting nodes: the callbacks below write into it.
        vertices = grown;
        for (int slot = existing.length; slot < total; slot++) {
            int index = slot;
            VertxOptions nodeOptions = vertxOptions
                .setClusterHost("localhost")
                .setClusterPort(0)
                .setClustered(true)
                .setClusterManager(getClusterManager());
            Vertx.clusteredVertx(nodeOptions, asyncResult -> {
                if (asyncResult.failed()) {
                    log.error("Failed to start node", asyncResult.cause());
                }
                assertTrue("Failed to start node", asyncResult.succeeded());
                vertices[index] = asyncResult.result();
                started.countDown();
            });
        }
        try {
            assertTrue(started.await(2L, TimeUnit.MINUTES));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            fail(e.getMessage());
        }
        log.info("Added new nodes count " + i + ", total count " + vertices.length);
    }

    /**
     * Simulates a kill of the node at index {@code i}. The kill is executed through
     * {@code executeBlocking}, so this method may return before the kill has finished.
     *
     * @param i index of the node in the vertices array
     */
    protected void kill(int i) {
        // vertices is a Vertx[]; simulateKill() is only available on the internal interface.
        VertxInternal target = (VertxInternal) vertices[i];
        target.executeBlocking(future -> {
            target.simulateKill();
            future.complete();
        }, asyncResult -> {
            assertTrue(asyncResult.succeeded());
        });
    }
}
