package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.MemberId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.primitive.partition.impl.DefaultPartitionManagementService;
import io.atomix.raft.partition.RaftPartition;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.PartitionRaftListener;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.partitioning.startup.PartitionStartupContext;
import io.camunda.zeebe.broker.partitioning.startup.RaftPartitionFactory;
import io.camunda.zeebe.broker.partitioning.startup.ZeebePartitionFactory;
import io.camunda.zeebe.broker.partitioning.topology.PartitionDistribution;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManagerImpl;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.ActorFutureCollector;
import io.camunda.zeebe.scheduler.startup.StartupProcessShutdownException;
import io.camunda.zeebe.topology.changes.PartitionChangeExecutor;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.health.HealthStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/broker/partitioning/PartitionManagerImpl.class */
public final class PartitionManagerImpl implements PartitionManager, PartitionChangeExecutor {
    public static final String GROUP_NAME = "raft-partition";
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManagerImpl.class);
    private final ConcurrencyControl concurrencyControl;
    private final BrokerHealthCheckService healthCheckService;
    private final ActorSchedulingService actorSchedulingService;
    private final TopologyManagerImpl topologyManager;
    private final Map<Integer, Partition> partitions = new ConcurrentHashMap();
    private final DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private final PartitionDistribution partitionDistribution;
    private final DefaultPartitionManagementService managementService;
    private final BrokerCfg brokerCfg;
    private final ZeebePartitionFactory zeebePartitionFactory;
    private final RaftPartitionFactory raftPartitionFactory;

    public PartitionManagerImpl(ConcurrencyControl concurrencyControl, ActorSchedulingService actorSchedulingService, BrokerCfg brokerCfg, BrokerInfo brokerInfo, ClusterServices clusterServices, BrokerHealthCheckService brokerHealthCheckService, DiskSpaceUsageMonitor diskSpaceUsageMonitor, List<PartitionListener> list, List<PartitionRaftListener> list2, CommandApiService commandApiService, ExporterRepository exporterRepository, AtomixServerTransport atomixServerTransport, JobStreamer jobStreamer, PartitionDistribution partitionDistribution) {
        this.brokerCfg = brokerCfg;
        this.concurrencyControl = concurrencyControl;
        this.actorSchedulingService = actorSchedulingService;
        this.healthCheckService = brokerHealthCheckService;
        this.diskSpaceUsageMonitor = diskSpaceUsageMonitor;
        FeatureFlags featureFlags = brokerCfg.getExperimental().getFeatures().toFeatureFlags();
        this.partitionDistribution = partitionDistribution;
        this.topologyManager = new TopologyManagerImpl(clusterServices.getMembershipService(), brokerInfo);
        ArrayList arrayList = new ArrayList(list);
        arrayList.add(this.topologyManager);
        this.zeebePartitionFactory = new ZeebePartitionFactory(actorSchedulingService, brokerCfg, brokerInfo, commandApiService, clusterServices, exporterRepository, diskSpaceUsageMonitor, atomixServerTransport, jobStreamer, arrayList, list2, this.topologyManager, featureFlags);
        this.managementService = new DefaultPartitionManagementService(clusterServices.getMembershipService(), clusterServices.getCommunicationService());
        this.raftPartitionFactory = new RaftPartitionFactory(brokerCfg);
    }

    public void start() {
        this.actorSchedulingService.submitActor(this.topologyManager);
        MemberId id = this.managementService.getMembershipService().getLocalMember().id();
        List<PartitionMetadata> list = this.partitionDistribution.partitions().stream().filter(partitionMetadata -> {
            return partitionMetadata.members().contains(id);
        }).toList();
        this.healthCheckService.registerBootstrapPartitions(list);
        Iterator<PartitionMetadata> it = list.iterator();
        while (it.hasNext()) {
            bootstrapPartition(it.next());
        }
    }

    private ActorFuture<Void> bootstrapPartition(PartitionMetadata partitionMetadata) {
        ActorFuture<Void> createFuture = this.concurrencyControl.createFuture();
        Integer num = (Integer) partitionMetadata.id().id();
        Partition bootstrapping = Partition.bootstrapping(new PartitionStartupContext(this.actorSchedulingService, this.concurrencyControl, this.topologyManager, this.diskSpaceUsageMonitor, this.healthCheckService, this.managementService, partitionMetadata, this.raftPartitionFactory, this.zeebePartitionFactory, this.brokerCfg));
        this.partitions.put(num, bootstrapping);
        this.concurrencyControl.runOnCompletion(bootstrapping.start(), (partition, th) -> {
            completePartitionStart(num.intValue(), th, createFuture);
        });
        return createFuture;
    }

    private ActorFuture<Void> joinPartition(PartitionMetadata partitionMetadata) {
        ActorFuture<Void> createFuture = this.concurrencyControl.createFuture();
        Integer num = (Integer) partitionMetadata.id().id();
        Partition joining = Partition.joining(new PartitionStartupContext(this.actorSchedulingService, this.concurrencyControl, this.topologyManager, this.diskSpaceUsageMonitor, this.healthCheckService, this.managementService, partitionMetadata, this.raftPartitionFactory, this.zeebePartitionFactory, this.brokerCfg));
        if (this.partitions.putIfAbsent(num, joining) != null) {
            createFuture.completeExceptionally(new IllegalStateException(String.format("Partition %d already exists", num)));
            return createFuture;
        }
        this.concurrencyControl.run(() -> {
            this.concurrencyControl.runOnCompletion(joining.start(), (partition, th) -> {
                completePartitionStart(num.intValue(), th, createFuture);
            });
        });
        return createFuture;
    }

    private void completePartitionStart(int i, Throwable th, ActorFuture<Void> actorFuture) {
        if (th == null) {
            LOGGER.info("Started partition {}", Integer.valueOf(i));
            actorFuture.complete((Object) null);
            return;
        }
        if (th instanceof StartupProcessShutdownException) {
            LOGGER.warn("Aborting startup of partition {}", Integer.valueOf(i), th);
        } else {
            LOGGER.error("Failed to start partition {}", Integer.valueOf(i), th);
        }
        this.topologyManager.onHealthChanged(i, HealthStatus.DEAD);
        actorFuture.completeExceptionally(th);
    }

    public ActorFuture<Void> stop() {
        ActorFuture<Void> createFuture = this.concurrencyControl.createFuture();
        this.concurrencyControl.runOnCompletion((ActorFuture) this.partitions.values().stream().map((v0) -> {
            return v0.stop();
        }).collect(new ActorFutureCollector(this.concurrencyControl)), (list, th) -> {
            if (th != null) {
                LOGGER.error("Failed to stop partitions", th);
                createFuture.completeExceptionally(th);
            } else {
                this.partitions.clear();
                this.topologyManager.closeAsync().onComplete(createFuture);
            }
        });
        return createFuture;
    }

    public String toString() {
        return "PartitionManagerImpl{partitions=" + this.partitions + "}";
    }

    @Override // io.camunda.zeebe.broker.partitioning.PartitionManager
    public RaftPartition getRaftPartition(int i) {
        return this.partitions.get(Integer.valueOf(i)).raftPartition();
    }

    @Override // io.camunda.zeebe.broker.partitioning.PartitionManager
    public Collection<RaftPartition> getRaftPartitions() {
        return this.partitions.values().stream().map((v0) -> {
            return v0.raftPartition();
        }).toList();
    }

    @Override // io.camunda.zeebe.broker.partitioning.PartitionManager
    public Collection<ZeebePartition> getZeebePartitions() {
        return this.partitions.values().stream().map((v0) -> {
            return v0.zeebePartition();
        }).toList();
    }

    public ActorFuture<Void> join(int i, Map<MemberId, Integer> map) {
        int intValue = ((Integer) Collections.max(map.values())).intValue();
        Set<MemberId> keySet = map.keySet();
        List list = map.entrySet().stream().filter(entry -> {
            return ((Integer) entry.getValue()).intValue() == intValue;
        }).map((v0) -> {
            return v0.getKey();
        }).toList();
        MemberId memberId = null;
        if (list.size() == 1) {
            memberId = (MemberId) list.get(0);
        }
        return joinPartition(new PartitionMetadata(PartitionId.from("raft-partition", i), keySet, map, intValue, memberId));
    }

    public ActorFuture<Void> leave(int i) {
        throw new UnsupportedOperationException("Not yet implemented");
    }
}
