package org.apache.ignite.raft.jraft.core;

import com.codahale.metrics.ConsoleReporter;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.JoinableClosure;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
import org.apache.ignite.raft.jraft.error.LogNotFoundException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@ExtendWith({WorkDirectoryExtension.class})
/* loaded from: input_file:org/apache/ignite/raft/jraft/core/ItNodeTest.class */
public class ItNodeTest {
    /** Logger used by this test class and its nested helper classes. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(ItNodeTest.class);

    /** Watchdog thread; created in {@code setupNodeTest()} and interrupted at the start of every test. */
    private static DumpThread dumpThread;

    /** String form of the per-test work directory, assigned in {@code setup}. */
    private String dataPath;

    /** Value of {@code Utils.monotonicMs()} captured in {@code setup}; used to log the test duration. */
    private long testStartMs;

    /** Cluster under test, if any; {@code teardown} calls {@code stopAll()} on it when it is non-null. */
    private TestCluster cluster;

    /** JUnit descriptor of the running test, assigned in {@code setup}. */
    private TestInfo testInfo;

    /** Incremented by {@code UserReplicatorStateListener.onCreated}; reset to zero in {@code teardown}. */
    private final AtomicInteger startedCounter = new AtomicInteger(0);

    /** Incremented by {@code UserReplicatorStateListener.onDestroyed}; reset to zero in {@code teardown}. */
    private final AtomicInteger stoppedCounter = new AtomicInteger(0);

    /** Raft group services that {@code teardown} shuts down. */
    private final List<RaftGroupService> services = new ArrayList<>();

    /** Executors that {@code teardown} shuts down and awaits. */
    private final List<ExecutorService> executors = new ArrayList<>();

    /** Append-entries executor groups that {@code teardown} shuts down gracefully. */
    private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ignite/raft/jraft/core/ItNodeTest$ChangeArg.class */
    public static class ChangeArg {
        /** Cluster this argument bundle refers to. */
        TestCluster c;

        /** Peers this argument bundle refers to. */
        List<PeerId> peers;

        /** Volatile flag; presumably polled by a worker to know when to stop (usage is outside this view). */
        volatile boolean stop;

        /** Flag whose name suggests the first peer must be kept (usage is outside this view). */
        boolean dontRemoveFirstPeer;

        /**
         * Stores the four values as given, without copying or validation.
         *
         * @param cluster Value for {@link #c}.
         * @param peerIds Value for {@link #peers}.
         * @param stopFlag Initial value for {@link #stop}.
         * @param keepFirstPeer Value for {@link #dontRemoveFirstPeer}.
         */
        ChangeArg(TestCluster cluster, List<PeerId> peerIds, boolean stopFlag, boolean keepFirstPeer) {
            c = cluster;
            peers = peerIds;
            stop = stopFlag;
            dontRemoveFirstPeer = keepFirstPeer;
        }
    }

    /* loaded from: input_file:org/apache/ignite/raft/jraft/core/ItNodeTest$DumpThread.class */
    private static class DumpThread extends Thread {
        /** Uninterrupted sleep period, in milliseconds, after which a thread dump is taken. */
        private static final long DUMP_TIMEOUT_MS = 300000;

        /** Set to {@code true} by the enclosing test class to make {@link #run()} leave its loop. */
        private volatile boolean stopped = false;

        private DumpThread() {
        }

        /**
         * Watchdog loop: sleeps for {@link #DUMP_TIMEOUT_MS} and, if the sleep completes without being
         * interrupted, logs a message and dumps all threads. An interrupt merely restarts the sleep, so the
         * dump only happens when nobody has interrupted this thread for the whole period.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                try {
                    Thread.sleep(DUMP_TIMEOUT_MS);
                    ItNodeTest.LOG.info("Test hang too long, dump threads", new Object[0]);
                    TestUtils.dumpThreads();
                } catch (InterruptedException ignored) {
                    // Intentionally ignored: the test class interrupts this thread to restart the timer
                    // (before each test) or, after setting 'stopped', to end the loop. Re-setting the
                    // interrupt flag here would make the next sleep() fail immediately and spin.
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/ignite/raft/jraft/core/ItNodeTest$MockFSM1.class */
    static class MockFSM1 extends MockStateMachine {
        /** Creates the state machine bound to the placeholder endpoint {@code 0.0.0.0:0}. */
        MockFSM1() {
            this(new Endpoint("0.0.0.0", 0));
        }

        /**
         * Creates the state machine for the given endpoint.
         *
         * @param address Endpoint handed to the {@link MockStateMachine} constructor.
         */
        MockFSM1(Endpoint address) {
            super(address);
        }

        /**
         * Always reports that the snapshot was not loaded.
         *
         * @param reader Ignored.
         * @return {@code false}, unconditionally.
         */
        @Override // org.apache.ignite.raft.jraft.core.MockStateMachine
        public boolean onSnapshotLoad(SnapshotReader reader) {
            return false;
        }
    }

    /* loaded from: input_file:org/apache/ignite/raft/jraft/core/ItNodeTest$UserReplicatorStateListener.class */
    class UserReplicatorStateListener implements Replicator.ReplicatorStateListener {
        UserReplicatorStateListener() {
        }

        /**
         * Bumps the enclosing test's started counter and logs the peer together with the new count.
         *
         * @param peerId Peer reported by the replicator.
         */
        public void onCreated(PeerId peerId) {
            int created = ItNodeTest.this.startedCounter.incrementAndGet();
            ItNodeTest.LOG.info("Replicator has been created {} {}", peerId, created);
        }

        /**
         * Logs the peer and the error status; no counter is touched.
         *
         * @param peerId Peer reported by the replicator.
         * @param status Error status reported by the replicator.
         */
        public void onError(PeerId peerId, Status status) {
            ItNodeTest.LOG.info("Replicator has errors {} {}", peerId, status);
        }

        /**
         * Bumps the enclosing test's stopped counter and logs the peer together with the new count.
         *
         * @param peerId Peer reported by the replicator.
         */
        public void onDestroyed(PeerId peerId) {
            int destroyed = ItNodeTest.this.stoppedCounter.incrementAndGet();
            ItNodeTest.LOG.info("Replicator has been destroyed {} {}", peerId, destroyed);
        }
    }

    /** Creates the daemon watchdog thread named {@code NodeTest-DumpThread}, publishes it and starts it. */
    @BeforeAll
    public static void setupNodeTest() {
        DumpThread watchdog = new DumpThread();
        dumpThread = watchdog;
        watchdog.setName("NodeTest-DumpThread");
        watchdog.setDaemon(true);
        watchdog.start();
    }

    /**
     * Asks the watchdog thread to stop: raises its stop flag, interrupts its sleep and waits up to
     * 100 ms for it to finish.
     *
     * @throws Exception If the wait for the watchdog thread is interrupted.
     */
    @AfterAll
    public static void tearNodeTest() throws Exception {
        DumpThread watchdog = dumpThread;
        watchdog.stopped = true;
        watchdog.interrupt();
        watchdog.join(100L);
    }

    /**
     * Per-test initialisation: logs the test name, remembers the test info, work directory and start
     * time, and interrupts the watchdog thread so that its sleep starts over.
     *
     * @param testInfo JUnit descriptor of the test about to run.
     * @param path Work directory injected for this test.
     * @throws Exception Declared for the framework; nothing in the body throws a checked exception.
     */
    @BeforeEach
    public void setup(TestInfo testInfo, @WorkDirectory Path path) throws Exception {
        String displayName = testInfo.getDisplayName();
        LOG.info(">>>>>>>>>>>>>>> Start test method: " + displayName);

        this.testInfo = testInfo;
        dataPath = path.toString();
        testStartMs = Utils.monotonicMs();

        // Restart the watchdog's sleep for the new test.
        dumpThread.interrupt();
    }

    /**
     * Per-test cleanup: shuts down every registered service (logging, not propagating, failures),
     * every registered executor and executor group, stops the cluster if one was created, resets the
     * replicator counters, asserts that no JRaft threads are left and logs the test duration.
     *
     * @throws Exception If stopping the cluster or the thread check fails.
     */
    @AfterEach
    public void teardown() throws Exception {
        for (RaftGroupService service : services) {
            try {
                service.shutdown();
            } catch (Exception e) {
                LOG.error("Error while closing a service", e);
            }
        }

        for (ExecutorService executor : executors) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
        }

        for (FixedThreadsExecutorGroup group : appendEntriesExecutors) {
            group.shutdownGracefully();
        }

        if (cluster != null) {
            cluster.stopAll();
        }

        startedCounter.set(0);
        stoppedCounter.set(0);

        TestUtils.assertAllJraftThreadsStopped();

        long elapsedMs = Utils.monotonicMs() - testStartMs;
        LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:" + elapsedMs + " ms.");
    }

    /**
     * Starts a single raft service on the local address with a mock state machine and no initial
     * configuration; shutdown is left to {@code teardown}.
     */
    @Test
    public void testInitShutdown() {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);

        NodeOptions nodeOptions = createNodeOptions(0);
        nodeOptions.setFsm(new MockStateMachine(addr));
        nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");

        RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
        service.start();
    }

    /**
     * Starts a single-node group with a disruptor buffer of size 2, submits 10 tasks and checks that
     * every task's closure is eventually invoked. A task may fail, but only with {@code EBUSY} or
     * {@code EPERM}.
     *
     * @throws Exception If the test is interrupted or a task join fails.
     */
    @Test
    public void testNodeTaskOverload() throws Exception {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId peer = new PeerId(addr, 0);

        NodeOptions nodeOptions = createNodeOptions(0);
        RaftOptions raftOptions = new RaftOptions();
        raftOptions.setDisruptorBufferSize(2);
        nodeOptions.setRaftOptions(raftOptions);
        nodeOptions.setFsm(new MockStateMachine(addr));
        nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
        nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
        nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));

        Node node = createService("unittest", new PeerId(addr, 0), nodeOptions).start();

        Assertions.assertEquals(1, node.listPeers().size());
        Assertions.assertTrue(node.listPeers().contains(peer));

        // Bounded wait instead of an endless busy-spin: a failed election now fails the test
        // rather than hanging it with a core pinned at 100%.
        Assertions.assertTrue(waitForCondition(() -> node.isLeader(), 10000L), "Node did not become leader in time");

        List<Task> tasks = new ArrayList<>();
        AtomicInteger completed = new AtomicInteger(0);

        for (int i = 0; i < 10; i++) {
            int idx = i;
            byte[] payload = ("hello" + i).getBytes(StandardCharsets.UTF_8);

            Task task = new Task(ByteBuffer.wrap(payload), new JoinableClosure(status -> {
                LOG.info("{} i={}", status, idx);

                if (!status.isOk()) {
                    Assertions.assertTrue(status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
                }

                completed.incrementAndGet();
            }));

            node.apply(task);
            tasks.add(task);
        }

        Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30L));
        Assertions.assertEquals(10, completed.get());
    }

    /**
     * Single-node scenario in which the state machine blocks inside {@code onApply}, a read-index
     * request is issued meanwhile, and the state machine then rolls back with an error. The test
     * expects the read-index request never to succeed and the last applied value to stay below the
     * number of submitted tasks.
     *
     * @throws Exception If the test is interrupted while waiting on a latch.
     */
    @Test
    public void testRollbackStateMachineWithReadIndex_Issue317() throws Exception {
        final int taskCount = 5;

        PeerId peer = new PeerId(new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT), 0);
        NodeOptions nodeOptions = createNodeOptions(0);

        // Counted down once the state machine has set the error and rolled back.
        final CountDownLatch rollbackDoneLatch = new CountDownLatch(1);
        // Held by the test; onApply blocks on it until the read-index request has been submitted.
        final CountDownLatch proceedApplyLatch = new CountDownLatch(1);
        // Counted down as soon as onApply is entered.
        final CountDownLatch applyEnteredLatch = new CountDownLatch(1);

        final AtomicInteger currentValue = new AtomicInteger(-1);
        final String errorMsg = this.testInfo.getDisplayName();

        nodeOptions.setFsm(new StateMachineAdapter() {
            public void onApply(Iterator iterator) {
                applyEnteredLatch.countDown();

                try {
                    proceedApplyLatch.await();
                } catch (InterruptedException e) {
                    Assertions.fail();
                }

                // Entries are expected to carry the consecutive integers 0, 1, 2, ...
                int expected = 0;
                while (iterator.hasNext()) {
                    int value = Bits.getInt(((ByteBuffer) iterator.next()).array(), 0);
                    Assertions.assertEquals(expected++, value);
                    currentValue.set(value);
                }

                if (expected > 0) {
                    currentValue.set(expected - 1);
                    iterator.setErrorAndRollback(1L, new Status(-1, errorMsg));
                    rollbackDoneLatch.countDown();
                }
            }
        });
        nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
        nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
        nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));

        Node node = createService("unittest", peer, nodeOptions).start();

        Assertions.assertEquals(1, node.listPeers().size());
        Assertions.assertTrue(node.listPeers().contains(peer));

        // Bounded wait instead of an endless busy-spin: a failed election now fails the test
        // rather than hanging it with a core pinned at 100%.
        Assertions.assertTrue(waitForCondition(() -> node.isLeader(), 10000L), "Node did not become leader in time");

        for (int i = 0; i < taskCount; i++) {
            byte[] payload = new byte[4];
            Bits.putInt(payload, 0, i);
            node.apply(new Task(ByteBuffer.wrap(payload), (Closure) null));
        }

        final AtomicInteger readIndexSuccesses = new AtomicInteger(0);
        applyEnteredLatch.await();

        final CountDownLatch readIndexLatch = new CountDownLatch(1);
        node.readIndex((byte[]) null, new ReadIndexClosure() {
            public void run(Status status, long index, byte[] reqCtx) {
                try {
                    if (status.isOk()) {
                        readIndexSuccesses.incrementAndGet();
                    } else {
                        Assertions.assertTrue(status.getErrorMsg().contains(errorMsg) || status.getRaftError() == RaftError.ETIMEDOUT || status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"), "Unexpected status: " + status);
                    }
                } finally {
                    readIndexLatch.countDown();
                }
            }
        });

        // Let onApply continue now that the read-index request is in flight.
        proceedApplyLatch.countDown();

        waitForCondition(() -> !node.isLeader(), 5000L);

        readIndexLatch.await();
        rollbackDoneLatch.await();

        Assertions.assertEquals(0, readIndexSuccesses.get());
        Assertions.assertTrue(taskCount - 1 >= currentValue.get());
    }

    /**
     * Starts a single-node group, waits for it to become leader, sends the default batch of test tasks
     * and checks that the state machine holds exactly 10 entries {@code hello0 .. hello9} in order.
     *
     * @throws Exception If the test is interrupted or sending tasks fails.
     */
    @Test
    public void testSingleNode() throws Exception {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId peer = new PeerId(addr, 0);

        NodeOptions nodeOptions = createNodeOptions(0);
        MockStateMachine fsm = new MockStateMachine(addr);
        nodeOptions.setFsm(fsm);
        nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
        nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
        nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));

        Node node = createService("unittest", peer, nodeOptions).start();

        Assertions.assertEquals(1, node.listPeers().size());
        Assertions.assertTrue(node.listPeers().contains(peer));

        // Bounded wait instead of an endless busy-spin: a failed election now fails the test
        // rather than hanging it with a core pinned at 100%.
        Assertions.assertTrue(waitForCondition(() -> node.isLeader(), 10000L), "Node did not become leader in time");

        sendTestTaskAndWait(node);

        Assertions.assertEquals(10, fsm.getLogs().size());

        int expectedIdx = 0;
        for (ByteBuffer data : fsm.getLogs()) {
            Assertions.assertEquals("hello" + expectedIdx++, stringFromBytes(data.array()));
        }
    }

    /**
     * Decodes the whole byte array as UTF-8 text.
     *
     * @param bArr Bytes to decode.
     * @return The decoded string.
     */
    private String stringFromBytes(byte[] bArr) {
        final String decoded = new String(bArr, 0, bArr.length, StandardCharsets.UTF_8);
        return decoded;
    }

    /**
     * Starts only one of three configured peers, so no leader exists, and checks that applying a task,
     * adding a peer and removing a peer on that lone follower all complete with {@code EPERM}.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testNoLeader() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);

        // Only the first peer is started.
        Assertions.assertTrue(cluster.start(peers.get(0).getEndpoint()));

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(1, followers.size());
        Node follower = followers.get(0);

        sendTestTaskAndWait(follower, 0, RaftError.EPERM);

        // Adding a peer must be rejected.
        PeerId peerToAdd = new PeerId(TestUtils.getLocalAddress(), 5006);
        CountDownLatch addLatch = new CountDownLatch(1);
        follower.addPeer(peerToAdd, new ExpectClosure(RaftError.EPERM, addLatch));
        waitLatch(addLatch);

        // Removing a peer must be rejected as well.
        PeerId peerToRemove = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        CountDownLatch removeLatch = new CountDownLatch(1);
        follower.removePeer(peerToRemove, new ExpectClosure(RaftError.EPERM, removeLatch));
        waitLatch(removeLatch);
    }

    /**
     * Starts a three-node cluster whose nodes each carry two replicator state listeners, expects four
     * replicator-created notifications in total, and checks that removing one listener from every
     * node leaves exactly one registered on each.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testTripleNodesWithReplicatorStateListener() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        UserReplicatorStateListener firstListener = new UserReplicatorStateListener();
        UserReplicatorStateListener secondListener = new UserReplicatorStateListener();

        cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<PeerId>(), TestCluster.ELECTION_TIMEOUT_MILLIS,
            opts -> opts.setReplicationStateListeners(List.of(firstListener, secondListener)), testInfo);

        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        cluster.ensureLeader(cluster.getLeader());

        // Give every follower up to 5 seconds to learn who the leader is.
        for (Node follower : cluster.getFollowers()) {
            waitForCondition(() -> follower.getLeaderId() != null, 5000L);
        }

        Assertions.assertEquals(4, startedCounter.get());

        Assertions.assertEquals(2, cluster.getLeader().getReplicatorStateListeners().size());
        Assertions.assertEquals(2, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
        Assertions.assertEquals(2, cluster.getFollowers().get(1).getReplicatorStateListeners().size());

        for (NodeImpl node : cluster.getNodes()) {
            node.removeReplicatorStateListener(firstListener);
        }

        Assertions.assertEquals(1, cluster.getLeader().getReplicatorStateListeners().size());
        Assertions.assertEquals(1, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
        Assertions.assertEquals(1, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
    }

    /**
     * (Disabled.) Stops both followers of a three-node cluster, waits for the leader to step down,
     * forces it to try electing itself, expects no leader after 1.5 seconds, then restarts the
     * followers and checks that the cluster converges.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Disabled
    @Test
    public void testVoteTimedoutStepDown() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);

        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();

        NodeImpl leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);
        Assertions.assertEquals(3, leader.listPeers().size());

        sendTestTaskAndWait(leader);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertFalse(followers.isEmpty());

        // Take every follower down so the leader loses its quorum.
        for (Node follower : followers) {
            Assertions.assertTrue(cluster.stop(follower.getNodeId().getPeerId().getEndpoint()));
        }

        // Poll until the former leader has stepped down.
        while (leader.isLeader()) {
            Thread.sleep(10L);
        }

        leader.tryElectSelf();
        Thread.sleep(1500L);
        Assertions.assertNull(cluster.getLeader());

        // Bring the followers back.
        for (Node follower : followers) {
            Assertions.assertTrue(cluster.start(follower.getNodeId().getPeerId().getEndpoint()));
        }

        cluster.ensureSame();
    }

    /**
     * Starts a three-node cluster with one replicator state listener per node, transfers leadership
     * to a follower and expects the replicator-created count to go from 2 to 4. Finally clears the
     * listeners on every node and checks that none remain.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testLeaderTransferWithReplicatorStateListener() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unitest", this.dataPath, peers, new LinkedHashSet<PeerId>(), TestCluster.ELECTION_TIMEOUT_MILLIS, nodeOptions -> {
            nodeOptions.setReplicationStateListeners(List.of(new UserReplicatorStateListener()));
        }, this.testInfo);

        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        this.cluster.ensureLeader(leader);

        sendTestTaskAndWait(leader);
        Thread.sleep(100L);

        List<Node> followers = this.cluster.getFollowers();

        // The failure message must be a String: JUnit has no assertTrue(boolean, int) overload.
        Assertions.assertTrue(waitForCondition(() -> this.startedCounter.get() == 2, 5000L),
            String.valueOf(this.startedCounter.get()));

        PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
        LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
        Assertions.assertTrue(leader.transferLeadershipTo(targetPeer).isOk());

        Thread.sleep(1000L);
        this.cluster.waitLeader();

        Assertions.assertTrue(waitForCondition(() -> this.startedCounter.get() == 4, 5000L),
            String.valueOf(this.startedCounter.get()));

        for (NodeImpl node : this.cluster.getNodes()) {
            node.clearReplicatorStateListeners();
        }

        Assertions.assertEquals(0, this.cluster.getLeader().getReplicatorStateListeners().size());
        Assertions.assertEquals(0, this.cluster.getFollowers().get(0).getReplicatorStateListeners().size());
        Assertions.assertEquals(0, this.cluster.getFollowers().get(1).getReplicatorStateListeners().size());
    }

    /**
     * Starts a three-node cluster, applies the default test tasks, one task without a closure and one
     * task with a {@code TaskClosure}, and checks that the closure saw "commit" before "apply". The
     * cluster must then be consistent and report two followers.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testTripleNodes() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);

        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();

        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);
        Assertions.assertEquals(3, leader.listPeers().size());

        sendTestTaskAndWait(leader);

        // A task without any closure.
        byte[] noClosurePayload = "no closure".getBytes(StandardCharsets.UTF_8);
        leader.apply(new Task(ByteBuffer.wrap(noClosurePayload), (Closure) null));

        // A task whose closure records the order of its callbacks.
        ByteBuffer closurePayload = ByteBuffer.wrap("task closure".getBytes(StandardCharsets.UTF_8));
        final List<String> callbackOrder = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch appliedLatch = new CountDownLatch(1);

        leader.apply(new Task(closurePayload, new TaskClosure() {
            public void run(Status status) {
                callbackOrder.add("apply");
                appliedLatch.countDown();
            }

            public void onCommitted() {
                callbackOrder.add("commit");
            }
        }));

        appliedLatch.await();

        Assertions.assertEquals(2, callbackOrder.size());
        Assertions.assertEquals("commit", callbackOrder.get(0));
        Assertions.assertEquals("apply", callbackOrder.get(1));

        cluster.ensureSame();
        Assertions.assertEquals(2, cluster.getFollowers().size());
    }

    /**
     * Runs one voting peer and one learner. The learner service is started first, then the voting
     * peer, which must become leader within one second. After 10 tasks the leader's state machine
     * must hold {@code hello0 .. hello9}; one second later the leader is shut down and the learner's
     * state machine is expected to hold the same 10 entries.
     *
     * @throws Exception If the test is interrupted or a service operation fails.
     */
    @Test
    public void testSingleNodeWithLearner() throws Exception {
        Endpoint leaderAddr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId leaderPeer = new PeerId(leaderAddr, 0);
        Endpoint learnerAddr = new Endpoint(TestUtils.getLocalAddress(), 5004);
        PeerId learnerPeer = new PeerId(learnerAddr, 0);

        // Learner service.
        NodeOptions learnerOptions = createNodeOptions(0);
        MockStateMachine learnerFsm = new MockStateMachine(learnerAddr);
        learnerOptions.setFsm(learnerFsm);
        learnerOptions.setRaftMetaUri(dataPath + File.separator + "meta1");
        learnerOptions.setSnapshotUri(dataPath + File.separator + "snapshot1");
        learnerOptions.setInitialConf(new Configuration(Collections.singletonList(leaderPeer), Collections.singletonList(learnerPeer)));
        RaftGroupService learnerService = createService("unittest", new PeerId(learnerAddr, 0), learnerOptions);
        learnerService.start();

        // Voting peer service.
        NodeOptions leaderOptions = createNodeOptions(1);
        MockStateMachine leaderFsm = new MockStateMachine(leaderAddr);
        leaderOptions.setFsm(leaderFsm);
        leaderOptions.setRaftMetaUri(dataPath + File.separator + "meta");
        leaderOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
        leaderOptions.setInitialConf(new Configuration(Collections.singletonList(leaderPeer), Collections.singletonList(learnerPeer)));
        RaftGroupService leaderService = createService("unittest", new PeerId(leaderAddr, 0), leaderOptions);
        Node leaderNode = leaderService.start();

        Assertions.assertEquals(1, leaderNode.listPeers().size());
        Assertions.assertTrue(leaderNode.listPeers().contains(leaderPeer));
        Assertions.assertTrue(waitForCondition(() -> leaderNode.isLeader(), 1000L));

        sendTestTaskAndWait(leaderNode, 10);

        Assertions.assertEquals(10, leaderFsm.getLogs().size());
        int leaderIdx = 0;
        for (ByteBuffer entry : leaderFsm.getLogs()) {
            Assertions.assertEquals("hello" + leaderIdx++, stringFromBytes(entry.array()));
        }

        Thread.sleep(1000L);
        leaderService.shutdown();

        Assertions.assertEquals(10, learnerFsm.getLogs().size());
        int learnerIdx = 0;
        for (ByteBuffer entry : learnerFsm.getLogs()) {
            Assertions.assertEquals("hello" + learnerIdx++, stringFromBytes(entry.array()));
        }

        learnerService.shutdown();
    }

    /**
     * Starts three voting peers and three learners (ports 5006..5008), then shrinks the learner set
     * twice: first via {@code resetLearners} (dropping port 5006), then via {@code removeLearners}
     * (dropping port 5007). After each step a new batch of tasks is applied and the dropped learner's
     * state machine is expected to lag behind the leader's by the batches it missed.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testResetLearners() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);

        LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
        for (int offset = 0; offset < 3; offset++) {
            learners.add(new PeerId(TestUtils.getLocalAddress(), 5006 + offset));
        }

        cluster = new TestCluster("unittest", dataPath, peers, learners, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);

        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }
        for (PeerId learner : learners) {
            Assertions.assertTrue(cluster.startLearner(learner));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        cluster.ensureLeader(leader);

        waitForCondition(() -> leader.listAlivePeers().size() == 3, 5000L);
        waitForCondition(() -> leader.listAliveLearners().size() == 3, 5000L);

        sendTestTaskAndWait(leader);

        List<MockStateMachine> fsms = cluster.getFsms();
        Assertions.assertEquals(6, fsms.size());
        cluster.ensureSame();

        // Step 1: reset the learner set to everything except the learner on port 5006.
        PeerId firstDropped = new PeerId(TestUtils.getLocalAddress(), 5006);
        learners.remove(firstDropped);
        Assertions.assertEquals(2, learners.size());

        SynchronizedClosure resetDone = new SynchronizedClosure();
        leader.resetLearners(new ArrayList<>(learners), resetDone);
        Assertions.assertTrue(resetDone.await().isOk());
        Assertions.assertEquals(2, leader.listAliveLearners().size());
        Assertions.assertEquals(2, leader.listLearners().size());

        sendTestTaskAndWait(leader);
        Thread.sleep(500L);

        Assertions.assertEquals(6, fsms.size());
        MockStateMachine firstDroppedFsm = fsms.remove(3);
        Assertions.assertEquals(firstDroppedFsm.getAddress(), firstDropped.getEndpoint());
        Assertions.assertTrue(cluster.getLeaderFsm().getLogs().size() > firstDroppedFsm.getLogs().size());
        Assertions.assertEquals(cluster.getLeaderFsm().getLogs().size(), 2 * firstDroppedFsm.getLogs().size());

        // Step 2: remove the learner on port 5007.
        PeerId secondDropped = new PeerId(TestUtils.getLocalAddress(), 5007);
        SynchronizedClosure removeDone = new SynchronizedClosure();
        leader.removeLearners(Arrays.asList(secondDropped), removeDone);
        Assertions.assertTrue(removeDone.await().isOk());

        sendTestTaskAndWait(leader);
        Thread.sleep(500L);

        MockStateMachine secondDroppedFsm = fsms.remove(3);
        Assertions.assertEquals(secondDroppedFsm.getAddress(), secondDropped.getEndpoint());
        Assertions.assertTrue(cluster.getLeaderFsm().getLogs().size() > secondDroppedFsm.getLogs().size());
        Assertions.assertEquals(cluster.getLeaderFsm().getLogs().size(), (secondDroppedFsm.getLogs().size() / 2) * 3);

        Assertions.assertEquals(3, leader.listAlivePeers().size());
        Assertions.assertEquals(1, leader.listAliveLearners().size());
        Assertions.assertEquals(1, leader.listLearners().size());
    }

    /**
     * Configures one learner (port 5006) statically before starting a three-node cluster. The leader
     * must list the learner but report it as not alive until the learner node is started; afterwards
     * tasks are applied and all four state machines must be consistent.
     *
     * @throws Exception If the test is interrupted or a cluster operation fails.
     */
    @Test
    public void testTripleNodesWithStaticLearners() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);

        PeerId learner = new PeerId(TestUtils.getLocalAddress(), 5006);
        LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
        learners.add(learner);
        cluster.setLearners(learners);

        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        cluster.ensureLeader(leader);

        // The learner is configured but has not been started yet.
        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(1, leader.listLearners().size());
        Assertions.assertTrue(leader.listLearners().contains(learner));
        Assertions.assertTrue(leader.listAliveLearners().isEmpty());

        // Start the learner and give it a second to connect.
        Assertions.assertTrue(cluster.start(learner.getEndpoint()));
        Thread.sleep(1000L);

        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(1, leader.listLearners().size());
        Assertions.assertEquals(1, leader.listAliveLearners().size());

        sendTestTaskAndWait(leader);

        cluster.ensureSame();
        Assertions.assertEquals(4, cluster.getFsms().size());
    }

    /**
     * Exercises dynamically added learners on a three-voter group.
     *
     * <p>Steps: start three voters, add a first learner, apply tasks (one without a closure and one
     * with a {@link TaskClosure} whose {@code onCommitted} must be recorded before {@code run}),
     * add a second learner, then stop both followers and check that a further apply on the old
     * leader fails with {@code RaftError.EPERM}.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testTripleNodesWithLearners() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);
        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertTrue(leader.listLearners().isEmpty());
        Assertions.assertTrue(leader.listAliveLearners().isEmpty());

        // Add the first learner.
        SynchronizedClosure firstAdded = new SynchronizedClosure();
        PeerId firstLearner = new PeerId(TestUtils.getLocalAddress(), 5006);
        Assertions.assertTrue(this.cluster.startLearner(firstLearner));
        leader.addLearners(Arrays.asList(firstLearner), firstAdded);
        Assertions.assertTrue(firstAdded.await().isOk());
        Assertions.assertEquals(1, leader.listAliveLearners().size());
        Assertions.assertEquals(1, leader.listLearners().size());

        sendTestTaskAndWait(leader);

        // A task without a closure must be accepted as well.
        leader.apply(new Task(ByteBuffer.wrap("no closure".getBytes(StandardCharsets.UTF_8)), (Closure) null));

        // A task closure: record the order in which the two callbacks fire.
        ByteBuffer payload = ByteBuffer.wrap("task closure".getBytes(StandardCharsets.UTF_8));
        final List<String> callbackOrder = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch applied = new CountDownLatch(1);
        leader.apply(new Task(payload, new TaskClosure() {
            public void run(Status status) {
                callbackOrder.add("apply");
                applied.countDown();
            }

            public void onCommitted() {
                callbackOrder.add("commit");
            }
        }));
        applied.await();
        Assertions.assertEquals(2, callbackOrder.size());
        Assertions.assertEquals("commit", callbackOrder.get(0));
        Assertions.assertEquals("apply", callbackOrder.get(1));

        Assertions.assertEquals(4, this.cluster.getFsms().size());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
        Assertions.assertEquals(1, this.cluster.getLearners().size());
        this.cluster.ensureSame();

        // Add the second learner.
        SynchronizedClosure secondAdded = new SynchronizedClosure();
        PeerId secondLearner = new PeerId(TestUtils.getLocalAddress(), 5007);
        Assertions.assertTrue(this.cluster.startLearner(secondLearner));
        leader.addLearners(Arrays.asList(secondLearner), secondAdded);
        Assertions.assertTrue(secondAdded.await().isOk());
        Assertions.assertEquals(2, leader.listAliveLearners().size());
        Assertions.assertEquals(2, leader.listLearners().size());
        this.cluster.ensureSame();

        // Stop both followers; only the leader and the two learners stay up.
        for (Node follower : this.cluster.getFollowers()) {
            Assertions.assertTrue(this.cluster.stop(follower.getNodeId().getPeerId().getEndpoint()));
        }

        // With the followers gone, the apply is expected to be rejected with EPERM.
        ByteBuffer rejectedPayload = ByteBuffer.wrap("task closure".getBytes(StandardCharsets.UTF_8));
        SynchronizedClosure rejected = new SynchronizedClosure();
        leader.apply(new Task(rejectedPayload, rejected));
        Assertions.assertFalse(rejected.await().isOk());
        Assertions.assertEquals(RaftError.EPERM, rejected.getStatus().getRaftError());
        Assertions.assertEquals(3, this.cluster.getFsms().size());
    }

    /**
     * Starts three peers with priorities 100, 40 and 40 and checks that the elected leader
     * reports priority 100, both as its target priority and on its leader id.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testNodesWithPriorityElection() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(100, 40, 40));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);

        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(100, leader.getNodeTargetPriority());
        Assertions.assertEquals(100, leader.getLeaderId().getPriority());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
    }

    /**
     * Starts three peers with priorities 100, 40 and -1 and checks that a leader is elected with
     * three peers and two followers.
     *
     * <p>NOTE(review): -1 presumably marks a peer that does not take part in priority election -
     * confirm against the priority constants.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testNodesWithPartPriorityElection() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(100, 40, -1));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);

        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
    }

    /**
     * Starts three peers with priorities 0, 0 and -1 and checks that a leader is still elected
     * with three peers and two followers.
     *
     * <p>NOTE(review): the meaning of the special values 0 and -1 is defined outside this test -
     * confirm against the priority constants.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testNodesWithSpecialPriorityElection() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(0, 0, -1));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);

        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
    }

    /**
     * Starts three peers with priorities 50, 0 and 0 and checks that the elected leader reports
     * priority 50, both as its target priority and on its leader id.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testNodesWithZeroValPriorityElection() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(50, 0, 0));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);

        Assertions.assertEquals(3, leader.listPeers().size());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
        Assertions.assertEquals(50, leader.getNodeTargetPriority());
        Assertions.assertEquals(50, leader.getLeaderId().getPriority());
    }

    /**
     * Starts three peers that all have priority 0 and checks that, after a short pause, all three
     * nodes are still reported as followers and each carries priority 0.
     *
     * @throws Exception if a cluster operation or the pause fails.
     */
    @Test
    public void testNoLeaderWithZeroValPriorityElection() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(0, 0, 0));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        // No waitLeader() here: the test expects that nobody becomes leader.
        Thread.sleep(200L);

        List<Node> followers = this.cluster.getFollowers();
        Assertions.assertEquals(3, followers.size());
        for (Node follower : followers) {
            Assertions.assertEquals(0, follower.getNodeId().getPeerId().getPriority());
        }
    }

    /**
     * Starts three peers with priorities 100, 60 and 10, stops the priority-100 leader and checks
     * that the next leader is the priority-60 peer while its target priority is still 100.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testLeaderStopAndReElectWithPriority() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(100, 60, 10));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        // Check for null before the leader is handed to ensureLeader().
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);
        Assertions.assertEquals(100, leader.getNodeId().getPeerId().getPriority());
        Assertions.assertEquals(100, leader.getNodeTargetPriority());

        sendTestTaskAndWait(leader);
        this.cluster.ensureSame();

        // Stop the current leader and wait for a re-election.
        Assertions.assertTrue(this.cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));
        this.cluster.waitLeader();
        Node newLeader = this.cluster.getLeader();
        Assertions.assertNotNull(newLeader);

        Assertions.assertEquals(60, newLeader.getNodeId().getPeerId().getPriority());
        Assertions.assertEquals(100, newLeader.getNodeTargetPriority());
    }

    /**
     * Starts three peers with priorities 100, 60 and 10, removes the priority-100 leader from the
     * configuration and checks that its target priority drops to 60 and that a different peer is
     * elected afterwards.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testRemoveLeaderWithPriority() throws Exception {
        List<Integer> priorities = new ArrayList<>(Arrays.asList(100, 60, 10));
        List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), peer.getPriority()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);
        Assertions.assertEquals(100, leader.getNodeTargetPriority());
        Assertions.assertEquals(100, leader.getNodeId().getPeerId().getPriority());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());

        // Work on a copy of the peer id so that later checks do not depend on the node's own instance.
        PeerId oldLeaderPeer = leader.getNodeId().getPeerId().copy();
        Endpoint oldLeaderEndpoint = oldLeaderPeer.getEndpoint();

        LOG.info("Remove old leader {}", new Object[]{oldLeaderPeer});
        CountDownLatch removed = new CountDownLatch(1);
        leader.removePeer(oldLeaderPeer, new ExpectClosure(removed));
        waitLatch(removed);
        Assertions.assertEquals(60, leader.getNodeTargetPriority());

        LOG.info("Stop and clean old leader {}", new Object[]{oldLeaderPeer});
        Assertions.assertTrue(this.cluster.stop(oldLeaderEndpoint));
        this.cluster.clean(oldLeaderEndpoint);

        this.cluster.waitLeader();
        Node newLeader = this.cluster.getLeader();
        LOG.info("New leader is {}", new Object[]{newLeader});
        Assertions.assertNotNull(newLeader);
        // Compare peer ids: a Node can never be the same object as a PeerId, so the former
        // assertNotSame(node, peerId) could not fail.
        Assertions.assertNotEquals(oldLeaderPeer, newLeader.getNodeId().getPeerId());
    }

    /**
     * Restarts a three-node group four times on the same data path while toggling log-entry
     * checksums: enabled everywhere, enabled on the last peer only, disabled everywhere, and
     * enabled everywhere again. Each round must elect a leader, apply a batch of tasks and end up
     * with identical state machines.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testChecksum() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);

        // Round 1: checksum enabled on every node.
        runChecksumRound(peers, true, true);
        // Round 2: checksum disabled, except on the last peer.
        runChecksumRound(peers, false, true);
        // Round 3: checksum disabled on every node.
        runChecksumRound(peers, false, false);
        // Round 4: checksum enabled on every node again.
        runChecksumRound(peers, true, true);
    }

    /**
     * Runs one round of {@link #testChecksum()}: starts a fresh cluster over the shared data path,
     * applies tasks, checks that all state machines agree, and always stops the cluster afterwards.
     *
     * @param peers Peers of the group.
     * @param checksum Checksum setting for every peer but the last one.
     * @param lastPeerChecksum Checksum setting for the last peer.
     * @throws Exception if a cluster operation or a wait fails.
     */
    private void runChecksumRound(List<PeerId> peers, boolean checksum, boolean lastPeerChecksum) throws Exception {
        TestCluster roundCluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        try {
            RaftOptions commonOptions = new RaftOptions();
            commonOptions.setEnableLogEntryChecksum(checksum);

            // Only create a separate options object when the last peer really differs.
            RaftOptions lastPeerOptions = commonOptions;
            if (lastPeerChecksum != checksum) {
                lastPeerOptions = new RaftOptions();
                lastPeerOptions.setEnableLogEntryChecksum(lastPeerChecksum);
            }

            PeerId lastPeer = peers.get(peers.size() - 1);
            for (PeerId peer : peers) {
                RaftOptions options = peer.equals(lastPeer) ? lastPeerOptions : commonOptions;
                Assertions.assertTrue(roundCluster.start(peer.getEndpoint(), false, 300, true, null, options));
            }

            roundCluster.waitLeader();
            Node leader = roundCluster.getLeader();
            Assertions.assertNotNull(leader);
            Assertions.assertEquals(3, leader.listPeers().size());
            sendTestTaskAndWait(leader);
            roundCluster.ensureSame();
        } finally {
            // Stop the nodes even when an assertion fails, so the next round/test can start them again.
            roundCluster.stopAll();
        }
    }

    /**
     * Issues read-index requests against a three-node group: first on the leader (with a single
     * retry), then on every follower once it knows the leader, and finally a request with a
     * {@code null} context whose callback must see a {@code null} context and an OK status.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testReadIndex() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(3, leader.listPeers().size());
        sendTestTaskAndWait(leader);

        // The first attempt may fail; only a second failure fails the test.
        boolean firstAttemptOk = assertReadIndex(leader, 11);
        if (!firstAttemptOk) {
            Assertions.assertTrue(assertReadIndex(leader, 11));
        }

        // Read from each follower after it has learned who the leader is.
        for (Node follower : cluster.getFollowers()) {
            Assertions.assertNotNull(follower);
            BooleanSupplier knowsLeader = () -> leader.getNodeId().getPeerId().equals(follower.getLeaderId());
            Assertions.assertTrue(waitForCondition(knowsLeader, 5000L));
            assertReadIndex(follower, 11);
        }

        // Read with a null request context.
        final CountDownLatch done = new CountDownLatch(1);
        ReadIndexClosure nullContextClosure = new ReadIndexClosure() {
            public void run(Status status, long index, byte[] reqCtx) {
                Assertions.assertNull(reqCtx);
                Assertions.assertTrue(status.isOk());
                done.countDown();
            }
        };
        leader.readIndex((byte[]) null, nullContextClosure);
        done.await();
    }

    /**
     * Same setup as the plain read-index test, but the final null-context request tolerates two
     * outcomes: an OK status (the elapsed time is printed to stderr), or an
     * {@code ETIMEDOUT} status with message "read-index request timeout" and index {@code -1}.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testReadIndexTimeout() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(3, leader.listPeers().size());
        sendTestTaskAndWait(leader);

        // The first attempt may fail; only a second failure fails the test.
        boolean firstAttemptOk = assertReadIndex(leader, 11);
        if (!firstAttemptOk) {
            Assertions.assertTrue(assertReadIndex(leader, 11));
        }

        // Read from each follower after it has learned who the leader is.
        for (Node follower : cluster.getFollowers()) {
            Assertions.assertNotNull(follower);
            BooleanSupplier knowsLeader = () -> leader.getNodeId().getPeerId().equals(follower.getLeaderId());
            Assertions.assertTrue(waitForCondition(knowsLeader, 5000L));
            assertReadIndex(follower, 11);
        }

        final CountDownLatch done = new CountDownLatch(1);
        final long startedAt = System.currentTimeMillis();
        ReadIndexClosure closure = new ReadIndexClosure() {
            public void run(Status status, long index, byte[] reqCtx) {
                Assertions.assertNull(reqCtx);
                if (!status.isOk()) {
                    // The only accepted failure is the read-index timeout.
                    Assertions.assertEquals(new Status(RaftError.ETIMEDOUT, "read-index request timeout", new Object[0]), status);
                    Assertions.assertEquals(-1L, index);
                } else {
                    System.err.println("Read-index so fast: " + (System.currentTimeMillis() - startedAt) + "ms");
                }
                done.countDown();
            }
        };
        leader.readIndex((byte[]) null, closure);
        done.await();
    }

    /**
     * Adds a learner to a three-voter group and issues two read-index requests on the learner
     * node (the fourth node of the cluster).
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testReadIndexFromLearner() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 300, true));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(3, leader.listPeers().size());
        sendTestTaskAndWait(leader);

        // Start the learner node and register it with the leader.
        SynchronizedClosure learnerAdded = new SynchronizedClosure();
        PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), 5006);
        Assertions.assertTrue(this.cluster.startLearner(learnerPeer));
        leader.addLearners(Arrays.asList(learnerPeer), learnerAdded);
        Assertions.assertTrue(learnerAdded.await().isOk());
        Assertions.assertEquals(1, leader.listAliveLearners().size());
        Assertions.assertEquals(1, leader.listLearners().size());

        // NOTE(review): fixed pause, presumably to let the learner catch up - confirm.
        Thread.sleep(100L);

        // The learner was started last, so it is the node at index 3.
        Node learnerNode = (Node) this.cluster.getNodes().get(3);
        // Check the node that is about to be used (the previous check re-tested the leader).
        Assertions.assertNotNull(learnerNode);
        assertReadIndex(learnerNode, 12);
        assertReadIndex(learnerNode, 12);
    }

    /**
     * Runs ten worker threads that each perform 100 rounds of "apply a task batch on the leader,
     * then read-index on a random voter", and checks afterwards that all state machines agree and
     * each holds 10000 log entries.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testReadIndexChaos() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 300, true));
        }

        this.cluster.waitLeader();
        final Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(3, leader.listPeers().size());

        final int workers = 10;
        final CountDownLatch allWorkersDone = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        // Registered so that the test fixture can shut the pool down.
        this.executors.add(pool);

        for (int w = 0; w < workers; w++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    // Count down exactly once per worker, after all of its rounds. Counting down
                    // per round would release the 10-count latch after only ten rounds in total.
                    try {
                        for (int round = 0; round < 100; round++) {
                            try {
                                ItNodeTest.this.sendTestTaskAndWait(leader);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            readIndexRandom(ItNodeTest.this.cluster);
                        }
                    } finally {
                        allWorkersDone.countDown();
                    }
                }

                /** Sends a read-index request with a random context to one of the first three nodes and waits for the callback. */
                private void readIndexRandom(TestCluster testCluster) {
                    final CountDownLatch answered = new CountDownLatch(1);
                    final byte[] requestContext = TestUtils.getRandomBytes();
                    testCluster.getNodes().get(ThreadLocalRandom.current().nextInt(3)).readIndex(requestContext, new ReadIndexClosure() {
                        public void run(Status status, long index, byte[] reqCtx) {
                            // Failed reads are tolerated; successful ones must echo the context.
                            if (status.isOk()) {
                                Assertions.assertTrue(index > 0);
                                Assertions.assertArrayEquals(requestContext, reqCtx);
                            }
                            answered.countDown();
                        }
                    });
                    try {
                        answered.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        allWorkersDone.await();
        this.cluster.ensureSame();
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10000, fsm.getLogs().size());
        }
    }

    /**
     * Starts a three-node group, applies some tasks and prints every node's metric registry to
     * the console through a {@link ConsoleReporter}.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testNodeMetrics() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(3, leader.listPeers().size());
        sendTestTaskAndWait(leader);

        // One extra task without a closure, then wait until all state machines agree.
        ByteBuffer payload = ByteBuffer.wrap("no closure".getBytes(StandardCharsets.UTF_8));
        leader.apply(new Task(payload, (Closure) null));
        cluster.ensureSame();

        // Dump each node's metrics once, framed by the node id.
        for (Node member : cluster.getNodes()) {
            System.out.println("-------------" + member.getNodeId() + "-------------");
            ConsoleReporter reporter = ConsoleReporter.forRegistry(member.getNodeMetrics().getMetricRegistry()).build();
            reporter.report();
            reporter.close();
            System.out.println();
        }

        Assertions.assertEquals(2, cluster.getFollowers().size());
    }

    /**
     * Stops the leader of a three-node group while the followers cannot exchange real (non
     * pre-vote) vote requests, then lets a new leader be elected, applies more tasks, restarts the
     * old leader twice (the second time with cleaned data) and checks that every state machine
     * ends up with 30 log entries.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testLeaderFail() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        LOG.info("Current leader is {}", new Object[]{leader.getLeaderId()});
        sendTestTaskAndWait(leader);

        // Block RequestVoteRequest messages that are not pre-votes on the followers, so that no
        // new leader can be elected until the block is lifted.
        List<Node> followers = this.cluster.getFollowers();
        blockMessagesOnFollowers(followers, (msg, unused) ->
            (msg instanceof RpcRequests.RequestVoteRequest) && !((RpcRequests.RequestVoteRequest) msg).preVote());

        LOG.warn("Stop leader {}", new Object[]{leader.getNodeId().getPeerId()});
        PeerId oldLeader = leader.getNodeId().getPeerId();
        Assertions.assertTrue(this.cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));

        // Apply on a follower while there is no leader.
        // NOTE(review): the -1 argument is interpreted by the helper, presumably as the expected
        // error code - confirm.
        Assertions.assertFalse(followers.isEmpty());
        sendTestTaskAndWait("follower apply ", followers.get(0), -1);

        stopBlockingMessagesOnFollowers(followers);

        this.cluster.waitLeader();
        Node newLeader = this.cluster.getLeader();
        // Fail with a clear assertion instead of dereferencing a missing leader below.
        Assertions.assertNotNull(newLeader);
        LOG.info("Elect new leader is {}", new Object[]{newLeader.getLeaderId()});

        // Second batch of ten tasks, applied on the new leader.
        CountDownLatch secondBatch = new CountDownLatch(10);
        for (int i = 10; i < 20; i++) {
            ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(StandardCharsets.UTF_8));
            newLeader.apply(new Task(data, new ExpectClosure(secondBatch)));
        }
        waitLatch(secondBatch);

        LOG.info("restart old leader {}", new Object[]{oldLeader});
        Assertions.assertTrue(this.cluster.start(oldLeader.getEndpoint()));

        // Third batch of ten tasks, with the old leader back in the group.
        CountDownLatch thirdBatch = new CountDownLatch(10);
        for (int i = 20; i < 30; i++) {
            ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(StandardCharsets.UTF_8));
            newLeader.apply(new Task(data, new ExpectClosure(thirdBatch)));
        }
        waitLatch(thirdBatch);

        // Restart the old leader once more, this time after cleaning its data.
        this.cluster.stop(oldLeader.getEndpoint());
        this.cluster.clean(oldLeader.getEndpoint());
        LOG.info("Restart old leader with cleanup {}", new Object[]{oldLeader});
        Assertions.assertTrue(this.cluster.start(oldLeader.getEndpoint()));

        this.cluster.ensureSame();
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(30, fsm.getLogs().size());
        }
    }

    /**
     * Grows a single-node group to three nodes through {@code addPeer} and checks the expected
     * outcomes along the way: adding a peer that is not started yet ends with
     * {@code RaftError.ECATCHUP}, a second add issued while one is in flight ends with
     * {@code RaftError.EBUSY}, and adding a peer that is already a member throws
     * {@link IllegalArgumentException}.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testJoinNodes() throws Exception {
        PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId peer1 = new PeerId(TestUtils.getLocalAddress(), 5004);
        PeerId peer2 = new PeerId(TestUtils.getLocalAddress(), 5005);
        PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), 5006);

        // This list is handed to the cluster and extended below as peers join.
        List<PeerId> peers = new ArrayList<>();
        peers.add(peer0);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        Assertions.assertTrue(this.cluster.start(peer0.getEndpoint()));

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        Assertions.assertEquals(leader.getNodeId().getPeerId(), peer0);
        sendTestTaskAndWait(leader);

        // Join a second, already started node.
        Assertions.assertTrue(this.cluster.start(peer1.getEndpoint(), false, 300));
        CountDownLatch peer1Added = new CountDownLatch(1);
        peers.add(peer1);
        leader.addPeer(peer1, new ExpectClosure(peer1Added));
        waitLatch(peer1Added);

        this.cluster.ensureSame();
        Assertions.assertEquals(2, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }

        // Adding a node that has not been started yet is expected to fail with ECATCHUP.
        peers.add(peer2);
        CountDownLatch catchUpFailed = new CountDownLatch(1);
        leader.addPeer(peer2, new ExpectClosure(RaftError.ECATCHUP, catchUpFailed));
        waitLatch(catchUpFailed);

        Thread.sleep(2000L);

        // Start the node, add it again, and issue a concurrent add that is expected to get EBUSY.
        Assertions.assertTrue(this.cluster.start(peer2.getEndpoint(), false, 300));
        CountDownLatch bothAnswered = new CountDownLatch(2);
        leader.addPeer(peer2, new ExpectClosure(bothAnswered));
        leader.addPeer(peer3, new ExpectClosure(RaftError.EBUSY, bothAnswered));
        waitLatch(bothAnswered);

        // Adding a peer that is already a member must be rejected synchronously.
        try {
            leader.addPeer(peer2, new ExpectClosure(bothAnswered));
            Assertions.fail();
        } catch (IllegalArgumentException e) {
            Assertions.assertEquals("Peer already exists in current configuration", e.getMessage());
        }

        this.cluster.ensureSame();
        Assertions.assertEquals(3, this.cluster.getFsms().size());
        Assertions.assertEquals(2, this.cluster.getFollowers().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }
    }

    /**
     * Stops and cleans one follower of a three-node group, removes it from the configuration,
     * applies more tasks, then starts it again and re-adds it. In the end all three state
     * machines must agree and hold 20 log entries each.
     *
     * @throws Exception if a cluster operation or a wait fails.
     */
    @Test
    public void testRemoveFollower() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);
        sendTestTaskAndWait(leader);
        cluster.ensureSame();

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId victim = followers.get(0).getNodeId().getPeerId();
        Endpoint victimEndpoint = victim.getEndpoint();

        // Take the follower down and wipe its data before removing it from the group.
        LOG.info("Stop and clean follower {}", new Object[]{victim});
        Assertions.assertTrue(cluster.stop(victimEndpoint));
        cluster.clean(victimEndpoint);

        LOG.info("Remove follower {}", new Object[]{victim});
        CountDownLatch removed = new CountDownLatch(1);
        leader.removePeer(victim, new ExpectClosure(removed));
        waitLatch(removed);

        sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
        Assertions.assertEquals(1, cluster.getFollowers().size());
        // NOTE(review): this removes from a freshly generated list, not from the list the
        // cluster was built with - confirm that this is intended.
        Assertions.assertTrue(TestUtils.generatePeers(3).remove(victim));

        // Bring the follower back and re-add it to the group.
        LOG.info("Start and add follower {}", new Object[]{victim});
        Assertions.assertTrue(cluster.start(victimEndpoint));
        CountDownLatch readded = new CountDownLatch(1);
        leader.addPeer(victim, new ExpectClosure(readded));
        waitLatch(readded);

        Assertions.assertEquals(2, cluster.getFollowers().size());
        cluster.ensureSame();
        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
        }
    }

    /**
     * Removes the current leader from a three-peer group, waits for a new leader, then
     * stops, cleans, restarts and re-adds the old leader. Finally asserts that all three
     * state machines hold 20 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testRemoveLeader() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unittest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node oldLeader = cluster.getLeader();
        Assertions.assertNotNull(oldLeader);
        cluster.ensureLeader(oldLeader);

        sendTestTaskAndWait(oldLeader);
        cluster.ensureSame();
        Assertions.assertEquals(2, cluster.getFollowers().size());

        // Work on a copy of the peer id so it stays usable after the node is removed.
        PeerId oldLeaderPeer = oldLeader.getNodeId().getPeerId().copy();
        Endpoint oldLeaderAddr = oldLeaderPeer.getEndpoint();

        LOG.info("Remove old leader {}", oldLeaderPeer);
        CountDownLatch removeLatch = new CountDownLatch(1);
        oldLeader.removePeer(oldLeaderPeer, new ExpectClosure(removeLatch));
        waitLatch(removeLatch);

        cluster.waitLeader();
        Node newLeader = cluster.getLeader();
        LOG.info("New leader is {}", newLeader);
        Assertions.assertNotNull(newLeader);
        sendTestTaskAndWait(newLeader, 10, RaftError.SUCCESS);

        LOG.info("Stop and clean old leader {}", oldLeaderPeer);
        Assertions.assertTrue(cluster.stop(oldLeaderAddr));
        cluster.clean(oldLeaderAddr);

        LOG.info("Start and add old leader {}", oldLeaderPeer);
        Assertions.assertTrue(cluster.start(oldLeaderAddr));
        List<PeerId> regeneratedPeers = TestUtils.generatePeers(3);
        Assertions.assertTrue(regeneratedPeers.remove(oldLeaderPeer));

        CountDownLatch addLatch = new CountDownLatch(1);
        newLeader.addPeer(oldLeaderPeer, new ExpectClosure(addLatch));
        waitLatch(addLatch);

        Assertions.assertEquals(2, cluster.getFollowers().size());
        cluster.ensureSame();
        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
        }
    }

    /**
     * Removes a follower from the group without stopping it, waits, then adds it back and
     * asserts that the leader's term is the same as before the removal.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testPreVote() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        NodeImpl leader = cluster.getLeader();
        // Check for a missing leader before dereferencing it, so the failure is an
        // assertion error rather than a NullPointerException.
        Assertions.assertNotNull(leader);
        long savedTerm = leader.getCurrentTerm();

        sendTestTaskAndWait(leader);
        cluster.ensureSame();

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId followerPeer = followers.get(0).getNodeId().getPeerId();
        Endpoint followerAddr = followerPeer.getEndpoint();

        LOG.info("Remove follower {}", followerPeer);
        CountDownLatch removeLatch = new CountDownLatch(1);
        leader.removePeer(followerPeer, new ExpectClosure(removeLatch));
        waitLatch(removeLatch);

        sendTestTaskAndWait((Node) leader, 10, RaftError.SUCCESS);

        // The removed follower is still running during this pause.
        Thread.sleep(2000L);

        LOG.info("Add follower {}", followerAddr);
        List<PeerId> regeneratedPeers = TestUtils.generatePeers(3);
        Assertions.assertTrue(regeneratedPeers.remove(followerPeer));

        CountDownLatch addLatch = new CountDownLatch(1);
        leader.addPeer(followerPeer, new ExpectClosure(addLatch));
        waitLatch(addLatch);

        NodeImpl currentLeader = cluster.getLeader();
        Assertions.assertNotNull(currentLeader);
        // The term must not have changed while the follower was outside the group.
        Assertions.assertEquals(savedTerm, currentLeader.getCurrentTerm());
    }

    /**
     * Starts a single node in a cluster created with an empty peer list, resets its peers to
     * a configuration containing only itself, and asserts that a leader is then elected.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testSetPeer1() throws Exception {
        // Typed lists instead of raw ArrayList, so element types are checked at compile time.
        cluster = new TestCluster("testSetPeer1", dataPath, new ArrayList<PeerId>(), testInfo);

        PeerId bootPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        Assertions.assertTrue(cluster.start(bootPeer.getEndpoint()));

        // The started node is reported as a follower before any configuration is set.
        List<Node> nodes = cluster.getFollowers();
        Assertions.assertEquals(1, nodes.size());

        List<PeerId> newPeers = new ArrayList<>();
        newPeers.add(bootPeer);
        Assertions.assertTrue(nodes.get(0).resetPeers(new Configuration(newPeers)).isOk());

        cluster.waitLeader();
        Assertions.assertNotNull(cluster.getLeader());
    }

    /**
     * Stops and cleans both followers of a three-peer group, resets the remaining node's
     * peers to a single-node configuration, then restarts the followers and adds them back.
     * Finally asserts that all three state machines hold 20 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testSetPeer2() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);

        sendTestTaskAndWait(leader);
        cluster.ensureSame();

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId followerPeer1 = followers.get(0).getNodeId().getPeerId();
        Endpoint followerAddr1 = followerPeer1.getEndpoint();
        PeerId followerPeer2 = followers.get(1).getNodeId().getPeerId();
        Endpoint followerAddr2 = followerPeer2.getEndpoint();

        LOG.info("Stop and clean follower {}", followerPeer1);
        Assertions.assertTrue(cluster.stop(followerAddr1));
        cluster.clean(followerAddr1);

        // One follower is still alive here, so this batch is expected to succeed.
        sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);

        Endpoint leaderAddr = leader.getLeaderId().getEndpoint().copy();
        LOG.info("Set peers to {}", leaderAddr);

        LOG.info("Stop and clean follower {}", followerPeer2);
        Assertions.assertTrue(cluster.stop(followerAddr2));
        cluster.clean(followerAddr2);

        Assertions.assertTrue(waitForTopology(cluster, leaderAddr, 1, 5000L));

        Thread.sleep(2000L);

        // Typed list instead of a raw ArrayList.
        List<PeerId> newPeers = new ArrayList<>();
        newPeers.add(new PeerId(leaderAddr, 0));

        // Resetting to the original three-peer list must succeed.
        Assertions.assertTrue(leader.resetPeers(new Configuration(peers)).isOk());

        // Then shrink the configuration to the only remaining node.
        LOG.warn("Set peers to {}", leaderAddr);
        Assertions.assertTrue(leader.resetPeers(new Configuration(newPeers)).isOk());

        cluster.waitLeader();
        Node newLeader = cluster.getLeader();
        Assertions.assertNotNull(newLeader);
        Assertions.assertEquals(leaderAddr, newLeader.getNodeId().getPeerId().getEndpoint());

        LOG.info("start follower {}", followerAddr1);
        Assertions.assertTrue(cluster.start(followerAddr1, true, 300));
        LOG.info("start follower {}", followerAddr2);
        Assertions.assertTrue(cluster.start(followerAddr2, true, 300));

        CountDownLatch addLatch1 = new CountDownLatch(1);
        LOG.info("Add old follower {}", followerAddr1);
        newLeader.addPeer(followerPeer1, new ExpectClosure(addLatch1));
        waitLatch(addLatch1);

        CountDownLatch addLatch2 = new CountDownLatch(1);
        LOG.info("Add old follower {}", followerAddr2);
        newLeader.addPeer(followerPeer2, new ExpectClosure(addLatch2));
        waitLatch(addLatch2);

        // Local bookkeeping only; the list is not read again in this method.
        newPeers.add(followerPeer1);
        newPeers.add(followerPeer2);

        cluster.ensureSame();
        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
        }
    }

    /**
     * Takes a snapshot on the leader, stops that node and starts it again, asserting before
     * and after the restart that the current leader's state machine reports zero snapshot
     * loads.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testRestoreSnapshot() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        LOG.info("Leader: " + leader);
        Assertions.assertNotNull(leader);

        sendTestTaskAndWait(leader);
        cluster.ensureSame();
        triggerLeaderSnapshot(cluster, leader);

        // Copy the address first so it stays usable once the node is stopped.
        Endpoint stoppedAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
        Assertions.assertTrue(cluster.stop(stoppedAddr));

        cluster.waitLeader();
        Assertions.assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());

        Assertions.assertTrue(cluster.start(stoppedAddr));
        cluster.ensureSame();
        Assertions.assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
    }

    /**
     * Same scenario as a plain snapshot restore, except that ten more tasks are applied
     * through the new leader while the old leader is down. After the restart the restarted
     * node's state machine must report exactly one snapshot load, and the current leader's
     * state machine none.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testRestoreSnapshotWithDelta() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        LOG.info("Leader: " + leader);
        Assertions.assertNotNull(leader);

        sendTestTaskAndWait(leader);
        cluster.ensureSame();
        triggerLeaderSnapshot(cluster, leader);

        Endpoint stoppedAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
        Assertions.assertTrue(cluster.stop(stoppedAddr));

        cluster.waitLeader();
        // Entries appended while the old leader is down.
        sendTestTaskAndWait(cluster.getLeader(), 10, RaftError.SUCCESS);
        Assertions.assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());

        Assertions.assertTrue(cluster.start(stoppedAddr));
        Node restarted = cluster.getNode(stoppedAddr);
        cluster.ensureSame();

        Assertions.assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
        // NOTE(review): assumes getFsm() is statically typed so that getLoadSnapshotTimes() is visible - confirm.
        Assertions.assertEquals(1, restarted.getOptions().getFsm().getLoadSnapshotTimes());
    }

    /**
     * Starts every node with a {@code ThroughputSnapshotThrottle}, stops one follower,
     * applies more tasks and takes two leader snapshots, then cleans and restarts the
     * follower. Asserts that all three state machines end up with 30 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testInstallSnapshotWithThrottle() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            // Each node gets its own throttle instance.
            ThroughputSnapshotThrottle throttle = new ThroughputSnapshotThrottle(1024L, 1L);
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 200, false, throttle));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        sendTestTaskAndWait(leader);
        cluster.ensureSame();

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
        Assertions.assertTrue(cluster.stop(followerAddr));

        cluster.waitLeader();

        sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
        Thread.sleep(1000L);
        triggerLeaderSnapshot(cluster, leader);

        sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
        triggerLeaderSnapshot(cluster, leader, 2);
        Thread.sleep(1000L);

        // Wipe the follower's data before bringing it back.
        cluster.clean(followerAddr);
        ThroughputSnapshotThrottle restartThrottle = new ThroughputSnapshotThrottle(1024L, 1L);
        Assertions.assertTrue(cluster.start(followerAddr, true, 300, false, restartThrottle));

        Thread.sleep(2000L);
        cluster.ensureSame();

        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(30, fsm.getLogs().size());
        }
    }

    /**
     * Builds up 2000 log entries and two snapshots on a three-node group, then starts a
     * fourth node with a {@code ThroughputSnapshotThrottle} and adds it as a peer. Asserts
     * that all four state machines hold 2000 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Disabled("https://issues.apache.org/jira/browse/IGNITE-16467")
    @Test
    public void testInstallLargeSnapshotWithThrottle() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(4);
        cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3), testInfo);
        // Start every peer except the last one, which joins later.
        for (PeerId peer : peers.subList(0, peers.size() - 1)) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 200, false));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
        cluster.ensureSame();

        // Start offsets 10, 20, ..., 990.
        for (int offset = 10; offset < 1000; offset += 10) {
            sendTestTaskAndWait(leader, offset, RaftError.SUCCESS);
        }

        Thread.sleep(1000L);
        triggerLeaderSnapshot(cluster, leader);

        // Start offsets 1000, 1010, ..., 1990.
        for (int offset = 1000; offset < 2000; offset += 10) {
            sendTestTaskAndWait(leader, offset, RaftError.SUCCESS);
        }

        triggerLeaderSnapshot(cluster, leader, 2);
        Thread.sleep(1000L);

        PeerId joiningPeer = peers.get(3);
        ThroughputSnapshotThrottle throttle = new ThroughputSnapshotThrottle(128L, 1L);
        Assertions.assertTrue(cluster.start(joiningPeer.getEndpoint(), false, 300, false, throttle));

        CountDownLatch addLatch = new CountDownLatch(1);
        leader.addPeer(joiningPeer, st -> {
            Assertions.assertTrue(st.isOk(), st.toString());
            addLatch.countDown();
        });
        waitLatch(addLatch);

        cluster.ensureSame();
        Assertions.assertEquals(4, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(2000, fsm.getLogs().size());
        }
    }

    /**
     * Builds up 2000 log entries and two snapshots on a three-node group, then starts a
     * fourth node whose {@code RaftOptions} set {@code maxByteCountPerRpc} to 128 and adds it
     * as a peer. Asserts that all four state machines hold 2000 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testInstallLargeSnapshot() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(4);
        cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3), testInfo);
        // Start every peer except the last one, which joins later.
        for (PeerId peer : peers.subList(0, peers.size() - 1)) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 200, false));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
        cluster.ensureSame();

        // Start offsets 10, 20, ..., 990.
        for (int offset = 10; offset < 1000; offset += 10) {
            sendTestTaskAndWait(leader, offset, RaftError.SUCCESS);
        }

        Thread.sleep(1000L);
        triggerLeaderSnapshot(cluster, leader);

        // Start offsets 1000, 1010, ..., 1990.
        for (int offset = 1000; offset < 2000; offset += 10) {
            sendTestTaskAndWait(leader, offset, RaftError.SUCCESS);
        }

        triggerLeaderSnapshot(cluster, leader, 2);
        Thread.sleep(1000L);

        PeerId joiningPeer = peers.get(3);
        RaftOptions smallRpcOptions = new RaftOptions();
        smallRpcOptions.setMaxByteCountPerRpc(128);
        Assertions.assertTrue(cluster.start(joiningPeer.getEndpoint(), false, 300, false, null, smallRpcOptions));

        CountDownLatch addLatch = new CountDownLatch(1);
        leader.addPeer(joiningPeer, st -> {
            Assertions.assertTrue(st.isOk(), st.toString());
            addLatch.countDown();
        });
        waitLatch(addLatch);

        cluster.ensureSame();
        Assertions.assertEquals(4, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(2000, fsm.getLogs().size());
        }
    }

    /**
     * Stops one follower, applies more tasks and takes two leader snapshots, then cleans and
     * restarts the follower. Asserts that all three state machines end up with 30 log
     * entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testInstallSnapshot() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        sendTestTaskAndWait(leader);
        cluster.ensureSame();

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
        Assertions.assertTrue(cluster.stop(followerAddr));

        sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
        triggerLeaderSnapshot(cluster, leader);

        sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
        triggerLeaderSnapshot(cluster, leader, 2);

        Thread.sleep(50L);

        // Wipe the follower's data before bringing it back.
        cluster.clean(followerAddr);
        Assertions.assertTrue(cluster.start(followerAddr, false, 300));
        cluster.ensureSame();

        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            // The state machine's address is the failure message, identifying the node.
            String fsmAddress = fsm.getAddress().toString();
            Assertions.assertEquals(30, fsm.getLogs().size(), fsmAddress);
        }
    }

    /**
     * Starts a single node whose options set no snapshot URI and requests a snapshot with a
     * closure built for {@code RaftError.EINVAL} and "Snapshot is not supported". Asserts
     * that the state machine's save-snapshot counter stays at zero.
     *
     * @throws Exception If starting the node or waiting fails.
     */
    @Test
    public void testNoSnapshot() throws Exception {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId selfPeer = new PeerId(addr, 0);

        NodeOptions nodeOptions = createNodeOptions(0);
        MockStateMachine fsm = new MockStateMachine(addr);
        nodeOptions.setFsm(fsm);
        nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
        nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));

        RaftGroupService service = createService("unittest", selfPeer, nodeOptions);
        Node node = service.start();

        Thread.sleep(2000L);

        sendTestTaskAndWait(node);
        Assertions.assertEquals(0, fsm.getSaveSnapshotTimes());

        CountDownLatch snapshotLatch = new CountDownLatch(1);
        node.snapshot(new ExpectClosure(RaftError.EINVAL, "Snapshot is not supported", snapshotLatch));
        waitLatch(snapshotLatch);

        Assertions.assertEquals(0, fsm.getSaveSnapshotTimes());
    }

    /**
     * Starts a single node with a snapshot interval of 10 seconds, applies tasks, waits 10
     * seconds and asserts that at least one snapshot was saved with a positive snapshot
     * index.
     *
     * @throws Exception If starting the node or waiting fails.
     */
    @Test
    public void testAutoSnapshot() throws Exception {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        PeerId selfPeer = new PeerId(addr, 0);

        NodeOptions nodeOptions = createNodeOptions(0);
        MockStateMachine fsm = new MockStateMachine(addr);
        nodeOptions.setFsm(fsm);
        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
        nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
        nodeOptions.setSnapshotIntervalSecs(10);
        nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));

        RaftGroupService service = createService("unittest", selfPeer, nodeOptions);
        Node node = service.start();

        Thread.sleep(2000L);
        sendTestTaskAndWait(node);

        // Wait as long as the configured snapshot interval.
        Thread.sleep(10000L);

        int times = fsm.getSaveSnapshotTimes();
        Assertions.assertTrue(times >= 1, "snapshotTimes=" + times);
        Assertions.assertTrue(fsm.getSnapshotIndex() > 0);
    }

    /**
     * Records the leader's term, waits five seconds and asserts that the term reported by
     * the leader at that point is unchanged.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testLeaderShouldNotChange() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        NodeImpl initialLeader = cluster.getLeader();
        Assertions.assertNotNull(initialLeader);
        long savedTerm = initialLeader.getCurrentTerm();
        LOG.info("Current leader is {}, term is {}", initialLeader, savedTerm);

        Thread.sleep(5000L);

        cluster.waitLeader();
        NodeImpl laterLeader = cluster.getLeader();
        Assertions.assertNotNull(laterLeader);
        LOG.info("Current leader is {}", laterLeader);
        Assertions.assertEquals(savedTerm, laterLeader.getCurrentTerm());
    }

    /**
     * Stops one follower, applies tasks on the leader (including 20 submitted with a null
     * closure), then restarts the follower. Asserts that all three state machines end up
     * with 30 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testRecoverFollower() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy();
        Assertions.assertTrue(cluster.stop(followerAddr));

        sendTestTaskAndWait(leader);

        // Submitted with a null closure, so nothing here waits for them to be applied.
        for (int idx = 10; idx < 30; idx++) {
            byte[] payload = ("no cluster" + idx).getBytes(StandardCharsets.UTF_8);
            Task task = new Task(ByteBuffer.wrap(payload), (Closure) null);
            leader.apply(task);
        }

        Thread.sleep(5000L);

        Assertions.assertTrue(cluster.start(followerAddr));
        cluster.ensureSame();

        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(30, fsm.getLogs().size());
        }
    }

    /**
     * Transfers leadership from the current leader to one of its followers and asserts that
     * the follower becomes the leader.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testLeaderTransfer() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);

        sendTestTaskAndWait(leader);
        Thread.sleep(100L);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();

        LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
        Assertions.assertTrue(leader.transferLeadershipTo(targetPeer).isOk());

        cluster.waitLeader();
        Node newLeader = cluster.getLeader();
        // Fail with an assertion, not a NullPointerException, if no leader is reported.
        Assertions.assertNotNull(newLeader);
        // Expected value first, so a failure message labels the two peers correctly.
        Assertions.assertEquals(targetPeer, newLeader.getNodeId().getPeerId());
    }

    /**
     * Requests a leadership transfer to a follower that has been stopped, applies a task
     * with a closure built for {@code RaftError.EBUSY}, then restarts the follower and
     * asserts that it is not the leader.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testLeaderTransferBeforeLogIsCompleted() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 1));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
        Endpoint targetAddr = targetPeer.getEndpoint();
        Assertions.assertTrue(cluster.stop(targetAddr));

        sendTestTaskAndWait(leader);

        LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
        Assertions.assertTrue(leader.transferLeadershipTo(targetPeer).isOk());

        // A task applied right after the transfer request.
        CountDownLatch busyLatch = new CountDownLatch(1);
        ByteBuffer payload = ByteBuffer.wrap("aaaaa".getBytes(StandardCharsets.UTF_8));
        leader.apply(new Task(payload, new ExpectClosure(RaftError.EBUSY, busyLatch)));
        waitLatch(busyLatch);

        cluster.waitLeader();

        Assertions.assertTrue(cluster.start(targetAddr));
        Assertions.assertNotEquals(targetPeer, cluster.getLeader().getNodeId().getPeerId());
        cluster.ensureSame();
    }

    /**
     * Requests a leadership transfer to a stopped follower, asserts that the same node is
     * still the leader afterwards, then restarts the follower and applies one more task
     * through that leader.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testLeaderTransferResumeOnFailure() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint(), false, 1));
        }

        cluster.waitLeader();
        Node leader = cluster.getLeader();
        Assertions.assertNotNull(leader);
        cluster.ensureLeader(leader);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
        Endpoint targetAddr = targetPeer.getEndpoint();
        Assertions.assertTrue(cluster.stop(targetAddr));

        sendTestTaskAndWait(leader);

        Assertions.assertTrue(leader.transferLeadershipTo(targetPeer).isOk());

        // A task applied right after the transfer request.
        CountDownLatch busyLatch = new CountDownLatch(1);
        ByteBuffer busyPayload = ByteBuffer.wrap("aaaaa".getBytes(StandardCharsets.UTF_8));
        leader.apply(new Task(busyPayload, new ExpectClosure(RaftError.EBUSY, busyLatch)));
        waitLatch(busyLatch);

        Thread.sleep(100L);
        cluster.waitLeader();
        Node leaderAfterTransfer = cluster.getLeader();
        Assertions.assertSame(leaderAfterTransfer, leader);

        Assertions.assertTrue(cluster.start(targetAddr));
        Thread.sleep(100L);

        // The same payload, this time with a plain ExpectClosure.
        CountDownLatch okLatch = new CountDownLatch(1);
        ByteBuffer okPayload = ByteBuffer.wrap("aaaaa".getBytes(StandardCharsets.UTF_8));
        leaderAfterTransfer.apply(new Task(okPayload, new ExpectClosure(okLatch)));
        waitLatch(okLatch);

        cluster.ensureSame();
    }

    /**
     * Runs a single node, takes a snapshot and shuts the service down, then tries to start a
     * second service over the same snapshot and meta directories with a {@code MockFSM1}
     * state machine. The second start must throw.
     *
     * @throws Exception If the first service fails to start or the snapshot wait fails.
     */
    @Test
    public void testShutdownAndJoinWorkAfterInitFails() throws Exception {
        Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        String snapshotUri = dataPath + File.separator + "snapshot";
        String metaUri = dataPath + File.separator + "meta";

        NodeOptions firstOptions = createNodeOptions(0);
        firstOptions.setFsm(new MockStateMachine(addr));
        firstOptions.setSnapshotUri(snapshotUri);
        firstOptions.setRaftMetaUri(metaUri);
        firstOptions.setSnapshotIntervalSecs(10);
        firstOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));

        RaftGroupService firstService = createService("unittest", new PeerId(addr, 0), firstOptions);
        Node node = firstService.start();

        Thread.sleep(1000L);
        sendTestTaskAndWait(node);

        CountDownLatch snapshotLatch = new CountDownLatch(1);
        node.snapshot(new ExpectClosure(snapshotLatch));
        waitLatch(snapshotLatch);

        firstService.shutdown();

        // Second service: same directories, different state machine.
        NodeOptions secondOptions = createNodeOptions(1);
        secondOptions.setFsm(new MockFSM1(addr));
        secondOptions.setSnapshotUri(snapshotUri);
        secondOptions.setRaftMetaUri(metaUri);
        secondOptions.setSnapshotIntervalSecs(10);
        secondOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));

        try {
            createService("unittest", new PeerId(addr, 0), secondOptions).start();
            Assertions.fail();
        } catch (Exception ignored) {
            // Expected: the start must throw. Reaching fail() above would fail the test.
        }
    }

    /**
     * Shuts the leader down, waits for it to finish, and asserts that the cluster then
     * reports a different node as leader.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node oldLeader = cluster.getLeader();
        Assertions.assertNotNull(oldLeader);

        LOG.info("Shutdown leader {}", oldLeader);
        oldLeader.shutdown();
        oldLeader.join();

        cluster.waitLeader();
        Node newLeader = cluster.getLeader();
        Assertions.assertNotNull(newLeader);
        Assertions.assertNotSame(newLeader, oldLeader);
    }

    /**
     * Waits until every follower reports a leader id, removes the leader from the group via
     * itself, and asserts that the cluster then reports a different node as leader.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testRemovingLeaderTriggerTimeoutNow() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();

        // Each follower must report a leader id within 5 seconds.
        for (Node follower : cluster.getFollowers()) {
            Assertions.assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 5000L));
        }

        Node oldLeader = cluster.getLeader();
        Assertions.assertNotNull(oldLeader);

        PeerId oldLeaderPeer = oldLeader.getNodeId().getPeerId();
        CountDownLatch removeLatch = new CountDownLatch(1);
        oldLeader.removePeer(oldLeaderPeer, new ExpectClosure(removeLatch));
        waitLatch(removeLatch);

        cluster.waitLeader();
        Node newLeader = cluster.getLeader();
        Assertions.assertNotNull(newLeader);
        Assertions.assertNotSame(newLeader, oldLeader);
    }

    /**
     * Starts two of three peers, transfers leadership to the only follower, takes two
     * snapshots on the new leader, then starts the third peer and transfers leadership to
     * it. Asserts that the third peer is the leader and that all three state machines hold
     * 10 log entries.
     *
     * @throws Exception If any cluster operation fails.
     */
    @Test
    public void testTransferShouldWorkAfterInstallSnapshot() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        cluster = new TestCluster("unitest", dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, testInfo);
        // The last peer stays down until after the snapshots are taken.
        for (PeerId peer : peers.subList(0, peers.size() - 1)) {
            Assertions.assertTrue(cluster.start(peer.getEndpoint()));
        }

        cluster.waitLeader();
        Node firstLeader = cluster.getLeader();
        Assertions.assertNotNull(firstLeader);

        sendTestTaskAndWait(firstLeader);

        List<Node> followers = cluster.getFollowers();
        Assertions.assertEquals(1, followers.size());
        PeerId followerPeer = followers.get(0).getNodeId().getPeerId();

        Assertions.assertTrue(firstLeader.transferLeadershipTo(followerPeer).isOk());
        cluster.waitLeader();
        Node secondLeader = cluster.getLeader();
        Assertions.assertEquals(followerPeer, secondLeader.getNodeId().getPeerId());

        CountDownLatch firstSnapshot = new CountDownLatch(1);
        secondLeader.snapshot(new ExpectClosure(firstSnapshot));
        waitLatch(firstSnapshot);

        CountDownLatch secondSnapshot = new CountDownLatch(1);
        secondLeader.snapshot(new ExpectClosure(secondSnapshot));
        waitLatch(secondSnapshot);

        PeerId latePeer = peers.get(2);
        Assertions.assertTrue(cluster.start(latePeer.getEndpoint()));

        Thread.sleep(5000L);

        Assertions.assertTrue(secondLeader.transferLeadershipTo(latePeer).isOk());

        Thread.sleep(2000L);

        Assertions.assertEquals(latePeer, cluster.getLeader().getNodeId().getPeerId());
        Assertions.assertEquals(3, cluster.getFsms().size());
        for (MockStateMachine fsm : cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }
    }

    /**
     * Puts one follower of a five-peer cluster into an error state, stops the leader, applies
     * more tasks through the newly elected leader, then restarts both the error follower and
     * the old leader. All five state machines are expected to end up with 20 log entries.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(5);
        this.cluster = new TestCluster("unitest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }
        this.cluster.waitLeader();
        Node oldLeader = this.cluster.getLeader();
        Assertions.assertNotNull(oldLeader);
        sendTestTaskAndWait(oldLeader);

        List<Node> followers = this.cluster.getFollowers();
        Assertions.assertEquals(4, followers.size());

        // The follower is narrowed to NodeImpl, the type the error is injected through.
        NodeImpl errorNode = (NodeImpl) followers.get(0);
        Endpoint errorFollowerAddr = errorNode.getNodeId().getPeerId().copy().getEndpoint();
        LOG.info("Set follower {} into error state", errorNode);
        errorNode.onError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(-1, "Follower has something wrong.")));

        // Stop the current leader so that a new one has to be elected.
        Endpoint oldLeaderAddr = oldLeader.getNodeId().getPeerId().getEndpoint().copy();
        Assertions.assertTrue(this.cluster.stop(oldLeaderAddr));
        this.cluster.waitLeader();
        Node newLeader = this.cluster.getLeader();
        Assertions.assertNotNull(newLeader);
        LOG.info("Elect a new leader {}", newLeader);

        sendTestTaskAndWait(newLeader, 10, RaftError.SUCCESS);
        Thread.sleep(20L);

        LOG.info("Stop error follower {}", errorNode);
        Assertions.assertTrue(this.cluster.stop(errorFollowerAddr));

        LOG.info("Restart error follower {} and old leader {}", errorFollowerAddr, oldLeaderAddr);
        Assertions.assertTrue(this.cluster.start(errorFollowerAddr));
        Assertions.assertTrue(this.cluster.start(oldLeaderAddr));

        this.cluster.ensureSame();
        Assertions.assertEquals(5, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
        }
    }

    /**
     * Tracks how often the followers' state machines report start-following and stop-following
     * across three leaderships: the initial election, a re-election after the leader is stopped,
     * and an explicit leadership transfer.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testFollowerStartStopFollowing() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(5);
        this.cluster = new TestCluster("unitest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }
        this.cluster.waitLeader();
        Node firstLeader = this.cluster.getLeader();
        Assertions.assertNotNull(firstLeader);
        this.cluster.ensureLeader(firstLeader);
        sendTestTaskAndWait(firstLeader);

        // First leadership: each follower started following once and never stopped.
        List<Node> firstFollowers = this.cluster.getFollowers();
        Assertions.assertEquals(4, firstFollowers.size());
        for (Node follower : firstFollowers) {
            assertFollowingTimes(follower, 1, 0);
        }

        // Second leadership: stop the leader and let the remaining nodes elect a new one.
        Assertions.assertTrue(this.cluster.stop(firstLeader.getNodeId().getPeerId().getEndpoint()));
        this.cluster.waitLeader();
        Node secondLeader = this.cluster.getLeader();
        Assertions.assertNotNull(secondLeader);
        sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS);

        List<Node> secondFollowers = this.cluster.getFollowers();
        Assertions.assertEquals(3, secondFollowers.size());
        for (Node follower : secondFollowers) {
            assertFollowingTimes(follower, 2, 1);
        }

        // Third leadership: hand leadership over to one of the current followers.
        PeerId transferTarget = secondFollowers.get(0).getNodeId().getPeerId().copy();
        Assertions.assertTrue(secondLeader.transferLeadershipTo(transferTarget).isOk());
        Thread.sleep(100L);
        this.cluster.waitLeader();
        Node thirdLeader = this.cluster.getLeader();
        Assertions.assertEquals(transferTarget, thirdLeader.getNodeId().getPeerId());
        sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS);

        List<Node> thirdFollowers = this.cluster.getFollowers();
        Assertions.assertEquals(3, thirdFollowers.size());
        for (Node follower : thirdFollowers) {
            boolean wasSecondLeader = follower.getNodeId().getPeerId().equals(secondLeader.getNodeId().getPeerId());
            if (wasSecondLeader) {
                // The previous leader is expected to be one start/stop cycle behind the others.
                assertFollowingTimes(follower, 2, 1);
            } else {
                assertFollowingTimes(follower, 3, 2);
            }
        }
        this.cluster.ensureSame();
    }

    /**
     * Waits up to five seconds for the follower's state machine to report the expected number
     * of start-following calls, then checks the number of stop-following calls.
     *
     * @param follower Node whose state machine counters are inspected.
     * @param expectedStarts Expected start-following count (awaited).
     * @param expectedStops Expected stop-following count (checked once, after the wait).
     * @throws Exception if waiting for the condition throws.
     */
    private void assertFollowingTimes(Node follower, int expectedStarts, int expectedStops) throws Exception {
        Assertions.assertTrue(waitForCondition(
            () -> follower.getOptions().getFsm().getOnStartFollowingTimes() == expectedStarts, 5000L));
        Assertions.assertEquals(expectedStops, follower.getOptions().getFsm().getOnStopFollowingTimes());
    }

    /**
     * Blocks the first non-pre-vote {@code RequestVoteRequest} sent by any node, plus every
     * non-empty {@code AppendEntriesRequest} sent to the same target, and then verifies that
     * the node behind that target does not report a stop-following event within one second
     * after the leader's sender is told to stop blocking.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testLeaderPropagatedBeforeVote() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unitest", this.dataPath, peers, 3000, this.testInfo);
        for (PeerId peer : peers) {
            RaftOptions raftOptions = new RaftOptions();
            raftOptions.setElectionHeartbeatFactor(4);
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 300, false, null, raftOptions));
        }

        // Target of the first blocked vote request; set at most once, shared by all senders.
        AtomicReference<String> blockedTarget = new AtomicReference<>();
        for (NodeImpl node : this.cluster.getNodes()) {
            RpcClientEx sender = TestUtils.sender(node);
            sender.recordMessages((msg, target) -> true);
            sender.blockMessages((msg, target) -> {
                if (msg instanceof RpcRequests.RequestVoteRequest) {
                    RpcRequests.RequestVoteRequest voteRequest = (RpcRequests.RequestVoteRequest) msg;
                    // Pre-votes always pass; only the very first real vote request is blocked.
                    return !voteRequest.preVote() && blockedTarget.compareAndSet(null, target);
                }
                if (msg instanceof RpcRequests.AppendEntriesRequest && target.equals(blockedTarget.get())) {
                    // Requests without entries are let through to the blocked target.
                    RpcRequests.AppendEntriesRequest appendRequest = (RpcRequests.AppendEntriesRequest) msg;
                    return appendRequest.entriesList() != null && !appendRequest.entriesList().isEmpty();
                }
                return false;
            });
        }

        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        this.cluster.ensureLeader(leader);
        TestUtils.sender(leader).stopBlock(1);

        Node blockedFollower = this.cluster.getNode(new Endpoint(NetworkAddress.from(blockedTarget.get())));
        Assertions.assertFalse(
            waitForCondition(() -> blockedFollower.getOptions().getFsm().getOnStopFollowingTimes() != 0, 1000L),
            "The follower shouldn't stop following");
    }

    /**
     * Exercises {@code readCommittedUserLog} on the leader: valid indexes, out-of-range indexes,
     * and an index that is no longer available after a peer is removed and re-added, further
     * tasks are applied and a second snapshot is taken.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void readCommittedUserLog() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unitest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }
        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        this.cluster.ensureLeader(leader);

        final int taskCount = 10;
        sendTestTaskAndWait(leader, taskCount);

        // Wait until the entry at index (1 + taskCount) becomes readable on the leader.
        Assertions.assertTrue(waitForCondition(() -> {
            try {
                return leader.readCommittedUserLog(1L + taskCount) != null;
            } catch (Exception ignored) {
                // Not readable yet; keep polling.
                return false;
            }
        }, 10000L));

        // Index 1 resolves to the entry at index 2.
        UserLog firstUserLog = leader.readCommittedUserLog(1L);
        Assertions.assertNotNull(firstUserLog);
        Assertions.assertEquals(2L, firstUserLog.getIndex());
        Assertions.assertEquals("hello0", stringFromBytes(firstUserLog.getData().array()));

        UserLog fifthUserLog = leader.readCommittedUserLog(5L);
        Assertions.assertNotNull(fifthUserLog);
        Assertions.assertEquals(5L, fifthUserLog.getIndex());
        Assertions.assertEquals("hello3", stringFromBytes(fifthUserLog.getData().array()));

        // Indexes outside the valid range are rejected.
        LogIndexOutOfBoundsException tooLarge = Assertions.assertThrows(
            LogIndexOutOfBoundsException.class, () -> leader.readCommittedUserLog(15L));
        Assertions.assertEquals("Request index 15 is greater than lastAppliedIndex: 11", tooLarge.getMessage());

        LogIndexOutOfBoundsException zeroIndex = Assertions.assertThrows(
            LogIndexOutOfBoundsException.class, () -> leader.readCommittedUserLog(0L));
        Assertions.assertEquals("Request index is invalid: 0", zeroIndex.getMessage());

        LOG.info("Trigger leader snapshot");
        CountDownLatch firstSnapshot = new CountDownLatch(1);
        leader.snapshot(new ExpectClosure(firstSnapshot));
        waitLatch(firstSnapshot);

        // Remove one follower and add it back.
        List<Node> followers = this.cluster.getFollowers();
        Assertions.assertEquals(2, followers.size());
        Node follower = followers.get(0);
        CountDownLatch peerRemoved = new CountDownLatch(1);
        leader.removePeer(follower.getNodeId().getPeerId(), new ExpectClosure(peerRemoved));
        waitLatch(peerRemoved);
        CountDownLatch peerAdded = new CountDownLatch(1);
        leader.addPeer(follower.getNodeId().getPeerId(), new ExpectClosure(peerAdded));
        waitLatch(peerAdded);

        sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);

        LOG.info("Trigger leader snapshot");
        CountDownLatch secondSnapshot = new CountDownLatch(1);
        leader.snapshot(new ExpectClosure(secondSnapshot));
        waitLatch(secondSnapshot);
        Thread.sleep(100L);

        // Index 5 is expected to be gone at this point.
        LogNotFoundException deleted = Assertions.assertThrows(
            LogNotFoundException.class, () -> leader.readCommittedUserLog(5L));
        Assertions.assertEquals("User log is deleted at index: 5", deleted.getMessage());

        // Index 12 resolves to the entry at index 16.
        UserLog twelfthUserLog = leader.readCommittedUserLog(12L);
        Assertions.assertNotNull(twelfthUserLog);
        Assertions.assertEquals(16L, twelfthUserLog.getIndex());
        Assertions.assertEquals("hello10", stringFromBytes(twelfthUserLog.getData().array()));

        UserLog seventeenthUserLog = leader.readCommittedUserLog(17L);
        Assertions.assertNotNull(seventeenthUserLog);
        Assertions.assertEquals(17L, seventeenthUserLog.getIndex());
        Assertions.assertEquals("hello11", stringFromBytes(seventeenthUserLog.getData().array()));

        this.cluster.ensureSame();
        Assertions.assertEquals(3, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
            for (int idx = 0; idx < 20; idx++) {
                Assertions.assertEquals("hello" + idx, stringFromBytes(fsm.getLogs().get(idx).array()));
            }
        }
    }

    /**
     * Bootstraps a single-node group from a state machine pre-filled with the bytes
     * {@code 'a'..'z'}, then starts a node on the same meta, snapshot and log locations and
     * checks that the 26 entries are present before applying more tasks on top of them.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testBootStrapWithSnapshot() throws Exception {
        Endpoint addr = new Endpoint("127.0.0.1", 5006);
        MockStateMachine fsm = new MockStateMachine(addr);
        Path logPath = Path.of(this.dataPath, "node0", "log");
        Files.createDirectories(logPath);

        // One single-byte entry per lowercase letter.
        for (char ch = 'a'; ch <= 'z'; ch++) {
            fsm.getLogs().add(ByteBuffer.wrap(new byte[] {(byte) ch}));
        }

        String metaUri = this.dataPath + File.separator + "meta";
        String snapshotUri = this.dataPath + File.separator + "snapshot";

        // Storage factory that ignores the requested name and always opens the "test" storage.
        DefaultLogStorageFactory bootstrapLogFactory = new DefaultLogStorageFactory(logPath) {
            @Override
            public LogStorage getLogStorage(String str, RaftOptions raftOptions) {
                return super.getLogStorage("test", raftOptions);
            }
        };
        bootstrapLogFactory.start();

        BootstrapOptions bootstrapOptions = new BootstrapOptions();
        bootstrapOptions.setServiceFactory(new DefaultJRaftServiceFactory(bootstrapLogFactory));
        bootstrapOptions.setLastLogIndex(fsm.getLogs().size());
        bootstrapOptions.setRaftMetaUri(metaUri);
        bootstrapOptions.setSnapshotUri(snapshotUri);
        bootstrapOptions.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006"));
        bootstrapOptions.setFsm(fsm);
        NodeOptions nodeOptions = new NodeOptions();
        bootstrapOptions.setNodeOptions(nodeOptions);
        Assertions.assertTrue(JRaftUtils.bootstrap(bootstrapOptions));
        bootstrapLogFactory.close();

        // The node itself runs on a fresh factory instance over the same log directory.
        nodeOptions.setRaftMetaUri(metaUri);
        nodeOptions.setSnapshotUri(snapshotUri);
        DefaultLogStorageFactory nodeLogFactory = new DefaultLogStorageFactory(logPath) {
            @Override
            public LogStorage getLogStorage(String str, RaftOptions raftOptions) {
                return super.getLogStorage("test", raftOptions);
            }
        };
        nodeLogFactory.start();
        nodeOptions.setServiceFactory(new DefaultJRaftServiceFactory(nodeLogFactory));
        nodeOptions.setFsm(fsm);

        Node node = createService("test", new PeerId(addr, 0), nodeOptions).start();
        Assertions.assertEquals(26, fsm.getLogs().size());
        for (int idx = 0; idx < 26; idx++) {
            // Relative get(): reads the buffer's single byte and advances its position.
            Assertions.assertEquals('a' + idx, fsm.getLogs().get(idx).get());
        }

        // Poll until the node reports itself as leader.
        while (!node.isLeader()) {
            Thread.sleep(20L);
        }
        sendTestTaskAndWait(node);
        Assertions.assertEquals(36, fsm.getLogs().size());
    }

    /**
     * Bootstraps a single-node group with an empty state machine and last log index 0, then
     * starts a node on the same meta, snapshot and log locations and expects 10 log entries
     * after tasks are applied.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testBootStrapWithoutSnapshot() throws Exception {
        Endpoint addr = new Endpoint("127.0.0.1", 5006);
        MockStateMachine fsm = new MockStateMachine(addr);
        Path logPath = Path.of(this.dataPath, "node0", "log");
        Files.createDirectories(logPath);

        String metaUri = this.dataPath + File.separator + "meta";
        String snapshotUri = this.dataPath + File.separator + "snapshot";

        // Storage factory that ignores the requested name and always opens the "test" storage.
        DefaultLogStorageFactory bootstrapLogFactory = new DefaultLogStorageFactory(logPath) {
            @Override
            public LogStorage getLogStorage(String str, RaftOptions raftOptions) {
                return super.getLogStorage("test", raftOptions);
            }
        };
        bootstrapLogFactory.start();

        BootstrapOptions bootstrapOptions = new BootstrapOptions();
        bootstrapOptions.setServiceFactory(new DefaultJRaftServiceFactory(bootstrapLogFactory));
        bootstrapOptions.setLastLogIndex(0L);
        bootstrapOptions.setRaftMetaUri(metaUri);
        bootstrapOptions.setSnapshotUri(snapshotUri);
        bootstrapOptions.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006"));
        bootstrapOptions.setFsm(fsm);
        NodeOptions nodeOptions = new NodeOptions();
        bootstrapOptions.setNodeOptions(nodeOptions);
        Assertions.assertTrue(JRaftUtils.bootstrap(bootstrapOptions));
        bootstrapLogFactory.close();

        // The node itself runs on a fresh factory instance over the same log directory.
        nodeOptions.setRaftMetaUri(metaUri);
        nodeOptions.setSnapshotUri(snapshotUri);
        nodeOptions.setFsm(fsm);
        DefaultLogStorageFactory nodeLogFactory = new DefaultLogStorageFactory(logPath) {
            @Override
            public LogStorage getLogStorage(String str, RaftOptions raftOptions) {
                return super.getLogStorage("test", raftOptions);
            }
        };
        nodeLogFactory.start();
        nodeOptions.setServiceFactory(new DefaultJRaftServiceFactory(nodeLogFactory));

        Node node = createService("test", new PeerId(addr, 0), nodeOptions).start();

        // Poll until the node reports itself as leader.
        while (!node.isLeader()) {
            Thread.sleep(20L);
        }
        sendTestTaskAndWait(node);
        Assertions.assertEquals(10, fsm.getLogs().size());
    }

    /**
     * Runs the shared peer-change scenario with the blocking {@code changePeers} call.
     *
     * @throws Exception if the scenario throws.
     */
    @Test
    public void testChangePeers() throws Exception {
        final boolean async = false;
        changePeers(async);
    }

    /**
     * Runs the shared peer-change scenario with the {@code changePeersAsync} call.
     *
     * @throws Exception if the scenario throws.
     */
    @Test
    public void testChangeAsyncPeers() throws Exception {
        final boolean async = true;
        changePeers(async);
    }

    /**
     * Starts one peer as a single-node group plus nine more nodes on the following ports, then
     * nine times replaces the whole configuration with the single next peer, so that leadership
     * walks from port to port. Finally expects 10 log entries in every state machine.
     *
     * @param z {@code true} to use {@code changePeersAsync} and then wait for the new peer to
     *     become leader; {@code false} to use {@code changePeers}.
     * @throws Exception if any step of the scenario throws.
     */
    private void changePeers(boolean z) throws Exception {
        PeerId initialPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(initialPeer), this.testInfo);
        Assertions.assertTrue(this.cluster.start(initialPeer.getEndpoint()));
        this.cluster.waitLeader();
        sendTestTaskAndWait(this.cluster.getLeader());

        for (int offset = 1; offset < 10; offset++) {
            PeerId extraPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + offset);
            Assertions.assertTrue(this.cluster.start(extraPeer.getEndpoint(), false, 300));
        }

        for (int step = 0; step < 9; step++) {
            this.cluster.waitLeader();
            Node leader = this.cluster.getLeader();
            Assertions.assertNotNull(leader);

            // The leader of each step is the peer that the previous step switched to.
            PeerId expectedLeader = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + step);
            Assertions.assertEquals(expectedLeader, leader.getNodeId().getPeerId());

            PeerId nextPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + step + 1);
            Configuration nextConf = new Configuration(Collections.singletonList(nextPeer));
            SynchronizedClosure done = new SynchronizedClosure();
            if (!z) {
                leader.changePeers(nextConf, done);
                Status status = done.await();
                Assertions.assertTrue(status.isOk(), status.getRaftError().toString());
            } else {
                leader.changePeersAsync(nextConf, leader.getCurrentTerm(), done);
                Status status = done.await();
                Assertions.assertTrue(status.isOk(), status.getRaftError().toString());
                // After the async call returns, wait until the new peer is reported as leader.
                Assertions.assertTrue(waitForCondition(
                    () -> this.cluster.getLeader() != null && nextPeer.equals(this.cluster.getLeader().getLeaderId()),
                    10000L));
            }
        }

        this.cluster.waitLeader();
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }
    }

    /**
     * Requests an asynchronous change to a peer on port 5004 that this test never starts, and
     * verifies that the group events listener is notified of a reconfiguration error carrying
     * {@code RaftError.ECATCHUP} within ten seconds.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testOnReconfigurationErrorListener() throws Exception {
        PeerId initialPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(initialPeer), this.testInfo);

        RaftGroupEventsListener listener = Mockito.mock(RaftGroupEventsListener.class);
        this.cluster.setRaftGrpEvtsLsnr(listener);

        Assertions.assertTrue(this.cluster.start(initialPeer.getEndpoint()));
        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        sendTestTaskAndWait(leader);

        // No configuration change has been requested yet.
        Mockito.verify(listener, Mockito.never()).onNewPeersConfigurationApplied(ArgumentMatchers.any());

        PeerId absentPeer = new PeerId(TestUtils.getLocalAddress(), 5004);
        SynchronizedClosure done = new SynchronizedClosure();
        leader.changePeersAsync(new Configuration(Collections.singletonList(absentPeer)), leader.getCurrentTerm(), done);

        // The async request itself is accepted; the failure arrives later through the listener.
        Assertions.assertEquals(Status.OK(), done.await());

        Mockito.verify(listener, Mockito.timeout(10000L)).onReconfigurationError(
            ArgumentMatchers.argThat((Status status) -> status.getRaftError() == RaftError.ECATCHUP),
            ArgumentMatchers.any(),
            ArgumentMatchers.anyLong());
    }

    /**
     * Moves a single-peer configuration across five nodes with {@code changePeersAsync} and
     * verifies that the group events listener is notified exactly once for each newly applied
     * single-peer configuration.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testNewPeersConfigurationAppliedListener() throws Exception {
        PeerId initialPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(initialPeer), this.testInfo);

        RaftGroupEventsListener listener = Mockito.mock(RaftGroupEventsListener.class);
        this.cluster.setRaftGrpEvtsLsnr(listener);

        Assertions.assertTrue(this.cluster.start(initialPeer.getEndpoint()));
        this.cluster.waitLeader();
        sendTestTaskAndWait(this.cluster.getLeader());

        for (int offset = 1; offset < 5; offset++) {
            PeerId extraPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + offset);
            Assertions.assertTrue(this.cluster.start(extraPeer.getEndpoint(), false, 300));
        }

        // Starting additional nodes alone must not be reported as a configuration change.
        Mockito.verify(listener, Mockito.never()).onNewPeersConfigurationApplied(ArgumentMatchers.any());

        for (int step = 0; step < 4; step++) {
            Node leader = this.cluster.getLeader();
            Assertions.assertNotNull(leader);

            PeerId expectedLeader = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + step);
            Assertions.assertEquals(expectedLeader, leader.getNodeId().getPeerId());

            PeerId nextPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + step + 1);
            SynchronizedClosure done = new SynchronizedClosure();
            leader.changePeersAsync(new Configuration(Collections.singletonList(nextPeer)), leader.getCurrentTerm(), done);
            Assertions.assertEquals(Status.OK(), done.await());

            // Wait until the new peer is reported as leader before checking the listener.
            Assertions.assertTrue(waitForCondition(
                () -> this.cluster.getLeader() != null && nextPeer.equals(this.cluster.getLeader().getLeaderId()),
                10000L));

            Mockito.verify(listener, Mockito.times(1)).onNewPeersConfigurationApplied(Collections.singletonList(nextPeer));
        }
    }

    /**
     * Starts a six-peer cluster and verifies that the group events listener has received one
     * additional {@code onLeaderElected} call after the initial election and after each of two
     * leader stops.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testChangePeersOnLeaderElected() throws Exception {
        List<PeerId> peers = IntStream.range(0, 6)
            .mapToObj(i -> new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i))
            .collect(Collectors.toList());
        this.cluster = new TestCluster("testChangePeers", this.dataPath, peers, this.testInfo);

        RaftGroupEventsListener listener = Mockito.mock(RaftGroupEventsListener.class);
        this.cluster.setRaftGrpEvtsLsnr(listener);

        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 300));
        }

        this.cluster.waitLeader();
        Mockito.verify(listener, Mockito.times(1)).onLeaderElected(ArgumentMatchers.anyLong());

        // Stop the current leader twice; each re-election must add exactly one notification.
        this.cluster.stop(this.cluster.getLeader().getLeaderId().getEndpoint());
        this.cluster.waitLeader();
        Mockito.verify(listener, Mockito.times(2)).onLeaderElected(ArgumentMatchers.anyLong());

        this.cluster.stop(this.cluster.getLeader().getLeaderId().getEndpoint());
        this.cluster.waitLeader();
        Mockito.verify(listener, Mockito.times(3)).onLeaderElected(ArgumentMatchers.anyLong());
    }

    /**
     * Checks the status returned by {@code changePeersAsync} in several situations: a call with
     * the previous term, a call that keeps the current configuration, a call that moves the
     * group to a second peer, and two concurrent calls, of which one is expected to be answered
     * with {@code RaftError.EBUSY}.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void changePeersAsyncResponses() throws Exception {
        PeerId initialPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(initialPeer), this.testInfo);
        Assertions.assertTrue(this.cluster.start(initialPeer.getEndpoint()));
        this.cluster.waitLeader();
        sendTestTaskAndWait(this.cluster.getLeader());

        Assertions.assertTrue(this.cluster.start(new PeerId(TestUtils.getLocalAddress(), 5004).getEndpoint(), false, 300));
        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);

        PeerId leaderPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort());
        Assertions.assertEquals(leaderPeer, leader.getNodeId().getPeerId());
        PeerId secondPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + 1);

        // A request carrying the previous term is still answered with OK.
        SynchronizedClosure staleTermDone = new SynchronizedClosure();
        leader.changePeersAsync(new Configuration(Collections.singletonList(secondPeer)), leader.getCurrentTerm() - 1, staleTermDone);
        Assertions.assertEquals(Status.OK(), staleTermDone.await());

        // A request for the configuration that consists of the current leader only.
        SynchronizedClosure sameConfDone = new SynchronizedClosure();
        leader.changePeersAsync(new Configuration(Collections.singletonList(leaderPeer)), leader.getCurrentTerm(), sameConfDone);
        Assertions.assertEquals(Status.OK(), sameConfDone.await());

        // A request that moves the group to the second peer.
        SynchronizedClosure moveDone = new SynchronizedClosure();
        leader.changePeersAsync(new Configuration(Collections.singletonList(secondPeer)), leader.getCurrentTerm(), moveDone);
        Assertions.assertEquals(Status.OK(), moveDone.await());

        Assertions.assertTrue(waitForCondition(
            () -> this.cluster.getLeader() != null && secondPeer.equals(this.cluster.getLeader().getLeaderId()),
            10000L));
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }

        Node newLeader = this.cluster.getLeader();
        sendTestTaskAndWait(newLeader);

        List<SynchronizedClosure> closures = new ArrayList<>();
        List<Future<?>> submissions = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // Two concurrent requests back to the initial peer, both with the hard-coded term 2.
            for (int i = 0; i < 2; i++) {
                SynchronizedClosure done = new SynchronizedClosure();
                closures.add(done);
                submissions.add(executor.submit(() -> {
                    newLeader.changePeersAsync(new Configuration(Collections.singletonList(initialPeer)), 2L, done);
                }));
            }
            submissions.get(0).get();
            submissions.get(1).get();
        } finally {
            // Both submissions are finished or failed here; release the pool's threads.
            executor.shutdown();
        }

        // NOTE(review): the two submissions race on separate threads, yet the assertions assume
        // the first closure is the accepted one - confirm this ordering is guaranteed.
        Assertions.assertEquals(Status.OK(), closures.get(0).await());
        Assertions.assertEquals(RaftError.EBUSY, closures.get(1).await().getRaftError());

        Assertions.assertTrue(waitForCondition(
            () -> this.cluster.getLeader() != null && initialPeer.equals(this.cluster.getLeader().getLeaderId()),
            10000L));
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(20, fsm.getLogs().size());
        }
    }

    /**
     * Grows a single-node group to three peers. Changes that include a peer which has not been
     * started yet are expected to fail with {@code RaftError.ECATCHUP}; once all three peers
     * are running the change is expected to succeed and every state machine to hold 10 entries.
     *
     * @throws Exception if any step of the scenario throws.
     */
    @Test
    public void testChangePeersAddMultiNodes() throws Exception {
        PeerId initialPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(initialPeer), this.testInfo);
        Assertions.assertTrue(this.cluster.start(initialPeer.getEndpoint()));
        this.cluster.waitLeader();
        Node leader = this.cluster.getLeader();
        sendTestTaskAndWait(leader);

        // Target configuration: the initial peer plus the next two ports.
        Configuration threePeerConf = new Configuration();
        for (int offset = 0; offset < 3; offset++) {
            threePeerConf.addPeer(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + offset));
        }

        PeerId secondPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + 1);
        PeerId thirdPeer = new PeerId(TestUtils.getLocalAddress(), initialPeer.getEndpoint().getPort() + 2);
        SynchronizedClosure done = new SynchronizedClosure();

        // Second peer is not running yet.
        leader.changePeers(new Configuration(Collections.singletonList(secondPeer)), done);
        Assertions.assertEquals(RaftError.ECATCHUP, done.await().getRaftError());

        // Second peer is running, third peer is not.
        Assertions.assertTrue(this.cluster.start(secondPeer.getEndpoint()));
        done.reset();
        leader.changePeers(threePeerConf, done);
        Assertions.assertEquals(RaftError.ECATCHUP, done.await().getRaftError());

        // All three peers are running.
        Assertions.assertTrue(this.cluster.start(thirdPeer.getEndpoint()));
        done.reset();
        leader.changePeers(threePeerConf, done);
        Status status = done.await();
        Assertions.assertTrue(status.isOk(), status.getErrorMsg());

        this.cluster.ensureSame();
        Assertions.assertEquals(3, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertEquals(10, fsm.getLogs().size());
        }
    }

    /**
     * Checks how a leader behaves when a membership change cannot be completed.
     *
     * <p>The test grows a single-node group to four peers, stops the fourth peer and then asks the leader to switch to
     * a configuration that contains only the third and the fourth peers. It asserts that this change finishes with
     * {@link RaftError#EPERM}, that the old leader is left with a non-stable configuration and that the cluster
     * reports no leader. After the fourth peer is started again, the newly elected leader is expected to list exactly
     * the requested peers.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testChangePeersStepsDownInJointConsensus() throws Exception {
        PeerId peer0 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5006");
        PeerId peer1 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5007");
        PeerId peer2 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5008");
        PeerId peer3 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5009");

        // Only the first peer is part of the initial group.
        List<PeerId> initialPeers = new ArrayList<>();
        initialPeers.add(peer0);
        this.cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", this.dataPath, initialPeers, this.testInfo);
        Assertions.assertTrue(this.cluster.start(peer0.getEndpoint()));
        this.cluster.waitLeader();

        NodeImpl leader = this.cluster.getLeader();
        Assertions.assertNotNull(leader);
        sendTestTaskAndWait(leader);

        // Bring up the remaining nodes and wait until the leader sees all four of them.
        for (PeerId joining : Arrays.asList(peer1, peer2, peer3)) {
            Assertions.assertTrue(this.cluster.start(joining.getEndpoint()));
        }
        Assertions.assertTrue(waitForTopology(this.cluster, leader.getNodeId().getPeerId().getEndpoint(), 4, 3000L));

        Configuration conf = new Configuration();
        for (PeerId member : Arrays.asList(peer0, peer1, peer2, peer3)) {
            conf.addPeer(member);
        }

        SynchronizedClosure done = new SynchronizedClosure();
        leader.changePeers(conf, done);
        Assertions.assertTrue(done.await().isOk());

        // Stop one of the peers of the target configuration and drop the first two peers from it.
        Assertions.assertTrue(this.cluster.stop(peer3.getEndpoint()));
        conf.removePeer(peer0);
        conf.removePeer(peer1);

        done.reset();
        leader.changePeers(conf, done);
        Assertions.assertEquals(RaftError.EPERM, done.await().getRaftError());
        LOG.info(done.getStatus().toString());

        Assertions.assertFalse(leader.getConf().isStable());
        Assertions.assertNull(this.cluster.getLeader());

        // Once the stopped peer is back a leader must be elected with the requested peers.
        Assertions.assertTrue(this.cluster.start(peer3.getEndpoint()));
        Thread.sleep(1000L);
        this.cluster.waitLeader();

        List<PeerId> actualPeers = this.cluster.getLeader().listPeers();
        Assertions.assertTrue(!actualPeers.isEmpty());
        Assertions.assertEquals(conf.getPeerSet(), new HashSet<>(actualPeers));
    }

    /**
     * Starts a background task that keeps asking the current leader to change the group membership to a randomly
     * chosen subset of {@code changeArg.peers} until {@code changeArg.stop} becomes {@code true}.
     *
     * <p>Every change must either succeed or fail with one of {@link RaftError#EBUSY}, {@link RaftError#EPERM} or
     * {@link RaftError#ECATCHUP}; any other outcome fails the assertion inside the task. The executor created for the
     * task is added to {@code this.executors}.
     *
     * @param changeArg Cluster, candidate peers and control flags of the task.
     * @return Future of the submitted task.
     */
    private Future<?> startChangePeersThread(ChangeArg changeArg) {
        // Errors that are acceptable outcomes of a membership change issued while the cluster is being shaken.
        HashSet<RaftError> expectedErrors =
            new HashSet<>(Arrays.asList(RaftError.EBUSY, RaftError.EPERM, RaftError.ECATCHUP));

        ExecutorService executor = Executors.newSingleThreadExecutor();
        this.executors.add(executor);

        return Utils.runInThread(executor, () -> {
            while (!changeArg.stop) {
                try {
                    changeArg.c.waitLeader();
                    Node leader = changeArg.c.getLeader();
                    if (leader == null) {
                        continue;
                    }

                    Configuration conf = new Configuration();
                    if (changeArg.dontRemoveFirstPeer) {
                        conf.addPeer(changeArg.peers.get(0));
                    }
                    for (int i = 0; i < changeArg.peers.size(); i++) {
                        PeerId candidate = changeArg.peers.get(i);
                        // Each peer is picked with probability 1/2; the random draw happens for every peer.
                        if (ThreadLocalRandom.current().nextInt(64) < 32 && !conf.contains(candidate)) {
                            conf.addPeer(candidate);
                        }
                    }

                    if (conf.isEmpty()) {
                        LOG.warn("No peer has been selected", new Object[0]);
                        continue;
                    }

                    SynchronizedClosure done = new SynchronizedClosure();
                    leader.changePeers(conf, done);
                    done.await();
                    Assertions.assertTrue(
                        done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()),
                        done.getStatus().toString());
                } catch (InterruptedException e) {
                    LOG.error("ChangePeersThread is interrupted", e);
                    // Keep the interruption visible to whoever owns this thread instead of silently dropping it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /**
     * Applies 5000 tasks to a ten-node cluster while a background task keeps changing the group membership, then
     * restores the full configuration and checks that every state machine holds at least 5000 log entries.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testChangePeersChaosWithSnapshot() throws Exception {
        List<PeerId> peers = new ArrayList<>();
        peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
        this.cluster = new TestCluster("unittest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);

        // NOTE(review): the trailing "2" is presumably a short snapshot interval (see the test name) - confirm
        // against TestCluster#start.
        Assertions.assertTrue(this.cluster.start(peers.get(0).getEndpoint(), false, 2));
        for (int idx = 1; idx < 10; idx++) {
            PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + idx);
            peers.add(peer);
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }

        ChangeArg arg = new ChangeArg(this.cluster, peers, false, false);
        Future<?> changer = startChangePeersThread(arg);

        // The counter advances only when a task has been applied successfully.
        for (int applied = 0; applied < 5000; ) {
            this.cluster.waitLeader();
            Node leader = this.cluster.getLeader();
            if (leader == null) {
                continue;
            }

            SynchronizedClosure done = new SynchronizedClosure();
            leader.apply(new Task(ByteBuffer.wrap(("hello" + applied).getBytes(StandardCharsets.UTF_8)), done));
            Status status = done.await();
            if (!status.isOk()) {
                // The only tolerated failure is a rejection with EPERM.
                Assertions.assertEquals(RaftError.EPERM, status.getRaftError());
                continue;
            }

            applied++;
            if (applied % 100 == 0) {
                System.out.println("Progress:" + applied);
            }
        }

        arg.stop = true;
        changer.get();

        // Bring every peer back into the group before comparing the state machines.
        this.cluster.waitLeader();
        SynchronizedClosure restored = new SynchronizedClosure();
        this.cluster.getLeader().changePeers(new Configuration(peers), restored);
        Status restoreStatus = restored.await();
        Assertions.assertTrue(restoreStatus.isOk(), restoreStatus.getErrorMsg());

        this.cluster.ensureSame();
        Assertions.assertEquals(10, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            Assertions.assertTrue(fsm.getLogs().size() >= 5000);
        }
    }

    /**
     * Applies 5000 tasks to a ten-node cluster while a background task keeps changing the group membership (the first
     * peer is never removed), then restores the full configuration and checks that every state machine holds at
     * least 5000 and fewer than 5100 log entries.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testChangePeersChaosWithoutSnapshot() throws Exception {
        List<PeerId> peers = new ArrayList<>();
        peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
        this.cluster = new TestCluster("unittest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);

        // NOTE(review): the large third argument is presumably a snapshot interval long enough to never fire
        // during the test - confirm against TestCluster#start.
        Assertions.assertTrue(this.cluster.start(peers.get(0).getEndpoint(), false, 100000));
        for (int idx = 1; idx < 10; idx++) {
            PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + idx);
            peers.add(peer);
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 10000));
        }

        ChangeArg arg = new ChangeArg(this.cluster, peers, false, true);
        Future<?> changer = startChangePeersThread(arg);

        // The counter advances only when a task has been applied successfully.
        for (int applied = 0; applied < 5000; ) {
            this.cluster.waitLeader();
            Node leader = this.cluster.getLeader();
            if (leader == null) {
                continue;
            }

            SynchronizedClosure done = new SynchronizedClosure();
            leader.apply(new Task(ByteBuffer.wrap(("hello" + applied).getBytes(StandardCharsets.UTF_8)), done));
            Status status = done.await();
            if (!status.isOk()) {
                // The only tolerated failure is a rejection with EPERM.
                Assertions.assertEquals(RaftError.EPERM, status.getRaftError());
                continue;
            }

            applied++;
            if (applied % 100 == 0) {
                System.out.println("Progress:" + applied);
            }
        }

        arg.stop = true;
        changer.get();

        // Bring every peer back into the group before comparing the state machines.
        this.cluster.waitLeader();
        SynchronizedClosure restored = new SynchronizedClosure();
        this.cluster.getLeader().changePeers(new Configuration(peers), restored);
        Assertions.assertTrue(restored.await().isOk());

        this.cluster.ensureSame();
        Assertions.assertEquals(10, this.cluster.getFsms().size());
        java.util.Iterator<MockStateMachine> fsms = this.cluster.getFsms().iterator();
        while (fsms.hasNext()) {
            MockStateMachine fsm = fsms.next();
            Assertions.assertTrue(fsm.getLogs().size() >= 5000);
            Assertions.assertTrue(fsm.getLogs().size() - 5000 < 100);
        }
    }

    /**
     * Runs three task-applying workers (5000 successful tasks each) concurrently with three membership-changing
     * tasks on a ten-node cluster, then restores the full configuration and checks that every state machine holds at
     * least 15000 and fewer than 15100 log entries.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testChangePeersChaosApplyTasks() throws Exception {
        List<PeerId> peers = new ArrayList<>();
        peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
        this.cluster = new TestCluster("unittest", this.dataPath, peers, TestCluster.ELECTION_TIMEOUT_MILLIS, this.testInfo);
        Assertions.assertTrue(this.cluster.start(peers.get(0).getEndpoint(), false, 100000));
        for (int idx = 1; idx < 10; idx++) {
            PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + idx);
            peers.add(peer);
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 100000));
        }

        List<ChangeArg> changeArgs = new ArrayList<>();
        List<Future<?>> changers = new ArrayList<>();
        CountDownLatch appliersDone = new CountDownLatch(3);
        ExecutorService applierPool = Executors.newFixedThreadPool(3);
        this.executors.add(applierPool);

        for (int worker = 0; worker < 3; worker++) {
            ChangeArg arg = new ChangeArg(this.cluster, peers, false, true);
            changeArgs.add(arg);
            changers.add(startChangePeersThread(arg));

            Utils.runInThread(applierPool, () -> {
                try {
                    // The counter advances only when a task has been applied successfully.
                    for (int applied = 0; applied < 5000; ) {
                        this.cluster.waitLeader();
                        Node leader = this.cluster.getLeader();
                        if (leader == null) {
                            continue;
                        }

                        SynchronizedClosure done = new SynchronizedClosure();
                        leader.apply(new Task(ByteBuffer.wrap(("hello" + applied).getBytes(StandardCharsets.UTF_8)), done));
                        Status status = done.await();
                        if (!status.isOk()) {
                            // The only tolerated failure is a rejection with EPERM.
                            Assertions.assertEquals(RaftError.EPERM, status.getRaftError());
                            continue;
                        }

                        applied++;
                        if (applied % 100 == 0) {
                            System.out.println("Progress:" + applied);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Failed to run tasks", e);
                } finally {
                    // The latch is released exactly once on every exit path, including assertion errors.
                    appliersDone.countDown();
                }
            });
        }

        appliersDone.await();

        // Stop the membership changers and wait for them to finish.
        for (ChangeArg arg : changeArgs) {
            arg.stop = true;
        }
        for (Future<?> changer : changers) {
            changer.get();
        }

        // Bring every peer back into the group before comparing the state machines.
        this.cluster.waitLeader();
        SynchronizedClosure restored = new SynchronizedClosure();
        this.cluster.getLeader().changePeers(new Configuration(peers), restored);
        Assertions.assertTrue(restored.await().isOk());

        this.cluster.ensureSame();
        Assertions.assertEquals(10, this.cluster.getFsms().size());
        for (MockStateMachine fsm : this.cluster.getFsms()) {
            int logSize = fsm.getLogs().size();
            Assertions.assertTrue(logSize >= 15000, "logSize= " + logSize);
            Assertions.assertTrue(logSize - 15000 < 100, "logSize= " + logSize);
        }
    }

    /**
     * Checks that no leader appears while the followers' non-pre-vote {@code RequestVoteRequest} messages are
     * blocked, and that a leader is elected after the blocking is removed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testBlockedElection() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }
        this.cluster.waitLeader();

        NodeImpl oldLeader = this.cluster.getLeader();
        LOG.warn("Current leader {}, electTimeout={}", oldLeader.getNodeId().getPeerId(),
            Integer.valueOf(oldLeader.getOptions().getElectionTimeoutMs()));

        List<Node> followers = this.cluster.getFollowers();
        blockMessagesOnFollowers(followers, (msg, nodeId) ->
            msg instanceof RpcRequests.RequestVoteRequest && !((RpcRequests.RequestVoteRequest) msg).preVote());

        LOG.warn("Stop leader {}, curTerm={}", oldLeader.getNodeId().getPeerId(), Long.valueOf(oldLeader.getCurrentTerm()));
        Assertions.assertTrue(this.cluster.stop(oldLeader.getNodeId().getPeerId().getEndpoint()));

        // No leader right after the stop, and still none two seconds later while vote requests are blocked.
        Assertions.assertNull(this.cluster.getLeader());
        Thread.sleep(2000L);
        Assertions.assertNull(this.cluster.getLeader());

        stopBlockingMessagesOnFollowers(followers);
        this.cluster.waitLeader();

        NodeImpl newLeader = this.cluster.getLeader();
        LOG.info("Elect new leader is {}, curTerm={}", newLeader.getLeaderId(), Long.valueOf(newLeader.getCurrentTerm()));
    }

    /**
     * Runs the election timeout auto-adjust scenario with every follower message blocked.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testElectionTimeoutAutoAdjustWhenBlockedAllMessages() throws Exception {
        testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> true);
    }

    /**
     * Runs the election timeout auto-adjust scenario with only the followers' non-pre-vote
     * {@code RequestVoteRequest} messages blocked.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testElectionTimeoutAutoAdjustWhenBlockedRequestVoteMessages() throws Exception {
        testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) ->
            msg instanceof RpcRequests.RequestVoteRequest && !((RpcRequests.RequestVoteRequest) msg).preVote());
    }

    /**
     * Shared scenario of the election timeout auto-adjust tests.
     *
     * <p>A four-node cluster is started with an {@code ExponentialBackoffTimeoutStrategy}. After the leader is
     * stopped and the followers' messages matching {@code biPredicate} are blocked, every follower is expected to
     * report an election timeout greater than the initial one. Once the blocking is removed and a new leader is
     * elected, every follower is expected to report the initial election timeout again within 3 seconds.
     *
     * @param biPredicate Predicate selecting the messages to block on the followers.
     * @throws Exception If failed.
     */
    private void testElectionTimeoutAutoAdjustWhenBlockedMessages(BiPredicate<Object, String> biPredicate) throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(4);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, new LinkedHashSet<>(), TestCluster.ELECTION_TIMEOUT_MILLIS,
            opts -> opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11000, 3)),
            this.testInfo);
        for (PeerId peer : peers) {
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint()));
        }
        this.cluster.waitLeader();

        NodeImpl oldLeader = this.cluster.getLeader();
        int initialTimeoutMs = oldLeader.getOptions().getElectionTimeoutMs();
        LOG.warn("Current leader {}, electTimeout={}", oldLeader.getNodeId().getPeerId(),
            Integer.valueOf(oldLeader.getOptions().getElectionTimeoutMs()));

        // All followers start with the same election timeout as the leader.
        List<Node> followers = this.cluster.getFollowers();
        for (Node follower : followers) {
            Assertions.assertEquals(initialTimeoutMs, follower.getOptions().getElectionTimeoutMs());
        }

        blockMessagesOnFollowers(followers, biPredicate);
        LOG.warn("Stop leader {}, curTerm={}", oldLeader.getNodeId().getPeerId(), Long.valueOf(oldLeader.getCurrentTerm()));
        Assertions.assertTrue(this.cluster.stop(oldLeader.getNodeId().getPeerId().getEndpoint()));
        Assertions.assertNull(this.cluster.getLeader());

        // While elections cannot succeed, every follower must end up with a larger election timeout.
        long adjustWaitMs = 3 * (initialTimeoutMs + followers.get(0).getOptions().getRaftOptions().getMaxElectionDelayMs()) * 2;
        Assertions.assertTrue(waitForCondition(
            () -> followers.stream().allMatch(follower -> follower.getOptions().getElectionTimeoutMs() > initialTimeoutMs),
            adjustWaitMs));

        stopBlockingMessagesOnFollowers(followers);
        this.cluster.waitLeader();

        NodeImpl newLeader = this.cluster.getLeader();
        LOG.info("Elected new leader is {}, curTerm={}", newLeader.getLeaderId(), Long.valueOf(newLeader.getCurrentTerm()));

        // After a successful election the timeout must return to its initial value.
        Assertions.assertTrue(waitForCondition(
            () -> followers.stream().allMatch(follower -> follower.getOptions().getElectionTimeoutMs() == initialTimeoutMs),
            3000L));
    }

    /**
     * Checks that a read index request succeeds on the new leader after the old leader has been cut off.
     *
     * <p>The nodes are started with {@code ReadOnlyOption.ReadOnlyLeaseBased}. The old leader's RPC client lets two
     * messages through and blocks every following one; the test then waits up to 10 seconds for a different leader
     * and issues {@code readIndex} on it, expecting an OK status.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLeaseReadAfterSegmentation() throws Exception {
        List<PeerId> peers = TestUtils.generatePeers(3);
        this.cluster = new TestCluster("unittest", this.dataPath, peers, 3000, this.testInfo);
        for (PeerId peer : peers) {
            RaftOptions opts = new RaftOptions();
            opts.setElectionHeartbeatFactor(2);
            opts.setReadOnlyOptions(ReadOnlyOption.ReadOnlyLeaseBased);
            Assertions.assertTrue(this.cluster.start(peer.getEndpoint(), false, 300, false, null, opts));
        }
        this.cluster.waitLeader();

        Node oldLeader = (NodeImpl) this.cluster.getLeader();
        Assertions.assertNotNull(oldLeader);
        this.cluster.ensureLeader(oldLeader);
        sendTestTaskAndWait(oldLeader);
        this.cluster.ensureSame();

        RpcClientEx leaderClient = oldLeader.getRpcClientService().getRpcClient();
        AtomicInteger passed = new AtomicInteger();
        leaderClient.blockMessages((msg, nodeId) -> {
            Assertions.assertTrue(msg instanceof RpcRequests.AppendEntriesRequest);
            // Let the first two messages through, block everything after that.
            if (passed.get() >= 2) {
                return true;
            }
            LOG.info("Send heartbeat: " + msg + " to " + nodeId);
            passed.incrementAndGet();
            return false;
        });

        // Wait until a leader exists and it is not the node that was cut off.
        Assertions.assertTrue(waitForCondition(
            () -> this.cluster.getLeader() != null && !oldLeader.getNodeId().equals(this.cluster.getLeader().getNodeId()),
            10000L));

        final CompletableFuture<Status> readResult = new CompletableFuture<>();
        this.cluster.getLeader().readIndex((byte[]) null, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                readResult.complete(status);
            }
        });
        Assertions.assertTrue(readResult.get().isOk());
    }

    /**
     * Creates node options whose service factory uses a started log storage factory located at
     * {@code <dataPath>/node<i>/log}.
     *
     * @param i Node number used in the storage directory name.
     * @return New node options.
     */
    private NodeOptions createNodeOptions(int i) {
        Path logDir = Path.of(this.dataPath, "node" + i, "log");
        DefaultLogStorageFactory logStorageFactory = new DefaultLogStorageFactory(logDir);
        logStorageFactory.start();

        NodeOptions options = new NodeOptions();
        options.setServiceFactory(new DefaultJRaftServiceFactory(logStorageFactory));
        return options;
    }

    /**
     * Waits until the cluster service of the node at {@code endpoint} sees at least {@code i} topology members.
     *
     * @param testCluster Cluster the node belongs to.
     * @param endpoint Endpoint of the node whose topology view is checked.
     * @param i Minimal number of members to wait for.
     * @param j Timeout in milliseconds.
     * @return {@code true} if the expected number of members was observed in time, or if the node's RPC server is not
     *      an {@code IgniteRpcServer} (nothing to wait for in that case); {@code false} if the node is unknown, the
     *      timeout elapsed or the waiting thread was interrupted.
     */
    private boolean waitForTopology(TestCluster testCluster, Endpoint endpoint, int i, long j) {
        RaftGroupService server = testCluster.getServer(endpoint);
        if (server == null) {
            LOG.warn("Node has not been found {}", new Object[]{endpoint});
            return false;
        }
        if (!(server.getRpcServer() instanceof IgniteRpcServer)) {
            return true;
        }
        // The instanceof check above guarantees that this cast is safe.
        ClusterService clusterService = ((IgniteRpcServer) server.getRpcServer()).clusterService();
        long deadline = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() < deadline) {
            if (clusterService.topologyService().allMembers().size() >= i) {
                return true;
            }
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Polls {@code booleanSupplier} every 50 milliseconds until it returns {@code true} or the timeout elapses.
     *
     * @param booleanSupplier Condition to wait for.
     * @param j Timeout in milliseconds.
     * @return {@code true} if the condition was met before the timeout; {@code false} if the timeout elapsed or the
     *      waiting thread was interrupted (the interrupt flag is left set in that case).
     */
    private boolean waitForCondition(BooleanSupplier booleanSupplier, long j) {
        long deadline = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() < deadline) {
            if (booleanSupplier.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Creates a raft group service for {@code peerId} on top of a freshly started cluster service.
     *
     * <p>The cluster service listens on the peer's port and uses a static node finder built from the addresses of
     * the peers and learners of the initial configuration (an empty list if there is no initial configuration). The
     * request executor is added to {@code this.executors} and the created service to {@code this.services}.
     * Shutting the returned service down also shuts down the RPC server and stops the cluster service.
     *
     * @param str Raft group id.
     * @param peerId Peer the service is created for.
     * @param nodeOptions Node options; the stripes count and the RPC client are set by this method.
     * @return Created, not yet started service.
     */
    private RaftGroupService createService(String str, PeerId peerId, NodeOptions nodeOptions) {
        Configuration initialConf = nodeOptions.getInitialConf();
        nodeOptions.setStripes(1);

        Stream<PeerId> members = initialConf == null
            ? Stream.empty()
            : Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream());
        List<NetworkAddress> memberAddresses = members
            .map(PeerId::getEndpoint)
            .map(JRaftUtils::addressFromEndpoint)
            .collect(Collectors.toList());

        NodeManager nodeManager = new NodeManager();
        final ClusterService clusterService = ClusterServiceTestUtils.clusterService(
            this.testInfo, peerId.getEndpoint().getPort(), new StaticNodeFinder(memberAddresses));

        ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
        this.executors.add(requestExecutor);

        final TestIgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
        nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
        clusterService.start();

        RaftGroupService service = new RaftGroupService(str, peerId, nodeOptions, rpcServer, nodeManager) {
            @Override
            public synchronized void shutdown() {
                // RPC server first, then the raft service itself, then the network layer.
                rpcServer.shutdown();
                super.shutdown();
                clusterService.stop();
            }
        };
        this.services.add(service);
        return service;
    }

    /**
     * Applies 10 tasks with payloads {@code "hello0"}..{@code "hello9"} to the node, expecting
     * {@link RaftError#SUCCESS} for each of them, and waits for their completion.
     *
     * @param node Node the tasks are applied to.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(Node node) throws InterruptedException {
        sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
    }

    /**
     * Applies {@code i} tasks with payload indexes starting from 0 to the node, expecting
     * {@link RaftError#SUCCESS} for each of them, and waits for their completion.
     *
     * @param node Node the tasks are applied to.
     * @param i Number of tasks to apply.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(Node node, int i) throws InterruptedException {
        sendTestTaskAndWait(node, 0, i, RaftError.SUCCESS);
    }

    /**
     * Applies 10 tasks with payload indexes starting from 0 to the node, expecting the given error for each of
     * them, and waits for their completion.
     *
     * @param node Node the tasks are applied to.
     * @param raftError Expected result passed to the closure of every task.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(Node node, RaftError raftError) throws InterruptedException {
        sendTestTaskAndWait(node, 0, 10, raftError);
    }

    /**
     * Applies {@code i2} tasks with payloads {@code "hello" + index}, where the index runs from {@code i}
     * (inclusive) to {@code i + i2} (exclusive), and waits until the latch shared by their closures reaches zero.
     *
     * @param node Node the tasks are applied to.
     * @param i First payload index.
     * @param i2 Number of tasks to apply.
     * @param raftError Expected result passed to the closure of every task.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(Node node, int i, int i2, RaftError raftError) throws InterruptedException {
        CountDownLatch pending = new CountDownLatch(i2);
        int endExclusive = i + i2;
        for (int idx = i; idx < endExclusive; idx++) {
            ByteBuffer payload = ByteBuffer.wrap(("hello" + idx).getBytes(StandardCharsets.UTF_8));
            node.apply(new Task(payload, new ExpectClosure(raftError, pending)));
        }
        waitLatch(pending);
    }

    /**
     * Applies 10 tasks with payload indexes starting from {@code i} to the node, expecting the given error for each
     * of them, and waits for their completion.
     *
     * @param node Node the tasks are applied to.
     * @param i First payload index.
     * @param raftError Expected result passed to the closure of every task.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(Node node, int i, RaftError raftError) throws InterruptedException {
        sendTestTaskAndWait(node, i, 10, raftError);
    }

    /**
     * Applies 10 tasks whose payloads are {@code str} followed by the task index and waits for their completion.
     *
     * @param str Payload prefix.
     * @param node Node the tasks are applied to.
     * @param i Value passed to the closure of every task (presumably the expected error code - confirm against
     *      {@code ExpectClosure}).
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(String str, Node node, int i) throws InterruptedException {
        sendTestTaskAndWait(str, node, 10, i);
    }

    /**
     * Applies {@code i} tasks whose payloads are {@code str} followed by the task index (starting from 0) and waits
     * until the latch shared by their closures reaches zero.
     *
     * @param str Payload prefix.
     * @param node Node the tasks are applied to.
     * @param i Number of tasks to apply.
     * @param i2 Value passed to the closure of every task (presumably the expected error code - confirm against
     *      {@code ExpectClosure}).
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void sendTestTaskAndWait(String str, Node node, int i, int i2) throws InterruptedException {
        // One count per submitted task: a fixed count would make the wait time out when fewer tasks are submitted
        // and return before all of them complete when more are submitted.
        CountDownLatch pending = new CountDownLatch(i);
        for (int idx = 0; idx < i; idx++) {
            ByteBuffer payload = ByteBuffer.wrap((str + idx).getBytes(StandardCharsets.UTF_8));
            node.apply(new Task(payload, new ExpectClosure(i2, (String) null, pending)));
        }
        waitLatch(pending);
    }

    /**
     * Triggers a snapshot on the node, treating it as the first snapshot of the leader state machine (delegates with
     * an expected count of 1).
     *
     * @param testCluster Cluster whose leader state machine is inspected.
     * @param node Node the snapshot is requested on.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void triggerLeaderSnapshot(TestCluster testCluster, Node node) throws InterruptedException {
        triggerLeaderSnapshot(testCluster, node, 1);
    }

    /**
     * Requests a snapshot on the node and checks that the save-snapshot counter of the leader state machine grows by
     * one.
     *
     * <p>Before the request the counter must be equal to {@code i - 1} or {@code i}.
     *
     * @param testCluster Cluster whose leader state machine is inspected.
     * @param node Node the snapshot is requested on.
     * @param i Expected ordinal of the snapshot.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void triggerLeaderSnapshot(TestCluster testCluster, Node node, int i) throws InterruptedException {
        int timesBefore = testCluster.getLeaderFsm().getSaveSnapshotTimes();
        boolean counterAsExpected = timesBefore == i - 1 || timesBefore == i;
        Assertions.assertTrue(counterAsExpected, "snapshotTimes=" + timesBefore + ", times=" + i);

        CountDownLatch snapshotDone = new CountDownLatch(1);
        node.snapshot(new ExpectClosure(snapshotDone));
        waitLatch(snapshotDone);

        Assertions.assertEquals(timesBefore + 1, testCluster.getLeaderFsm().getSaveSnapshotTimes());
    }

    /**
     * Waits up to 30 seconds for the latch to reach zero and fails the test if it does not.
     *
     * @param countDownLatch Latch to wait for.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    private void waitLatch(CountDownLatch countDownLatch) throws InterruptedException {
        boolean completedInTime = countDownLatch.await(30L, TimeUnit.SECONDS);
        Assertions.assertTrue(completedInTime);
    }

    /**
     * Issues a read index request with a random request context and waits for its callback.
     *
     * <p>On an OK status the callback asserts that the returned index equals {@code i} and that the request context
     * is echoed back unchanged. On a failed status it asserts that the error message describes a connection check
     * failure.
     *
     * @param node Node the request is issued on.
     * @param i Expected read index.
     * @return {@code true} if the callback received an OK status and its assertions passed, {@code false} otherwise.
     * @throws InterruptedException If the thread is interrupted while waiting for the callback.
     */
    private boolean assertReadIndex(Node node, final int i) throws InterruptedException {
        final CountDownLatch callbackDone = new CountDownLatch(1);
        final byte[] requestContext = TestUtils.getRandomBytes();
        final AtomicBoolean success = new AtomicBoolean(false);
        node.readIndex(requestContext, new ReadIndexClosure() {
            @Override
            public void run(Status status, long j, byte[] bArr) {
                try {
                    if (status.isOk()) {
                        Assertions.assertEquals(i, j);
                        Assertions.assertArrayEquals(requestContext, bArr);
                        success.set(true);
                    } else {
                        Assertions.assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
                        Assertions.assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
                    }
                } finally {
                    // Always release the waiting test thread: a failed assertion in this callback must not leave
                    // the caller blocked forever on the latch.
                    callbackDone.countDown();
                }
            }
        });
        callbackDone.await();
        return success.get();
    }

    /**
     * Installs the given message-blocking predicate on the sender of every node in the list.
     *
     * @param list Nodes whose senders are affected.
     * @param biPredicate Predicate passed to {@code blockMessages} of each sender.
     */
    private void blockMessagesOnFollowers(List<Node> list, BiPredicate<Object, String> biPredicate) {
        for (Node follower : list) {
            TestUtils.sender(follower).blockMessages(biPredicate);
        }
    }

    /**
     * Calls {@code stopBlock} on the sender of every node in the list.
     *
     * @param list Nodes whose senders are affected.
     */
    private void stopBlockingMessagesOnFollowers(List<Node> list) {
        for (Node follower : list) {
            TestUtils.sender(follower).stopBlock();
        }
    }
}
