package org.apache.ignite.raft.jraft.rpc;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.class */
public abstract class AbstractRpcTest {
    protected Endpoint endpoint;
    private RpcServer<?> server;
    private final List<RpcClient> clients = new ArrayList();
    private final TestRaftMessagesFactory msgFactory = new TestRaftMessagesFactory();

    @Transferable(value = 0, autoSerializable = false)
    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Request1.class */
    public interface Request1 extends Message {
        int val();
    }

    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Request1RpcProcessor.class */
    private class Request1RpcProcessor implements RpcProcessor<Request1> {
        private Request1RpcProcessor() {
        }

        public void handleRequest(RpcContext rpcContext, Request1 request1) {
            if (request1.val() == 10000) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
            rpcContext.sendResponse(AbstractRpcTest.this.msgFactory.response1().val(request1.val() + 1).build());
        }

        public String interest() {
            return Request1.class.getName();
        }
    }

    @Transferable(value = 1, autoSerializable = false)
    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Request2.class */
    public interface Request2 extends Message {
        int val();
    }

    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Request2RpcProcessor.class */
    private class Request2RpcProcessor implements RpcProcessor<Request2> {
        private Request2RpcProcessor() {
        }

        public void handleRequest(RpcContext rpcContext, Request2 request2) {
            rpcContext.sendResponse(AbstractRpcTest.this.msgFactory.response2().val(request2.val() + 1).build());
        }

        public String interest() {
            return Request2.class.getName();
        }
    }

    @Transferable(value = 2, autoSerializable = false)
    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Response1.class */
    public interface Response1 extends Message {
        int val();
    }

    @Transferable(value = 3, autoSerializable = false)
    /* loaded from: input_file:org/apache/ignite/raft/jraft/rpc/AbstractRpcTest$Response2.class */
    public interface Response2 extends Message {
        int val();
    }

    @BeforeEach
    public void setup() {
        this.endpoint = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
        this.server = createServer(this.endpoint);
        this.server.registerProcessor(new Request1RpcProcessor());
        this.server.registerProcessor(new Request2RpcProcessor());
        this.server.init((Object) null);
    }

    @AfterEach
    public void tearDown() {
        this.clients.forEach((v0) -> {
            v0.shutdown();
        });
        this.server.shutdown();
    }

    public abstract RpcServer<?> createServer(Endpoint endpoint);

    private RpcClient createClient() {
        RpcClient createClient0 = createClient0();
        createClient0.init((Object) null);
        this.clients.add(createClient0);
        return createClient0;
    }

    public abstract RpcClient createClient0();

    @Test
    public void testConnection() {
        Assertions.assertTrue(createClient().checkConnection(this.endpoint));
    }

    @Test
    public void testAsyncProcessing() throws Exception {
        RpcClient createClient = createClient();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        createClient.invokeAsync(this.endpoint, this.msgFactory.request1().build(), new InvokeContext(), (obj, th) -> {
            atomicReference.set((Response1) obj);
            countDownLatch.countDown();
        }, 5000L);
        countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertNotNull(atomicReference);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference2 = new AtomicReference();
        createClient.invokeAsync(this.endpoint, this.msgFactory.request2().build(), new InvokeContext(), (obj2, th2) -> {
            atomicReference2.set((Response2) obj2);
            countDownLatch2.countDown();
        }, 5000L);
        countDownLatch2.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertNotNull(atomicReference2);
    }

    @Test
    public void testDisconnect() {
        RpcClient createClient = createClient();
        RpcClient createClient2 = createClient();
        Assertions.assertTrue(createClient.checkConnection(this.endpoint));
        Assertions.assertTrue(createClient2.checkConnection(this.endpoint));
        this.server.shutdown();
        Assertions.assertTrue(waitForTopology(createClient, 2, 5000L));
        Assertions.assertTrue(waitForTopology(createClient2, 2, 5000L));
        Assertions.assertFalse(createClient.checkConnection(this.endpoint));
        Assertions.assertFalse(createClient2.checkConnection(this.endpoint));
    }

    @Test
    public void testRecordedAsync() throws Exception {
        RpcClientEx createClient = createClient();
        createClient.recordMessages((obj, str) -> {
            return true;
        });
        Assertions.assertTrue(createClient.checkConnection(this.endpoint));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        createClient.invokeAsync(this.endpoint, this.msgFactory.request1().build(), (InvokeContext) null, (obj2, th) -> {
            countDownLatch.countDown();
        }, 500L);
        createClient.invokeAsync(this.endpoint, this.msgFactory.request2().build(), (InvokeContext) null, (obj3, th2) -> {
            countDownLatch.countDown();
        }, 500L);
        countDownLatch.await();
        Assertions.assertEquals(4, createClient.recordedMessages().size());
    }

    @Test
    public void testRecordedAsyncTimeout() {
        RpcClientEx createClient = createClient();
        createClient.recordMessages((obj, str) -> {
            return true;
        });
        Assertions.assertTrue(createClient.checkConnection(this.endpoint));
        try {
            Request1 build = this.msgFactory.request1().val(10000).build();
            CompletableFuture completableFuture = new CompletableFuture();
            createClient.invokeAsync(this.endpoint, build, (InvokeContext) null, (obj2, th) -> {
                if (th == null) {
                    completableFuture.complete(obj2);
                } else {
                    completableFuture.completeExceptionally(th);
                }
            }, 500L);
            completableFuture.get();
            Assertions.fail();
        } catch (Exception e) {
        }
        Queue recordedMessages = createClient.recordedMessages();
        Assertions.assertEquals(1, recordedMessages.size());
        Assertions.assertTrue(((Object[]) recordedMessages.poll())[0] instanceof Request1);
    }

    @Test
    public void testBlockedAsync() throws Exception {
        RpcClientEx createClient = createClient();
        createClient.blockMessages((obj, str) -> {
            return obj instanceof Request1;
        });
        Assertions.assertTrue(createClient.checkConnection(this.endpoint));
        CompletableFuture completableFuture = new CompletableFuture();
        createClient.invokeAsync(this.endpoint, this.msgFactory.request1().build(), (InvokeContext) null, (obj2, th) -> {
            completableFuture.complete(obj2);
        }, 30000L);
        Thread.sleep(500L);
        Assertions.assertEquals(1, createClient.blockedMessages().size());
        Assertions.assertFalse(completableFuture.isDone());
        createClient.stopBlock();
        completableFuture.get(5000L, TimeUnit.MILLISECONDS);
    }

    protected abstract boolean waitForTopology(RpcClient rpcClient, int i, long j);
}
