package io.camunda.zeebe.broker.system.management;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.exporter.stream.ExporterPhase;
import io.camunda.zeebe.broker.partitioning.NoOpPartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/system/management/BrokerAdminServiceImpl.class */
public final class BrokerAdminServiceImpl extends Actor implements BrokerAdminService {
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private PartitionAdminAccess adminAccess = new NoOpPartitionAdminAccess();
    private List<ZeebePartition> partitions = Collections.emptyList();

    public void injectAdminAccess(PartitionAdminAccess partitionAdminAccess) {
        this.adminAccess = (PartitionAdminAccess) Objects.requireNonNull(partitionAdminAccess);
    }

    public void injectPartitionInfoSource(@Deprecated List<ZeebePartition> list) {
        this.partitions = list;
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void pauseStreamProcessing() {
        this.actor.call(this::pauseStreamProcessingOnAllPartitions);
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void resumeStreamProcessing() {
        LOG.info("Resuming paused StreamProcessor on all partitions.");
        this.actor.call(() -> {
            return this.adminAccess.resumeProcessing();
        });
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void pauseExporting() {
        this.actor.call(this::pauseExportingOnAllPartitions);
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void softPauseExporting() {
        this.actor.call(this::softPauseExportingOnAllPartitions);
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void resumeExporting() {
        LOG.info("Resuming exporting on all partitions.");
        this.actor.call(() -> {
            return this.adminAccess.resumeExporting();
        });
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void takeSnapshot() {
        this.actor.call(this::takeSnapshotOnAllPartitions);
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public void prepareForUpgrade() {
        this.actor.call(this::prepareAllPartitionsForSafeUpgrade);
    }

    @Override // io.camunda.zeebe.broker.system.management.BrokerAdminService
    public Map<Integer, PartitionStatus> getPartitionStatus() {
        CompletableFuture completableFuture = new CompletableFuture();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.actor.call(() -> {
            if (this.partitions.isEmpty()) {
                completableFuture.complete(concurrentHashMap);
            } else {
                CompletableFuture.allOf((CompletableFuture[]) this.partitions.stream().map(zeebePartition -> {
                    return getPartitionStatus(zeebePartition).whenComplete((partitionStatus, th) -> {
                        if (th == null) {
                            concurrentHashMap.put(Integer.valueOf(zeebePartition.getPartitionId()), partitionStatus);
                        }
                    });
                }).toList().toArray(i -> {
                    return new CompletableFuture[i];
                })).thenAccept(r5 -> {
                    completableFuture.complete(concurrentHashMap);
                });
            }
        });
        try {
            return (Map) completableFuture.get(5L, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.warn("Error when querying partition status", e);
            return Map.of();
        }
    }

    private CompletableFuture<PartitionStatus> getPartitionStatus(ZeebePartition zeebePartition) {
        CompletableFuture<PartitionStatus> completableFuture = new CompletableFuture<>();
        ActorFuture<RaftServer.Role> currentRole = zeebePartition.getCurrentRole();
        ActorFuture<Optional<StreamProcessor>> streamProcessor = zeebePartition.getStreamProcessor();
        ActorFuture<Optional<ExporterDirector>> exporterDirector = zeebePartition.getExporterDirector();
        this.actor.runOnCompletion(List.of(streamProcessor, exporterDirector), th -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            if (currentRole.join() != RaftServer.Role.LEADER) {
                getFollowerPartitionStatus(zeebePartition, completableFuture);
                return;
            }
            Optional optional = (Optional) streamProcessor.join();
            Optional optional2 = (Optional) exporterDirector.join();
            if (optional.isEmpty() || optional2.isEmpty()) {
                completableFuture.completeExceptionally(new IllegalStateException("No streamprocessor or exporter found for leader partition."));
            } else {
                getLeaderPartitionStatus(zeebePartition, (StreamProcessor) optional.get(), (ExporterDirector) optional2.get(), completableFuture);
            }
        });
        return completableFuture;
    }

    private void getFollowerPartitionStatus(ZeebePartition zeebePartition, CompletableFuture<PartitionStatus> completableFuture) {
        Optional<String> snapshotId = getSnapshotId(zeebePartition);
        completableFuture.complete(PartitionStatus.ofFollower(snapshotId.orElse(null), (Long) snapshotId.flatMap(FileBasedSnapshotId::ofFileName).map((v0) -> {
            return v0.getProcessedPosition();
        }).orElse(null)));
    }

    private void getLeaderPartitionStatus(ZeebePartition zeebePartition, StreamProcessor streamProcessor, ExporterDirector exporterDirector, CompletableFuture<PartitionStatus> completableFuture) {
        ActorFuture lastProcessedPositionAsync = streamProcessor.getLastProcessedPositionAsync();
        ActorFuture currentPhase = streamProcessor.getCurrentPhase();
        ActorFuture<ExporterPhase> phase = exporterDirector.getPhase();
        ActorFuture<Long> lowestPosition = exporterDirector.getLowestPosition();
        Optional<String> snapshotId = getSnapshotId(zeebePartition);
        Long l = (Long) snapshotId.flatMap(FileBasedSnapshotId::ofFileName).map((v0) -> {
            return v0.getProcessedPosition();
        }).orElse(null);
        this.actor.runOnCompletion(List.of(lastProcessedPositionAsync, currentPhase, phase, lowestPosition), th -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            completableFuture.complete(PartitionStatus.ofLeader((Long) lastProcessedPositionAsync.join(), (String) snapshotId.orElse(null), l, (StreamProcessor.Phase) currentPhase.join(), (ExporterPhase) phase.join(), ((Long) lowestPosition.join()).longValue()));
        });
    }

    private Optional<String> getSnapshotId(ZeebePartition zeebePartition) {
        return zeebePartition.getSnapshotStore().getLatestSnapshot().map((v0) -> {
            return v0.getId();
        });
    }

    private void prepareAllPartitionsForSafeUpgrade() {
        LOG.info("Preparing for safe upgrade.");
        this.actor.runOnCompletion((List) Stream.of((Object[]) new ActorFuture[]{pauseStreamProcessingOnAllPartitions(), pauseExportingOnAllPartitions()}).collect(Collectors.toList()), th -> {
            takeSnapshotOnAllPartitions();
        });
    }

    private ActorFuture<Void> pauseStreamProcessingOnAllPartitions() {
        LOG.info("Pausing StreamProcessor on all partitions.");
        return this.adminAccess.pauseProcessing();
    }

    private ActorFuture<Void> takeSnapshotOnAllPartitions() {
        LOG.info("Triggering Snapshots on all partitions.");
        return this.adminAccess.takeSnapshot();
    }

    private ActorFuture<Void> softPauseExportingOnAllPartitions() {
        LOG.info("Soft Pausing exporting on all partitions.");
        return this.adminAccess.softPauseExporting();
    }

    private ActorFuture<Void> pauseExportingOnAllPartitions() {
        LOG.info("Pausing exporting on all partitions.");
        return this.adminAccess.pauseExporting();
    }
}
