package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.system.partitions.TestPartitionTransitionContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.util.health.HealthMonitor;
import io.camunda.zeebe.util.sched.TestConcurrencyControl;
import io.camunda.zeebe.util.sched.future.TestActorFuture;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/steps/StreamProcessorTransitionStepTest.class */
class StreamProcessorTransitionStepTest {
    private static final TestConcurrencyControl TEST_CONCURRENCY_CONTROL = new TestConcurrencyControl();
    TestPartitionTransitionContext transitionContext = new TestPartitionTransitionContext();
    final StreamProcessorBuilder streamProcessorBuilder = (StreamProcessorBuilder) Mockito.spy(StreamProcessorBuilder.class);
    final StreamProcessor streamProcessor = (StreamProcessor) Mockito.mock(StreamProcessor.class);
    final StreamProcessor streamProcessorFromPrevRole = (StreamProcessor) Mockito.mock(StreamProcessor.class);
    private StreamProcessorTransitionStep step;

    StreamProcessorTransitionStepTest() {
    }

    @BeforeEach
    void setup() {
        this.transitionContext.setLogStream((LogStream) Mockito.mock(LogStream.class));
        this.transitionContext.setComponentHealthMonitor((HealthMonitor) Mockito.mock(HealthMonitor.class));
        this.transitionContext.setConcurrencyControl(TEST_CONCURRENCY_CONTROL);
        ((StreamProcessorBuilder) Mockito.doReturn(this.streamProcessor).when(this.streamProcessorBuilder)).build();
        Mockito.when(this.streamProcessor.openAsync(ArgumentMatchers.anyBoolean())).thenReturn(TestActorFuture.completedFuture((Object) null));
        Mockito.when(this.streamProcessor.closeAsync()).thenReturn(TestActorFuture.completedFuture((Object) null));
        Mockito.when(this.streamProcessorFromPrevRole.closeAsync()).thenReturn(TestActorFuture.completedFuture((Object) null));
        this.step = new StreamProcessorTransitionStep((partitionTransitionContext, role) -> {
            return this.streamProcessor;
        });
    }

    @MethodSource({"provideTransitionsThatShouldCloseExistingStreamProcessor"})
    @ParameterizedTest
    void shouldCloseExistingStreamProcessor(RaftServer.Role role, RaftServer.Role role2) {
        this.transitionContext.setCurrentRole(role);
        if (role != null && role != RaftServer.Role.INACTIVE) {
            this.transitionContext.setStreamProcessor(this.streamProcessorFromPrevRole);
        }
        this.step.prepareTransition(this.transitionContext, 1L, role2).join();
        Assertions.assertThat(this.transitionContext.getStreamProcessor()).isNull();
        ((StreamProcessor) Mockito.verify(this.streamProcessorFromPrevRole)).closeAsync();
    }

    @MethodSource({"provideTransitionsThatShouldReInstallStreamProcessor"})
    @ParameterizedTest
    void shouldReInstallStreamProcessor(RaftServer.Role role, RaftServer.Role role2) {
        this.transitionContext.setCurrentRole(role);
        if (role != null && role != RaftServer.Role.INACTIVE) {
            this.transitionContext.setStreamProcessor(this.streamProcessorFromPrevRole);
        }
        transitionTo(role2);
        Assertions.assertThat(this.transitionContext.getStreamProcessor()).isNotNull().isNotEqualTo(this.streamProcessorFromPrevRole);
        ((StreamProcessor) Mockito.verify(this.streamProcessor)).openAsync(ArgumentMatchers.anyBoolean());
    }

    @MethodSource({"provideTransitionsThatShouldDoNothing"})
    @ParameterizedTest
    void shouldNotCloseExistingStreamProcessor(RaftServer.Role role, RaftServer.Role role2) {
        this.transitionContext.setCurrentRole(role);
        if (role != null && role != RaftServer.Role.INACTIVE) {
            this.transitionContext.setStreamProcessor(this.streamProcessorFromPrevRole);
        }
        StreamProcessor streamProcessor = this.transitionContext.getStreamProcessor();
        this.step.prepareTransition(this.transitionContext, 1L, role2).join();
        Assertions.assertThat(this.transitionContext.getStreamProcessor()).isEqualTo(streamProcessor);
        ((StreamProcessor) Mockito.verify(this.streamProcessorFromPrevRole, Mockito.never())).closeAsync();
    }

