package org.apache.ratis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Level;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:ratis-server-0.2.0-tests.jar:org/apache/ratis/RaftAsyncTests.class
 */
/* loaded from: input_file:test-classes/org/apache/ratis/RaftAsyncTests.class */
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> {
    public static final int NUM_SERVERS = 3;

    @Before
    public void setup() {
        getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
    }

    @Test
    public void testAsyncConfiguration() throws IOException {
        this.LOG.info("Running testAsyncConfiguration");
        RaftProperties raftProperties = new RaftProperties();
        RaftClient.Builder properties = RaftClient.newBuilder().setRaftGroup(RaftGroup.emptyGroup()).setProperties(raftProperties);
        RaftClient build = properties.build();
        Throwable th = null;
        try {
            RaftClientTestUtil.assertScheduler(build, 3);
            RaftClientTestUtil.assertAsyncRequestSemaphore(build, 100, 0);
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    build.close();
                }
            }
            RaftClientConfigKeys.Async.setMaxOutstandingRequests(raftProperties, 5);
            RaftClientConfigKeys.Async.setSchedulerThreads(raftProperties, 200);
            RaftClient build2 = properties.build();
            Throwable th3 = null;
            try {
                try {
                    RaftClientTestUtil.assertScheduler(build2, 200);
                    RaftClientTestUtil.assertAsyncRequestSemaphore(build2, 5, 0);
                    if (build2 != null) {
                        if (0 == 0) {
                            build2.close();
                            return;
                        }
                        try {
                            build2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th3 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (build2 != null) {
                    if (th3 != null) {
                        try {
                            build2.close();
                        } catch (Throwable th7) {
                            th3.addSuppressed(th7);
                        }
                    } else {
                        build2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    build.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void testAsyncRequestSemaphore() throws Exception {
        this.LOG.info("Running testAsyncRequestSemaphore");
        CLUSTER newCluster = newCluster(3);
        Assert.assertNull(newCluster.getLeader());
        newCluster.start();
        RaftTestUtil.waitForLeader(newCluster);
        int maxOutstandingRequests = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
        CompletableFuture[] completableFutureArr = new CompletableFuture[maxOutstandingRequests + 1];
        RaftTestUtil.SimpleMessage[] create = RaftTestUtil.SimpleMessage.create(maxOutstandingRequests);
        RaftClient createClient = newCluster.createClient();
        Iterator<RaftServerProxy> it = newCluster.getServers().iterator();
        while (it.hasNext()) {
            ((SimpleStateMachine4Testing) it.next().getImpl().getStateMachine()).setBlockTransaction(true);
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        for (int i = 0; i < maxOutstandingRequests; i++) {
            atomicInteger.getAndIncrement();
            completableFutureArr[i] = createClient.sendAsync(create[i]);
            atomicInteger.decrementAndGet();
        }
        Assert.assertTrue(atomicInteger.get() == 0);
        completableFutureArr[maxOutstandingRequests] = CompletableFuture.supplyAsync(() -> {
            atomicInteger.incrementAndGet();
            createClient.sendAsync(new RaftTestUtil.SimpleMessage("n1"));
            atomicInteger.decrementAndGet();
            return null;
        }, Executors.newFixedThreadPool(1));
        while (atomicInteger.get() != 1) {
            Thread.sleep(1000L);
        }
        Assert.assertTrue(atomicInteger.get() == 1);
        RaftClientTestUtil.assertAsyncRequestSemaphore(createClient, 0, 1);
        Iterator<RaftServerProxy> it2 = newCluster.getServers().iterator();
        while (it2.hasNext()) {
            ((SimpleStateMachine4Testing) it2.next().getImpl().getStateMachine()).setBlockTransaction(false);
        }
        for (int i2 = 0; i2 <= maxOutstandingRequests; i2++) {
            completableFutureArr[i2].join();
        }
        Assert.assertTrue(atomicInteger.get() == 0);
        newCluster.shutdown();
    }

    void runTestBasicAppendEntriesAsync(RaftProtos.ReplicationLevel replicationLevel, boolean z) throws Exception {
        CLUSTER newCluster = newCluster(z ? 5 : 3);
        try {
            newCluster.start();
            RaftTestUtil.waitForLeader(newCluster);
            RaftBasicTests.runTestBasicAppendEntries(true, replicationLevel, z, RaftServerConfigKeys.STAGING_CATCHUP_GAP_DEFAULT, newCluster, this.LOG);
            newCluster.shutdown();
        } catch (Throwable th) {
            newCluster.shutdown();
            throw th;
        }
    }

    @Test
    public void testBasicAppendEntriesAsync() throws Exception {
        runTestBasicAppendEntriesAsync(RaftProtos.ReplicationLevel.MAJORITY, false);
    }

    @Test
    public void testBasicAppendEntriesAsyncKillLeader() throws Exception {
        runTestBasicAppendEntriesAsync(RaftProtos.ReplicationLevel.MAJORITY, true);
    }

    @Test
    public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
        runTestBasicAppendEntriesAsync(RaftProtos.ReplicationLevel.ALL, false);
    }

    @Test
    public void testWithLoadAsync() throws Exception {
        this.LOG.info("Running testWithLoadAsync");
        CLUSTER newCluster = newCluster(3);
        newCluster.start();
        RaftTestUtil.waitForLeader(newCluster);
        RaftBasicTests.testWithLoad(10, 500, true, newCluster, this.LOG);
        newCluster.shutdown();
    }

    @Test
    public void testStaleReadAsync() throws Exception {
        CLUSTER newCluster = newCluster(3);
        try {
            RaftClient createClient = newCluster.createClient();
            Throwable th = null;
            try {
                try {
                    newCluster.start();
                    RaftTestUtil.waitForLeader(newCluster);
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < 10; i++) {
                        String str = "m" + i;
                        this.LOG.info("sendAsync " + str);
                        arrayList.add(createClient.sendAsync(new RaftTestUtil.SimpleMessage(str)));
                    }
                    Assert.assertEquals(10L, arrayList.size());
                    RaftClientReply raftClientReply = null;
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        raftClientReply = (RaftClientReply) ((CompletableFuture) it.next()).join();
                        Assert.assertTrue(raftClientReply.isSuccess());
                    }
                    arrayList.clear();
                    RaftPeerId serverId = raftClientReply.getServerId();
                    this.LOG.info("leader = " + serverId);
                    Collection commitInfos = raftClientReply.getCommitInfos();
                    this.LOG.info("commitInfos = " + commitInfos);
                    RaftProtos.CommitInfoProto commitInfoProto = (RaftProtos.CommitInfoProto) commitInfos.stream().filter(commitInfoProto2 -> {
                        return !RaftPeerId.valueOf(commitInfoProto2.getServer().getId()).equals(serverId);
                    }).max(Comparator.comparing((v0) -> {
                        return v0.getCommitIndex();
                    })).get();
                    RaftPeerId valueOf = RaftPeerId.valueOf(commitInfoProto.getServer().getId());
                    this.LOG.info("max follower = " + valueOf);
                    testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index", () -> {
                        return createClient.sendStaleReadAsync(new RaftTestUtil.SimpleMessage("11"), commitInfoProto.getCommitIndex(), valueOf);
                    }, StateMachineException.class, new Class[]{IndexOutOfBoundsException.class});
                    for (int i2 = 1; i2 < commitInfoProto.getCommitIndex(); i2++) {
                        int i3 = i2;
                        this.LOG.info("sendStaleReadAsync, query=" + i3);
                        RaftTestUtil.SimpleMessage simpleMessage = new RaftTestUtil.SimpleMessage("" + i3);
                        arrayList.add(createClient.sendReadOnlyAsync(simpleMessage).thenApply(raftClientReply2 -> {
                            return getMessageContent(raftClientReply2);
                        }).thenCombine((CompletionStage) createClient.sendStaleReadAsync(simpleMessage, commitInfoProto.getCommitIndex(), valueOf).thenApply(raftClientReply3 -> {
                            return getMessageContent(raftClientReply3);
                        }), (byteString, byteString2) -> {
                            try {
                                this.LOG.info("query " + i3 + " returns " + RaftProtos.LogEntryProto.parseFrom(byteString).getSmLogEntry().getData().toStringUtf8());
                                Assert.assertEquals("log entry mismatch for query=" + i3, byteString, byteString2);
                                return null;
                            } catch (InvalidProtocolBufferException e) {
                                throw new CompletionException((Throwable) e);
                            }
                        }));
                    }
                    JavaUtils.allOf(arrayList).join();
                    if (createClient != null) {
                        if (0 != 0) {
                            try {
                                createClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            newCluster.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ByteString getMessageContent(RaftClientReply raftClientReply) {
        Assert.assertTrue(raftClientReply.isSuccess());
        return raftClientReply.getMessage().getContent();
    }

    @Test
    public void testRequestTimeout() throws Exception {
        TimeDuration expiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
        RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5L, TimeUnit.SECONDS));
        CLUSTER newCluster = newCluster(3);
        newCluster.start();
        RaftBasicTests.testRequestTimeout(true, newCluster, this.LOG);
        newCluster.shutdown();
        RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), expiryTime);
    }

    @Test
    public void testAppendEntriesTimeout() throws IOException, InterruptedException, ExecutionException {
        this.LOG.info("Running testAppendEntriesTimeout");
        TimeDuration expiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
        RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20L, TimeUnit.SECONDS));
        CLUSTER newCluster = newCluster(3);
        newCluster.start();
        RaftTestUtil.waitForLeader(newCluster);
        long currentTimeMillis = System.currentTimeMillis();
        RaftClient createClient = newCluster.createClient();
        Throwable th = null;
        try {
            try {
                newCluster.getServerAliveStream().forEach(raftServerImpl -> {
                    try {
                        if (!raftServerImpl.isLeader()) {
                            ((SimpleStateMachine4Testing) raftServerImpl.getStateMachine()).setBlockAppend(true);
                        }
                    } catch (InterruptedException e) {
                        this.LOG.error("Interrupted while blocking append", e);
                    }
                });
                CompletableFuture sendAsync = createClient.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
                Thread.sleep(5000L);
                Assert.assertTrue(!sendAsync.isDone());
                newCluster.getServerAliveStream().forEach(raftServerImpl2 -> {
                    try {
                        ((SimpleStateMachine4Testing) raftServerImpl2.getStateMachine()).setBlockAppend(false);
                    } catch (InterruptedException e) {
                        this.LOG.error("Interrupted while unblocking append", e);
                    }
                });
                sendAsync.get();
                Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis > 5000);
                if (createClient != null) {
                    if (0 != 0) {
                        try {
                            createClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createClient.close();
                    }
                }
                newCluster.shutdown();
                RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), expiryTime);
            } finally {
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (th != null) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAsyncDelayRequestIfLeaderStepDown() throws Exception {
        CLUSTER newCluster = newCluster(5);
        newCluster.start();
        RaftBasicTests.runTestDelayRequestIfLeaderStepDown(true, newCluster, this.LOG);
    }

    static {
        LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
        LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
    }
}
