package io.camunda.zeebe.broker.system.partitions;

import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/AsyncSnapshottingTest.class */
/**
 * Verifies that {@link AsyncSnapshotDirector} takes and persists snapshots when a snapshot is
 * forced, both in processing mode and in replay mode.
 *
 * <p>The director runs against a spied {@link StateControllerImpl} backed by a file based snapshot
 * store in a temporary folder, and against a mocked {@link StreamProcessor} whose processed and
 * written positions are stubbed per test.
 */
public final class AsyncSnapshottingTest {
    private final TemporaryFolder tempFolderRule = new TemporaryFolder();
    private final AutoCloseableRule autoCloseableRule = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();

    // Rules are nested outermost first: auto-close rule, then temporary folder, then scheduler.
    @Rule
    public final RuleChain chain =
            RuleChain.outerRule(autoCloseableRule).around(tempFolderRule).around(actorSchedulerRule);

    private StateControllerImpl snapshotController;
    private AsyncSnapshotDirector asyncSnapshotDirector;
    private StreamProcessor mockStreamProcessor;
    private ConstructableSnapshotStore persistedSnapshotStore;

    /**
     * Creates the snapshot store and a recovered state controller under the temporary folder, and
     * prepares the stream processor mock. The director itself is created by each test, because the
     * mode (processing or replay) differs.
     *
     * @throws IOException declared for the fixture setup
     */
    @Before
    public void setup() throws IOException {
        final Path rootDirectory = tempFolderRule.getRoot().toPath();

        final FileBasedSnapshotStoreFactory storeFactory =
                new FileBasedSnapshotStoreFactory(actorSchedulerRule.get(), 1);
        // The store for partition 1 has to be created before it can be looked up below.
        storeFactory.createReceivableSnapshotStore(rootDirectory, 1);
        persistedSnapshotStore = storeFactory.getConstructableSnapshotStore(1);

        final StateControllerImpl controller =
                new StateControllerImpl(
                        DefaultZeebeDbFactory.defaultFactory(),
                        persistedSnapshotStore,
                        rootDirectory.resolve("runtime"),
                        // Every lookup resolves to an entry built from the requested value + 100
                        // (presumably position -> index; confirm against TestIndexedRaftLogEntry).
                        position ->
                                Optional.of(
                                        new TestIndexedRaftLogEntry(
                                                position + 100,
                                                1L,
                                                new ApplicationEntry(1L, 10L, new UnsafeBuffer()))),
                        // Always reports Long.MAX_VALUE for the given db, so this value never
                        // becomes the limiting one (presumably the exported position; confirm).
                        zeebeDb -> Long.MAX_VALUE,
                        new TestConcurrencyControl());
        controller.recover().join();
        // The real instance is registered for closing; the tests only see the spy.
        autoCloseableRule.manage(controller);
        snapshotController = Mockito.spy(controller);

        createStreamProcessorControllerMock();
    }

    /** Reports the given position as committed to the director under test. */
    private void setCommitPosition(final long commitPosition) {
        asyncSnapshotDirector.newPositionCommitted(commitPosition);
    }

    /**
     * Creates the stream processor mock with default stubbing: consecutive calls return the
     * processed positions 0, 25, 32 and the written positions 99, 100.
     */
    private void createStreamProcessorControllerMock() {
        mockStreamProcessor = Mockito.mock(StreamProcessor.class);
        Mockito.when(mockStreamProcessor.getLastProcessedPositionAsync())
                .thenReturn(CompletableActorFuture.completed(0L))
                .thenReturn(CompletableActorFuture.completed(25L))
                .thenReturn(CompletableActorFuture.completed(32L));
        Mockito.when(mockStreamProcessor.getLastWrittenPositionAsync())
                .thenReturn(CompletableActorFuture.completed(99L))
                .thenReturn(CompletableActorFuture.completed(100L));
    }

    /** Creates a processing mode director and blocks until its actor has been submitted. */
    private void createAsyncSnapshotDirectorOfProcessingMode() {
        asyncSnapshotDirector =
                AsyncSnapshotDirector.ofProcessingMode(
                        0, 1, mockStreamProcessor, snapshotController, Duration.ofMinutes(1L));
        actorSchedulerRule.submitActor(asyncSnapshotDirector).join();
    }

    /** Creates a replay mode director and blocks until its actor has been submitted. */
    private void createAsyncSnapshotDirectorOfReplayMode() {
        asyncSnapshotDirector =
                AsyncSnapshotDirector.ofReplayMode(
                        0, 1, mockStreamProcessor, snapshotController, Duration.ofMinutes(1L));
        actorSchedulerRule.submitActor(asyncSnapshotDirector).join();
    }

    @Test
    public void shouldValidSnapshotWhenCommitPositionGreaterEquals() {
        // given
        createAsyncSnapshotDirectorOfProcessingMode();
        final CompletableActorFuture<PersistedSnapshot> snapshotFuture =
                asyncSnapshotDirector.forceSnapshot();

        // when
        setCommitPosition(100L);

        // then
        final PersistedSnapshot snapshot = snapshotFuture.join();
        Assertions.assertThat(snapshot).isNotNull();
        Assertions.assertThat(persistedSnapshotStore.getLatestSnapshot()).hasValue(snapshot);
    }