    @MethodSource({"provideTransitionsThatShouldDoNothing"})
    @ParameterizedTest
    void shouldNotReInstallStreamProcessor(RaftServer.Role role, RaftServer.Role role2) {
        this.transitionContext.setCurrentRole(role);
        if (role != null && role != RaftServer.Role.INACTIVE) {
            this.transitionContext.setStreamProcessor(this.streamProcessorFromPrevRole);
        }
        StreamProcessor streamProcessor = this.transitionContext.getStreamProcessor();
        transitionTo(role2);
        Assertions.assertThat(this.transitionContext.getStreamProcessor()).isEqualTo(streamProcessor);
    }

    @EnumSource(value = RaftServer.Role.class, names = {"FOLLOWER", "LEADER", "CANDIDATE"})
    @ParameterizedTest
    void shouldCloseStreamProcessorWhenTransitioningToInactive(RaftServer.Role role) {
        this.transitionContext.setCurrentRole(role);
        this.transitionContext.setStreamProcessor(this.streamProcessorFromPrevRole);
        transitionTo(RaftServer.Role.INACTIVE);
        Assertions.assertThat(this.transitionContext.getStreamProcessor()).isNull();
        ((StreamProcessor) Mockito.verify(this.streamProcessorFromPrevRole)).closeAsync();
    }

    private static Stream<Arguments> provideTransitionsThatShouldCloseExistingStreamProcessor() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{RaftServer.Role.FOLLOWER, RaftServer.Role.LEADER}), Arguments.of(new Object[]{RaftServer.Role.CANDIDATE, RaftServer.Role.LEADER}), Arguments.of(new Object[]{RaftServer.Role.LEADER, RaftServer.Role.FOLLOWER}), Arguments.of(new Object[]{RaftServer.Role.LEADER, RaftServer.Role.INACTIVE}), Arguments.of(new Object[]{RaftServer.Role.FOLLOWER, RaftServer.Role.INACTIVE}), Arguments.of(new Object[]{RaftServer.Role.CANDIDATE, RaftServer.Role.INACTIVE})});
    }

    private static Stream<Arguments> provideTransitionsThatShouldReInstallStreamProcessor() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{null, RaftServer.Role.FOLLOWER}), Arguments.of(new Object[]{null, RaftServer.Role.LEADER}), Arguments.of(new Object[]{null, RaftServer.Role.CANDIDATE}), Arguments.of(new Object[]{RaftServer.Role.FOLLOWER, RaftServer.Role.LEADER}), Arguments.of(new Object[]{RaftServer.Role.CANDIDATE, RaftServer.Role.LEADER}), Arguments.of(new Object[]{RaftServer.Role.LEADER, RaftServer.Role.FOLLOWER}), Arguments.of(new Object[]{RaftServer.Role.LEADER, RaftServer.Role.CANDIDATE}), Arguments.of(new Object[]{RaftServer.Role.INACTIVE, RaftServer.Role.FOLLOWER}), Arguments.of(new Object[]{RaftServer.Role.INACTIVE, RaftServer.Role.LEADER}), Arguments.of(new Object[]{RaftServer.Role.INACTIVE, RaftServer.Role.CANDIDATE})});
    }

    private static Stream<Arguments> provideTransitionsThatShouldDoNothing() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{RaftServer.Role.CANDIDATE, RaftServer.Role.FOLLOWER}), Arguments.of(new Object[]{RaftServer.Role.FOLLOWER, RaftServer.Role.CANDIDATE}), Arguments.of(new Object[]{null, RaftServer.Role.INACTIVE})});
    }

    private void transitionTo(RaftServer.Role role) {
        this.step.prepareTransition(this.transitionContext, 1L, role).join();
        this.step.transitionTo(this.transitionContext, 1L, role).join();
        this.transitionContext.setCurrentRole(role);
    }
}
