package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.TestConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.time.Duration;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/broker/bootstrap/LeaderManagementRequestHandlerStepTest.class */
class LeaderManagementRequestHandlerStepTest {
    private static final TestConcurrencyControl CONCURRENCY_CONTROL = new TestConcurrencyControl();
    private static final Duration TIME_OUT = Duration.ofSeconds(10);
    private BrokerStartupContext mockBrokerStartupContext;
    private ActorSchedulingService mockActorSchedulingService;
    private LeaderManagementRequestHandler mockLeaderManagementRequestHandler;
    private ActorFuture<BrokerStartupContext> future;
    private final LeaderManagementRequestHandlerStep sut = new LeaderManagementRequestHandlerStep();

    LeaderManagementRequestHandlerStepTest() {
    }

    @BeforeEach
    void setUp() {
        this.mockActorSchedulingService = (ActorSchedulingService) Mockito.mock(ActorSchedulingService.class);
        Mockito.when(this.mockActorSchedulingService.submitActor((Actor) ArgumentMatchers.any())).thenReturn(CONCURRENCY_CONTROL.completedFuture((Object) null));
        this.mockLeaderManagementRequestHandler = (LeaderManagementRequestHandler) Mockito.mock(LeaderManagementRequestHandler.class);
        Mockito.when(this.mockLeaderManagementRequestHandler.closeAsync()).thenReturn(CONCURRENCY_CONTROL.completedFuture((Object) null));
        this.mockBrokerStartupContext = (BrokerStartupContext) Mockito.mock(BrokerStartupContext.class);
        Mockito.when(this.mockBrokerStartupContext.getConcurrencyControl()).thenReturn(CONCURRENCY_CONTROL);
        Mockito.when(this.mockBrokerStartupContext.getBrokerInfo()).thenReturn((BrokerInfo) Mockito.mock(BrokerInfo.class));
        Mockito.when(this.mockBrokerStartupContext.getActorSchedulingService()).thenReturn(this.mockActorSchedulingService);
        Mockito.when(this.mockBrokerStartupContext.getLeaderManagementRequestHandler()).thenReturn(this.mockLeaderManagementRequestHandler);
        Mockito.when(this.mockBrokerStartupContext.getClusterServices()).thenReturn((ClusterServicesImpl) Mockito.mock(ClusterServicesImpl.class, Mockito.RETURNS_DEEP_STUBS));
        this.future = CONCURRENCY_CONTROL.createFuture();
    }

    @Test
    void shouldCompleteFutureOnStartup() {
        this.sut.startupInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        Assertions.assertThat(this.future).succeedsWithin(TIME_OUT);
        Assertions.assertThat((BrokerStartupContext) this.future.join()).isNotNull();
    }

    @Test
    void shouldScheduleLeaderManagementRequestHandlerActorOnStartup() {
        this.sut.startupInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(LeaderManagementRequestHandler.class);
        ((ActorSchedulingService) Mockito.verify(this.mockActorSchedulingService)).submitActor((Actor) forClass.capture());
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).setLeaderManagementRequestHandler((LeaderManagementRequestHandler) forClass.getValue());
    }

    @Test
    void shouldRegisterLeaderRequestManagementHandlerAsPartitionListenerOnStartup() {
        this.sut.startupInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(LeaderManagementRequestHandler.class);
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).setLeaderManagementRequestHandler((LeaderManagementRequestHandler) forClass.capture());
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).addPartitionListener((PartitionListener) forClass.getValue());
    }

    @Test
    void shouldRegisterLeaderRequestManagementHandlerAsDiskSpaceListenerOnStartup() {
        this.sut.startupInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(LeaderManagementRequestHandler.class);
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).setLeaderManagementRequestHandler((LeaderManagementRequestHandler) forClass.capture());
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).addDiskSpaceUsageListener((DiskSpaceUsageListener) forClass.getValue());
    }

    @Test
    void shouldCompleteFutureOnShutdown() {
        this.sut.shutdownInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        Assertions.assertThat(this.future).succeedsWithin(TIME_OUT);
        Assertions.assertThat((BrokerStartupContext) this.future.join()).isNotNull();
    }

    @Test
    void shouldStopHealthCheckServiceOnShutdown() {
        this.sut.shutdownInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ((LeaderManagementRequestHandler) Mockito.verify(this.mockLeaderManagementRequestHandler)).closeAsync();
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).setLeaderManagementRequestHandler((LeaderManagementRequestHandler) null);
    }

    @Test
    void shouldRemoveLeaderRequestManagementHandlerAsPartitionListenerOnStartup() {
        this.sut.shutdownInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).removePartitionListener(this.mockLeaderManagementRequestHandler);
    }

    @Test
    void shouldRemoveLeaderRequestManagementHandlerAsDiskSpaceListenerOnStartup() {
        this.sut.shutdownInternal(this.mockBrokerStartupContext, CONCURRENCY_CONTROL, this.future);
        ConditionFactory await = Awaitility.await();
        ActorFuture<BrokerStartupContext> actorFuture = this.future;
        Objects.requireNonNull(actorFuture);
        await.until(actorFuture::isDone);
        ((BrokerStartupContext) Mockito.verify(this.mockBrokerStartupContext)).removeDiskSpaceUsageListener(this.mockLeaderManagementRequestHandler);
    }
}
