package org.apache.ratis;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/ratis/RaftBasicTests.class */
public abstract class RaftBasicTests extends BaseTest {
    public static final int NUM_SERVERS = 5;
    protected static final RaftProperties properties = new RaftProperties();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/RaftBasicTests$Client4TestWithLoad.class */
    public static class Client4TestWithLoad extends Thread {
        boolean useAsync;
        final int index;
        final RaftTestUtil.SimpleMessage[] messages;
        final AtomicBoolean isRunning;
        final AtomicInteger step;
        final AtomicReference<Throwable> exceptionInClientThread;
        final MiniRaftCluster cluster;
        final Logger LOG;

        Client4TestWithLoad(int i, int i2, boolean z, MiniRaftCluster miniRaftCluster, Logger logger) {
            super("client-" + i);
            this.isRunning = new AtomicBoolean(true);
            this.step = new AtomicInteger();
            this.exceptionInClientThread = new AtomicReference<>();
            this.index = i;
            this.messages = RaftTestUtil.SimpleMessage.create(i2, i + "-");
            this.useAsync = z;
            this.cluster = miniRaftCluster;
            this.LOG = logger;
        }

        boolean isRunning() {
            return this.isRunning.get();
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    RaftClient createClient = this.cluster.createClient();
                    Throwable th = null;
                    try {
                        CompletableFuture completableFuture = new CompletableFuture();
                        for (int i = 0; i < this.messages.length; i++) {
                            if (this.useAsync) {
                                createClient.sendAsync(this.messages[i]).thenAcceptAsync(raftClientReply -> {
                                    if (!raftClientReply.isSuccess()) {
                                        completableFuture.completeExceptionally(new AssertionError("Failed with reply: " + raftClientReply));
                                    }
                                    if (this.step.incrementAndGet() == this.messages.length) {
                                        completableFuture.complete(null);
                                    }
                                    Assert.assertTrue(raftClientReply.isSuccess());
                                });
                            } else {
                                Assert.assertTrue(createClient.send(this.messages[this.step.getAndIncrement()]).isSuccess());
                            }
                        }
                        if (this.useAsync) {
                            completableFuture.join();
                            Assert.assertTrue(this.step.get() == this.messages.length);
                        }
                        if (createClient != null) {
                            if (0 != 0) {
                                try {
                                    createClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createClient.close();
                            }
                        }
                        this.isRunning.set(false);
                    } catch (Throwable th3) {
                        if (createClient != null) {
                            if (0 != 0) {
                                try {
                                    createClient.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createClient.close();
                            }
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    this.isRunning.set(false);
                    throw th5;
                }
            } catch (Throwable th6) {
                if (this.exceptionInClientThread.compareAndSet(null, th6)) {
                    this.LOG.error(this + " failed", th6);
                } else {
                    this.exceptionInClientThread.get().addSuppressed(th6);
                    this.LOG.error(this + " failed again!", th6);
                }
                this.isRunning.set(false);
            }
        }

        @Override // java.lang.Thread
        public String toString() {
            return getClass().getSimpleName() + this.index + "(step=" + this.step + "/" + this.messages.length + ", isRunning=" + this.isRunning + ", isAlive=" + isAlive() + ", exception=" + this.exceptionInClientThread + ")";
        }
    }

    public RaftBasicTests() {
        LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
        LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
        LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
        RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration.valueOf(5L, TimeUnit.SECONDS));
    }

    public abstract MiniRaftCluster getCluster();

    public RaftProperties getProperties() {
        return properties;
    }

    @Before
    public void setup() throws IOException {
        Assert.assertNull(getCluster().getLeader());
        getCluster().start();
    }