    @Test
    public void shouldTakeSnapshotsOneByOne() {
        // given - a first snapshot that has been persisted
        createAsyncSnapshotDirectorOfProcessingMode();
        final CompletableActorFuture<PersistedSnapshot> firstFuture =
                asyncSnapshotDirector.forceSnapshot();
        setCommitPosition(99L);
        final PersistedSnapshot firstSnapshot = firstFuture.join();
        Assertions.assertThat(firstSnapshot).isNotNull();
        final long firstIndex = firstSnapshot.getIndex();

        // when - a second snapshot is forced afterwards
        final CompletableActorFuture<PersistedSnapshot> secondFuture =
                asyncSnapshotDirector.forceSnapshot();
        setCommitPosition(100L);

        // then
        final PersistedSnapshot secondSnapshot = secondFuture.join();
        Assertions.assertThat(secondSnapshot)
                .describedAs("Second snapshot is taken")
                .isNotNull()
                .describedAs("Second snapshot has a higher index")
                .extracting(PersistedSnapshot::getIndex, Assertions.as(InstanceOfAssertFactories.LONG))
                .isGreaterThan(firstIndex);
        Assertions.assertThat(persistedSnapshotStore.getLatestSnapshot()).hasValue(secondSnapshot);
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastWritePosRetrievingFailed() {
        // given - reading the last written position fails
        createAsyncSnapshotDirectorOfProcessingMode();
        Mockito.when(mockStreamProcessor.getLastProcessedPositionAsync())
                .thenReturn(CompletableActorFuture.completed(25L));
        final RuntimeException failure = new RuntimeException("getLastWrittenPositionAsync fails");
        Mockito.when(mockStreamProcessor.getLastWrittenPositionAsync())
                .thenReturn(CompletableActorFuture.completedExceptionally(failure));
        // Committed up front, so the retry below needs no further commit update.
        setCommitPosition(100L);

        // when - the first attempt surfaces the failure to the caller
        Assertions.assertThatThrownBy(() -> asyncSnapshotDirector.forceSnapshot().join())
                .hasCause(failure);
        Mockito.verify(mockStreamProcessor, Mockito.timeout(10000L).times(1))
                .getLastWrittenPositionAsync();

        // then - once the position can be read again, the next attempt succeeds
        Mockito.when(mockStreamProcessor.getLastWrittenPositionAsync())
                .thenReturn(CompletableActorFuture.completed(26L));
        final CompletableActorFuture<PersistedSnapshot> retryFuture =
                asyncSnapshotDirector.forceSnapshot();
        Assertions.assertThat(retryFuture.join()).isNotNull();
        Assertions.assertThat(persistedSnapshotStore.getLatestSnapshot()).isPresent();
        Mockito.verify(mockStreamProcessor, Mockito.timeout(10000L).times(2))
                .getLastWrittenPositionAsync();
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastProcessedPosRetrievingFailed() {
        // given - reading the last processed position fails
        createAsyncSnapshotDirectorOfProcessingMode();
        final RuntimeException failure = new RuntimeException("getLastProcessedPositionAsync fails");
        Mockito.when(mockStreamProcessor.getLastProcessedPositionAsync())
                .thenReturn(CompletableActorFuture.completedExceptionally(failure));
        Mockito.when(mockStreamProcessor.getLastWrittenPositionAsync())
                .thenReturn(CompletableActorFuture.completed(26L));

        // when - the first attempt surfaces the failure to the caller
        Assertions.assertThatThrownBy(() -> asyncSnapshotDirector.forceSnapshot().join())
                .hasCause(failure);
        Mockito.verify(mockStreamProcessor, Mockito.timeout(5000L).times(1))
                .getLastProcessedPositionAsync();

        // then - once the position can be read again, the next attempt succeeds
        Mockito.when(mockStreamProcessor.getLastProcessedPositionAsync())
                .thenReturn(CompletableActorFuture.completed(25L));
        final CompletableActorFuture<PersistedSnapshot> retryFuture =
                asyncSnapshotDirector.forceSnapshot();
        setCommitPosition(100L);
        final PersistedSnapshot snapshot = retryFuture.join();
        Assertions.assertThat(snapshot).isNotNull();
        Assertions.assertThat(persistedSnapshotStore.getLatestSnapshot()).hasValue(snapshot);
    }

    @Test
    public void shouldPersistSnapshotWithoutWaitingForCommitWhenInReplayMode() {
        // given
        createAsyncSnapshotDirectorOfReplayMode();

        // when - no commit position is reported at all
        final CompletableActorFuture<PersistedSnapshot> snapshotFuture =
                asyncSnapshotDirector.forceSnapshot();

        // then
        final PersistedSnapshot snapshot = snapshotFuture.join();
        Assertions.assertThat(snapshot).isNotNull();
        Assertions.assertThat(persistedSnapshotStore.getLatestSnapshot()).hasValue(snapshot);
    }
}
