package org.apache.ignite.raft.jraft.storage;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.LoadSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.core.DefaultJRaftServiceFactory;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.TimerManager;
import org.apache.ignite.raft.jraft.option.CopyOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
import org.apache.ignite.raft.jraft.rpc.GetFileRequestBuilder;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotMetaTable;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

/**
 * Tests for {@link SnapshotExecutorImpl}: installing a snapshot from a remote peer, interrupting an
 * in-flight installation, and saving local snapshots with and without a log index margin.
 *
 * <p>The node, FSM caller, log manager, RPC context, RPC client and snapshot storage are Mockito
 * mocks; the executor under test is a real instance initialized in {@link #setup()}.
 */
@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class SnapshotExecutorTest extends BaseStorageTest {
    private SnapshotExecutorImpl executor;

    @Mock
    private NodeImpl node;

    @Mock
    private FSMCaller fSMCaller;

    @Mock
    private LogManager logManager;
    private Endpoint addr;

    @Mock
    private RpcContext asyncCtx;

    @Mock
    private RaftClientService raftClientService;
    private String uri;
    private final String hostPort = "localhost:8081";
    private final int readerId = 99;
    private CopyOptions copyOpts;
    private LocalSnapshotMetaTable table;
    private LocalSnapshotWriter writer;
    private LocalSnapshotReader reader;
    private RaftOptions raftOptions;

    @Mock
    private LocalSnapshotStorage snapshotStorage;
    private TimerManager timerManager;
    private NodeOptions options;

    /** Extra executor created by individual tests; released in {@link #teardown()}. */
    private ExecutorService executorService;

    /**
     * Builds the fixtures shared by every test: a snapshot meta table holding one file
     * ({@code testFile}) with last included index/term 1/1, the stubbed node accessors, and an
     * initialized {@link SnapshotExecutorImpl} bound to {@code localhost:8081}.
     */
    @BeforeEach
    public void setup() throws Exception {
        this.timerManager = new TimerManager(5);
        this.raftOptions = new RaftOptions();

        String snapshotPath = this.path.toString();
        this.writer = new LocalSnapshotWriter(snapshotPath, this.snapshotStorage, this.raftOptions);
        this.reader = new LocalSnapshotReader(this.snapshotStorage, (SnapshotThrottle) null,
            new Endpoint("localhost", 8081), this.raftOptions, snapshotPath);
        Mockito.lenient().when(this.snapshotStorage.open()).thenReturn(this.reader);
        Mockito.lenient().when(this.snapshotStorage.create(true)).thenReturn(this.writer);

        RaftMessagesFactory msgFactory = this.raftOptions.getRaftMessagesFactory();
        this.table = new LocalSnapshotMetaTable(this.raftOptions);
        this.table.addFile("testFile", msgFactory.localFileMeta().checksum("test").build());
        this.table.setMeta(msgFactory.snapshotMeta().lastIncludedIndex(1L).lastIncludedTerm(1L).build());

        // Evaluates to "remote://localhost:8081/99".
        this.uri = "remote://" + this.hostPort + "/" + this.readerId;
        this.copyOpts = new CopyOptions();

        this.options = new NodeOptions();
        this.options.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
        Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions());
        Mockito.when(this.node.getOptions()).thenReturn(this.options);
        Mockito.when(this.node.getRpcClientService()).thenReturn(this.raftClientService);
        Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager);
        Mockito.when(this.node.getServiceFactory()).thenReturn(new DefaultJRaftServiceFactory());

        this.addr = new Endpoint("localhost", 8081);
        SnapshotExecutorOptions executorOpts = new SnapshotExecutorOptions();
        executorOpts.setFsmCaller(this.fSMCaller);
        executorOpts.setInitTerm(0L);
        executorOpts.setNode(this.node);
        executorOpts.setLogManager(this.logManager);
        executorOpts.setUri(snapshotPath);
        executorOpts.setAddr(this.addr);

        this.executor = new SnapshotExecutorImpl();
        Assertions.assertTrue(this.executor.init(executorOpts));
    }

    /**
     * Shuts down the executor under test, the timer manager, the common executor created in
     * {@link #setup()}, and any executor a test stored in {@code executorService}.
     */
    @AfterEach
    public void teardown() throws Exception {
        this.executor.shutdown();
        this.timerManager.shutdown();
        this.options.getCommonExecutor().shutdown();
        // NOTE(review): executorService is only assigned by some tests, so it can be null here;
        // this relies on shutdownAndAwaitTermination tolerating null - confirm.
        ExecutorServiceHelper.shutdownAndAwaitTermination(this.executorService);
    }

    /**
     * Drives a complete snapshot installation from {@code localhost:8080}: answers the captured
     * {@code getFile} closures for the snapshot meta and for {@code testFile}, completes the
     * captured load closure, and expects the executor to report term 2 and index 1.
     */
    @Test
    public void testInstallSnapshot() throws Exception {
        RaftMessagesFactory msgFactory = this.raftOptions.getRaftMessagesFactory();
        Endpoint remote = new Endpoint("localhost", 8080);

        RpcRequests.InstallSnapshotRequest request = msgFactory.installSnapshotRequest()
            .groupId("test")
            .peerId(this.addr.toString())
            .serverId("localhost:8080")
            .uri("remote://localhost:8080/99")
            .term(0L)
            .meta(msgFactory.snapshotMeta().lastIncludedIndex(1L).lastIncludedTerm(2L).build())
            .build();
        Mockito.when(this.raftClientService.connect(remote)).thenReturn(true);

        // Never completed: the test answers through the captured response closures instead.
        CompletableFuture future = new CompletableFuture();

        // First remote read: the snapshot meta file.
        GetFileRequestBuilder fileRequest = msgFactory.getFileRequest()
            .readerId(99L)
            .filename("__raft_snapshot_meta")
            .count(2147483647L)
            .offset(0L)
            .readPartly(true);
        ArgumentCaptor<RpcResponseClosure> metaCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
        Mockito.when(this.raftClientService.getFile(
            ArgumentMatchers.eq(remote),
            ArgumentMatchers.eq(fileRequest.build()),
            ArgumentMatchers.eq(this.copyOpts.getTimeoutMs()),
            metaCaptor.capture())).thenReturn(future);

        Future<?> installTask = Utils.runInThread(ForkJoinPool.commonPool(), () -> {
            this.executor.installSnapshot(request, msgFactory.installSnapshotResponse(),
                new RpcRequestClosure(this.asyncCtx, msgFactory));
        });

        Assertions.assertTrue(TestUtils.waitForArgumentCapture(metaCaptor, 5000L));
        RpcResponseClosure metaClosure = metaCaptor.getValue();
        // Serialize the meta table once so the advertised read size matches the payload.
        ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
        metaClosure.setResponse(msgFactory.getFileResponse()
            .readSize(metaBuf.remaining())
            .eof(true)
            .data(new ByteString(metaBuf))
            .build());

        // Second remote read: the single data file listed in the meta table. The stub has to be
        // registered before the meta closure runs, because running it lets the copy proceed.
        ArgumentCaptor<RpcResponseClosure> fileCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
        fileRequest.filename("testFile");
        fileRequest.count(this.raftOptions.getMaxByteCountPerRpc());
        Mockito.when(this.raftClientService.getFile(
            ArgumentMatchers.eq(remote),
            ArgumentMatchers.eq(fileRequest.build()),
            ArgumentMatchers.eq(this.copyOpts.getTimeoutMs()),
            fileCaptor.capture())).thenReturn(future);
        metaClosure.run(Status.OK());

        Assertions.assertTrue(TestUtils.waitForArgumentCapture(fileCaptor, 5000L));
        RpcResponseClosure fileClosure = fileCaptor.getValue();
        fileClosure.setResponse(msgFactory.getFileResponse()
            .readSize(100L)
            .eof(true)
            .data(new ByteString(new byte[100]))
            .build());

        // Once the file is delivered the executor hands the snapshot to the FSM caller.
        ArgumentCaptor<LoadSnapshotClosure> loadCaptor = ArgumentCaptor.forClass(LoadSnapshotClosure.class);
        Mockito.when(this.fSMCaller.onSnapshotLoad(loadCaptor.capture())).thenReturn(true);
        fileClosure.run(Status.OK());

        Assertions.assertTrue(TestUtils.waitForArgumentCapture(loadCaptor, 5000L));
        LoadSnapshotClosure loadClosure = loadCaptor.getValue();
        SnapshotReader snapshotReader = loadClosure.start();
        Assertions.assertNotNull(snapshotReader);
        Assertions.assertEquals(1, snapshotReader.listFiles().size());
        Assertions.assertTrue(snapshotReader.listFiles().contains("testFile"));
        loadClosure.run(Status.OK());

        this.executor.join();
        Assertions.assertTrue(installTask.isDone());
        Assertions.assertEquals(2L, this.executor.getLastSnapshotTerm());
        Assertions.assertEquals(1L, this.executor.getLastSnapshotIndex());
    }

    /**
     * Starts an installation whose remote meta read never completes, interrupts it via
     * {@code interruptDownloadingSnapshots(1)}, and expects the last snapshot term and index to
     * stay at 0.
     */
    @Test
    public void testInterruptInstalling() throws Exception {
        RaftMessagesFactory msgFactory = this.raftOptions.getRaftMessagesFactory();
        Endpoint remote = new Endpoint("localhost", 8080);

        RpcRequests.InstallSnapshotRequest request = msgFactory.installSnapshotRequest()
            .groupId("test")
            .peerId(this.addr.toString())
            .serverId("localhost:8080")
            .uri("remote://localhost:8080/99")
            .term(0L)
            .meta(msgFactory.snapshotMeta().lastIncludedIndex(1L).lastIncludedTerm(1L).build())
            .build();
        Mockito.lenient().when(this.raftClientService.connect(remote)).thenReturn(true);

        RpcRequests.GetFileRequest metaRequest = msgFactory.getFileRequest()
            .readerId(99L)
            .filename("__raft_snapshot_meta")
            .count(2147483647L)
            .offset(0L)
            .readPartly(true)
            .build();
        // The returned future is never completed, so the download stays pending until interrupted.
        Mockito.lenient().when(this.raftClientService.getFile(
            ArgumentMatchers.eq(remote),
            ArgumentMatchers.eq(metaRequest),
            ArgumentMatchers.eq(this.copyOpts.getTimeoutMs()),
            ArgumentMatchers.any())).thenReturn(new CompletableFuture<>());

        // Stored in the field so teardown() releases the thread.
        this.executorService = Executors.newSingleThreadExecutor();
        Utils.runInThread(this.executorService, () -> {
            this.executor.installSnapshot(request, msgFactory.installSnapshotResponse(),
                new RpcRequestClosure(this.asyncCtx, msgFactory));
        });

        this.executor.interruptDownloadingSnapshots(1L);
        this.executor.join();
        Assertions.assertEquals(0L, this.executor.getLastSnapshotTerm());
        Assertions.assertEquals(0L, this.executor.getLastSnapshotIndex());
    }

    /**
     * Saves a snapshot with the FSM at applied index 1, completes the captured save closure with
     * meta index 2 / term 1, and expects the executor to report exactly those values.
     */
    @Test
    public void testDoSnapshot() throws Exception {
        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);
        ArgumentCaptor<SaveSnapshotClosure> saveCaptor = ArgumentCaptor.forClass(SaveSnapshotClosure.class);
        Mockito.when(this.fSMCaller.onSnapshotSave(saveCaptor.capture())).thenReturn(true);

        SynchronizedClosure done = new SynchronizedClosure();
        this.executor.doSnapshot(done);

        SaveSnapshotClosure saveClosure = saveCaptor.getValue();
        Assertions.assertNotNull(saveClosure);
        saveClosure.start(this.raftOptions.getRaftMessagesFactory().snapshotMeta()
            .lastIncludedIndex(2L)
            .lastIncludedTerm(1L)
            .build());
        saveClosure.run(Status.OK());
        done.await();
        this.executor.join();

        Assertions.assertTrue(done.getStatus().isOk());
        Assertions.assertEquals(1L, this.executor.getLastSnapshotTerm());
        Assertions.assertEquals(2L, this.executor.getLastSnapshotIndex());
    }

    /**
     * With a snapshot log index margin of 10 and applied index 1, requesting a snapshot must leave
     * the last snapshot term and index at 0.
     */
    @Test
    public void testNotDoSnapshotWithIntervalDist() throws Exception {
        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setSnapshotLogIndexMargin(10);
        // Stored in the field so teardown() releases the pool.
        this.executorService = JRaftUtils.createExecutor("test-executor", Utils.cpus());
        nodeOptions.setCommonExecutor(this.executorService);
        Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);

        this.executor.doSnapshot((Closure) null);
        this.executor.join();

        Assertions.assertEquals(0L, this.executor.getLastSnapshotTerm());
        Assertions.assertEquals(0L, this.executor.getLastSnapshotIndex());
    }

    /**
     * With a snapshot log index margin of 5 and applied index 6, a snapshot is saved; after the
     * captured save closure completes with meta index 6 / term 1 the executor reports those values.
     */
    @Test
    public void testDoSnapshotWithIntervalDist() throws Exception {
        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setSnapshotLogIndexMargin(5);
        // Stored in the field so teardown() releases the pool.
        this.executorService = JRaftUtils.createExecutor("test-executor", Utils.cpus());
        nodeOptions.setCommonExecutor(this.executorService);
        Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(6L);

        ArgumentCaptor<SaveSnapshotClosure> saveCaptor = ArgumentCaptor.forClass(SaveSnapshotClosure.class);
        Mockito.when(this.fSMCaller.onSnapshotSave(saveCaptor.capture())).thenReturn(true);

        SynchronizedClosure done = new SynchronizedClosure();
        this.executor.doSnapshot(done);

        SaveSnapshotClosure saveClosure = saveCaptor.getValue();
        Assertions.assertNotNull(saveClosure);
        saveClosure.start(this.raftOptions.getRaftMessagesFactory().snapshotMeta()
            .lastIncludedIndex(6L)
            .lastIncludedTerm(1L)
            .build());
        saveClosure.run(Status.OK());
        done.await();
        this.executor.join();

        Assertions.assertEquals(1L, this.executor.getLastSnapshotTerm());
        Assertions.assertEquals(6L, this.executor.getLastSnapshotIndex());
    }
}