    @After
    public void tearDown() {
        MiniRaftCluster cluster = getCluster();
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    @Test
    public void testBasicAppendEntries() throws Exception {
        runTestBasicAppendEntries(false, RaftProtos.ReplicationLevel.MAJORITY, false, 10, getCluster(), this.LOG);
    }

    @Test
    public void testBasicAppendEntriesKillLeader() throws Exception {
        runTestBasicAppendEntries(false, RaftProtos.ReplicationLevel.MAJORITY, true, 10, getCluster(), this.LOG);
    }

    @Test
    public void testBasicAppendEntriesWithAllReplication() throws Exception {
        runTestBasicAppendEntries(false, RaftProtos.ReplicationLevel.ALL, false, 10, getCluster(), this.LOG);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void killAndRestartServer(RaftPeerId raftPeerId, long j, long j2, MiniRaftCluster miniRaftCluster, Logger logger) {
        try {
            Thread.sleep(j);
            miniRaftCluster.killServer(raftPeerId);
            Thread.sleep(j2);
            logger.info("restart server: " + raftPeerId);
            miniRaftCluster.restartServer(raftPeerId, false);
        } catch (Exception e) {
            ExitUtils.terminate(-1, "Failed to kill/restart server: " + raftPeerId, e, logger);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void runTestBasicAppendEntries(boolean z, RaftProtos.ReplicationLevel replicationLevel, boolean z2, int i, MiniRaftCluster miniRaftCluster, Logger logger) throws Exception {
        logger.info("runTestBasicAppendEntries: async? {}, replication={}, killLeader={}, numMessages={}", new Object[]{Boolean.valueOf(z), replicationLevel, Boolean.valueOf(z2), Integer.valueOf(i)});
        Iterator<RaftServerProxy> it = miniRaftCluster.getServers().iterator();
        while (it.hasNext()) {
            miniRaftCluster.restartServer(it.next().getId(), false);
        }
        RaftServerImpl waitForLeader = RaftTestUtil.waitForLeader(miniRaftCluster);
        long currentTerm = waitForLeader.getState().getCurrentTerm();
        new Thread(() -> {
            killAndRestartServer(miniRaftCluster.getFollowers().get(0).getId(), 0L, 1000L, miniRaftCluster, logger);
        }).start();
        if (z2) {
            logger.info("killAndRestart leader " + waitForLeader.getId());
            new Thread(() -> {
                killAndRestartServer(waitForLeader.getId(), 2000L, 4000L, miniRaftCluster, logger);
            }).start();
        }
        logger.info(miniRaftCluster.printServers());
        RaftTestUtil.SimpleMessage[] create = RaftTestUtil.SimpleMessage.create(i);
        RaftClient createClient = miniRaftCluster.createClient();
        Throwable th = null;
        try {
            try {
                AtomicInteger atomicInteger = new AtomicInteger();
                CompletableFuture completableFuture = new CompletableFuture();
                for (RaftTestUtil.SimpleMessage simpleMessage : create) {
                    if (z) {
                        createClient.sendAsync(simpleMessage, replicationLevel).thenAcceptAsync(raftClientReply -> {
                            if (!raftClientReply.isSuccess()) {
                                completableFuture.completeExceptionally(new AssertionError("Failed with reply " + raftClientReply));
                            } else if (atomicInteger.incrementAndGet() == create.length) {
                                completableFuture.complete(null);
                            }
                        });
                    } else {
                        Preconditions.assertTrue(createClient.send(simpleMessage, replicationLevel).isSuccess());
                    }
                }
                if (z) {
                    completableFuture.join();
                    Assert.assertEquals(create.length, atomicInteger.get());
                }
                if (createClient != null) {
                    if (0 != 0) {
                        try {
                            createClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createClient.close();
                    }
                }
                if (replicationLevel != RaftProtos.ReplicationLevel.ALL) {
                    Thread.sleep(miniRaftCluster.getMaxTimeout() + 100);
                }
                logger.info(miniRaftCluster.printAllLogs());
                Iterator<RaftServerProxy> it2 = miniRaftCluster.getServers().iterator();
                while (it2.hasNext()) {
                    RaftServerImpl impl = it2.next().getImpl();
                    if (impl.isAlive() || replicationLevel == RaftProtos.ReplicationLevel.ALL) {
                        JavaUtils.attempt(() -> {
                            RaftTestUtil.assertLogEntries(impl, currentTerm, create);
                        }, 5, 1000L, impl.getId() + " assertLogEntries", logger);
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (th != null) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOldLeaderCommit() throws Exception {
        this.LOG.info("Running testOldLeaderCommit");
        MiniRaftCluster cluster = getCluster();
        RaftServerImpl waitForLeader = RaftTestUtil.waitForLeader(cluster);
        RaftPeerId id = waitForLeader.getId();
        long currentTerm = waitForLeader.getState().getCurrentTerm();
        List<RaftServerImpl> followers = cluster.getFollowers();
        RaftServerImpl raftServerImpl = followers.get(0);
        for (int i = 1; i < 4; i++) {
            cluster.killServer(followers.get(i).getId());
        }
        RaftTestUtil.SimpleMessage[] create = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, create);
        Thread.sleep(cluster.getMaxTimeout() + 100);
        Assert.assertTrue(RaftTestUtil.logEntriesContains(raftServerImpl.getState().getLog(), create));
        this.LOG.info(String.format("killing old leader: %s", id.toString()));
        cluster.killServer(id);
        for (int i2 = 1; i2 < 3; i2++) {
            RaftServerImpl raftServerImpl2 = followers.get(i2);
            this.LOG.info(String.format("restarting follower: %s", raftServerImpl2.getId().toString()));
            cluster.restartServer(raftServerImpl2.getId(), false);
        }
        Thread.sleep(cluster.getMaxTimeout() * 5);
        Assert.assertEquals(raftServerImpl.getId(), RaftTestUtil.waitForLeader(cluster).getId());
        cluster.getServerAliveStream().map(raftServerImpl3 -> {
            return raftServerImpl3.getState().getLog();
        }).forEach(raftLog -> {
            RaftTestUtil.assertLogEntries(raftLog, currentTerm, create);
        });
        this.LOG.info("terminating testOldLeaderCommit test");
    }

    @Test
    public void testOldLeaderNotCommit() throws Exception {
        this.LOG.info("Running testOldLeaderNotCommit");
        MiniRaftCluster cluster = getCluster();
        RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
        List<RaftServerImpl> followers = cluster.getFollowers();
        RaftServerImpl raftServerImpl = followers.get(0);
        for (int i = 1; i < 4; i++) {
            cluster.killServer(followers.get(i).getId());
        }
        RaftTestUtil.SimpleMessage[] create = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, create);
        Thread.sleep(cluster.getMaxTimeout() + 100);
        RaftTestUtil.logEntriesContains(raftServerImpl.getState().getLog(), create);
        cluster.killServer(id);
        cluster.killServer(raftServerImpl.getId());
        for (int i2 = 1; i2 < 4; i2++) {
            cluster.restartServer(followers.get(i2).getId(), false);
        }
        RaftTestUtil.waitForLeader(cluster);
        Thread.sleep(cluster.getMaxTimeout() + 100);
        Predicate predicate = logEntryProto -> {
            return logEntryProto.getTerm() != 1;
        };
        cluster.getServerAliveStream().map(raftServerImpl2 -> {
            return raftServerImpl2.getState().getLog();
        }).forEach(raftLog -> {
            RaftTestUtil.checkLogEntries(raftLog, create, predicate);
        });
    }

    @Test
    public void testWithLoad() throws Exception {
        testWithLoad(10, 500, false, getCluster(), this.LOG);
    }

    public static void testWithLoad(int i, int i2, boolean z, final MiniRaftCluster miniRaftCluster, final Logger logger) throws Exception {
        logger.info("Running testWithLoad: numClients=" + i + ", numMessages=" + i2 + ", async=" + z);
        RaftTestUtil.waitForLeader(miniRaftCluster);
        final List<Client4TestWithLoad> list = (List) Stream.iterate(0, num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).limit(i).map(num2 -> {
            return new Client4TestWithLoad(num2.intValue(), i2, z, miniRaftCluster, logger);
        }).collect(Collectors.toList());
        final AtomicInteger atomicInteger = new AtomicInteger();
        Timer timer = new Timer();
        timer.schedule(new TimerTask() { // from class: org.apache.ratis.RaftBasicTests.1
            private int previousLastStep;

            {
                this.previousLastStep = atomicInteger.get();
            }

            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                logger.info(miniRaftCluster.printServers());
                logger.info(BlockRequestHandlingInjection.getInstance().toString());
                logger.info(miniRaftCluster.toString());
                List list2 = list;
                Logger logger2 = logger;
                list2.forEach(client4TestWithLoad -> {
                    logger2.info("  " + client4TestWithLoad);
                });
                Logger logger3 = logger;
                JavaUtils.dumpAllThreads(str -> {
                    logger3.info(str);
                });
                int i3 = atomicInteger.get();
                if (i3 != this.previousLastStep) {
                    this.previousLastStep = i3;
                    return;
                }
                RaftServerImpl leader = miniRaftCluster.getLeader();
                logger.info("NO PROGRESS at " + i3 + ", try to restart leader=" + leader);
                if (leader != null) {
                    try {
                        miniRaftCluster.restartServer(leader.getId(), false);
                        logger.info("Restarted leader=" + leader);
                    } catch (IOException e) {
                        logger.error("Failed to restart leader=" + leader);
                    }
                }
            }
        }, 5000L, 10000L);
        list.forEach((v0) -> {
            v0.start();
        });
        int i3 = 0;
        while (list.stream().filter((v0) -> {
            return v0.isRunning();
        }).count() != 0) {
            int sum = list.stream().mapToInt(client4TestWithLoad -> {
                return client4TestWithLoad.step.get();
            }).sum();
            Assert.assertTrue(sum >= atomicInteger.get());
            if (sum - atomicInteger.get() < 50 * i) {
                Thread.sleep(10L);
            } else {
                atomicInteger.set(sum);
                i3++;
                try {
                    RaftServerImpl leader = miniRaftCluster.getLeader();
                    if (leader != null) {
                        RaftTestUtil.changeLeader(miniRaftCluster, leader.getId());
                    }
                } catch (IllegalStateException e) {
                    logger.error("Failed to change leader ", e);
                }
            }
        }
        logger.info("Leader change count=" + i3);
        timer.cancel();
        for (Client4TestWithLoad client4TestWithLoad2 : list) {
            if (client4TestWithLoad2.exceptionInClientThread.get() != null) {
                throw new AssertionError(client4TestWithLoad2.exceptionInClientThread.get());
            }
            RaftTestUtil.assertLogEntries(miniRaftCluster.getServers(), client4TestWithLoad2.messages);
        }
    }

    public static void testRequestTimeout(boolean z, MiniRaftCluster miniRaftCluster, Logger logger) throws Exception {
        logger.info("Running testRequestTimeout");
        RaftTestUtil.waitForLeader(miniRaftCluster);
        long currentTimeMillis = System.currentTimeMillis();
        RaftClient createClient = miniRaftCluster.createClient();
        Throwable th = null;
        try {
            try {
                long callId = RaftClientTestUtil.getCallId(createClient);
                miniRaftCluster.getServerAliveStream().forEach(raftServerImpl -> {
                    RetryCacheTestUtil.getOrCreateEntry(raftServerImpl.getRetryCache(), createClient.getId(), callId);
                });
                if (z) {
                    createClient.sendAsync(new RaftTestUtil.SimpleMessage("abc")).get();
                } else {
                    createClient.send(new RaftTestUtil.SimpleMessage("abc"));
                }
                Assert.assertTrue(TimeDuration.valueOf(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS).compareTo(RaftServerConfigKeys.RetryCache.expiryTime(miniRaftCluster.getProperties())) >= 0);
                if (createClient != null) {
                    if (0 == 0) {
                        createClient.close();
                        return;
                    }
                    try {
                        createClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createClient != null) {
                if (th != null) {
                    try {
                        createClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testDelayRequestIfLeaderStepDown() throws Exception {
        runTestDelayRequestIfLeaderStepDown(false, getCluster(), this.LOG);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void runTestDelayRequestIfLeaderStepDown(boolean z, MiniRaftCluster miniRaftCluster, Logger logger) throws Exception {
        RaftClientReply send;
        boolean z2 = false;
        for (RaftServer raftServer : miniRaftCluster.getServers()) {
            if (z2) {
                miniRaftCluster.restartServer(raftServer.getId(), false);
            } else {
                z2 = true;
                miniRaftCluster.killServer(raftServer.getId());
            }
        }
        RaftServerImpl waitForLeader = RaftTestUtil.waitForLeader(miniRaftCluster);
        logger.info("leader: " + waitForLeader.getId() + ", " + miniRaftCluster.printServers());
        RaftTestUtil.SimpleMessage simpleMessage = RaftTestUtil.SimpleMessage.create(1)[0];
        try {
            try {
                RaftClient createClientWithLeader = miniRaftCluster.createClientWithLeader();
                if (z) {
                    CompletableFuture sendAsync = createClientWithLeader.sendAsync(simpleMessage, RaftProtos.ReplicationLevel.ALL);
                    Thread.sleep(1000L);
                    RaftTestUtil.changeLeader(miniRaftCluster, waitForLeader.getId());
                    send = (RaftClientReply) sendAsync.get();
                } else {
                    new Thread(() -> {
                        try {
                            Thread.sleep(1000L);
                            RaftTestUtil.changeLeader(miniRaftCluster, waitForLeader.getId());
                        } catch (Exception e) {
                            logger.warn("changeLeader", e);
                        }
                    }).start();
                    send = createClientWithLeader.send(simpleMessage, RaftProtos.ReplicationLevel.ALL);
                }
                throw send.getNotReplicatedException();
            } finally {
            }
        } catch (NotReplicatedException e) {
            logger.info("Expected", e);
        }
    }
}
