package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.engine.impl.LongPollingJobNotification;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.logstreams.state.StatePositionSupplier;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManager;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.PartitionStartupAndTransitionContextImpl;
import io.camunda.zeebe.broker.system.partitions.PartitionStartupContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.broker.system.partitions.TypedRecordProcessorsFactory;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.broker.system.partitions.impl.AtomixPartitionMessagingService;
import io.camunda.zeebe.broker.system.partitions.impl.AtomixRecordEntrySupplierImpl;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionTransitionImpl;
import io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl;
import io.camunda.zeebe.broker.system.partitions.impl.steps.BackupApiRequestHandlerStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.BackupServiceTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.BackupStoreTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ExporterDirectorPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.InterPartitionCommandServiceStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStoragePartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStreamPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.QueryServicePartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.RockDbMetricExporterPartitionStartupStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.SnapshotDirectorPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionTransitionStep;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.engine.api.InterPartitionCommandSender;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributionCommandSender;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.startup.StartupStep;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
import io.camunda.zeebe.util.FeatureFlags;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

/**
 * Builds the {@link ZeebePartition} instances hosted by the local broker.
 *
 * <p>For every {@link RaftPartition} that the partition group reports for the local cluster
 * member, the factory wires a startup/transition context together with the shared startup and
 * transition step lists, and registers the resulting partition with the broker health check
 * service.
 */
public final class PartitionFactory {
    /** Startup steps handed to every {@link ZeebePartition} created by this factory. */
    private static final List<StartupStep<PartitionStartupContext>> STARTUP_STEPS =
            List.of(new RockDbMetricExporterPartitionStartupStep());

    /**
     * Transition steps handed to every partition's {@link PartitionTransitionImpl}. The list order
     * is kept exactly as declared.
     */
    private static final List<PartitionTransitionStep> TRANSITION_STEPS =
            List.of(
                    new LogStoragePartitionTransitionStep(),
                    new LogStreamPartitionTransitionStep(),
                    new ZeebeDbPartitionTransitionStep(),
                    new QueryServicePartitionTransitionStep(),
                    new BackupStoreTransitionStep(),
                    new BackupServiceTransitionStep(),
                    new InterPartitionCommandServiceStep(),
                    new StreamProcessorTransitionStep(),
                    new SnapshotDirectorPartitionTransitionStep(),
                    new ExporterDirectorPartitionTransitionStep(),
                    new BackupApiRequestHandlerStep());

    private final ActorSchedulingService actorSchedulingService;
    private final BrokerCfg brokerCfg;
    private final BrokerInfo localBroker;
    private final CommandApiService commandApiService;
    private final FileBasedSnapshotStoreFactory snapshotStoreFactory;
    private final ClusterServices clusterServices;
    private final ExporterRepository exporterRepository;
    private final BrokerHealthCheckService healthCheckService;
    private final DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private final AtomixServerTransport gatewayBrokerTransport;

    /**
     * Stores the collaborators that are later passed into each partition's context. The
     * constructor only assigns fields.
     *
     * @param actorSchedulingService scheduling service passed to each partition context
     * @param brokerCfg broker configuration; also the source of the RocksDB and consistency-check
     *     settings used for the state controller
     * @param brokerInfo local broker info; supplies the node id and the partitions count
     * @param commandApiService source of command response writers and on-processed listeners
     * @param fileBasedSnapshotStoreFactory source of per-partition snapshot stores and their
     *     concurrency control
     * @param clusterServices source of the communication, event and membership services
     * @param exporterRepository exporter repository passed to each partition context
     * @param brokerHealthCheckService service each created partition is registered with
     * @param diskSpaceUsageMonitor disk usage monitor passed to each partition context
     * @param atomixServerTransport gateway-to-broker transport passed to each partition context
     */
    public PartitionFactory(ActorSchedulingService actorSchedulingService, BrokerCfg brokerCfg, BrokerInfo brokerInfo, CommandApiService commandApiService, FileBasedSnapshotStoreFactory fileBasedSnapshotStoreFactory, ClusterServices clusterServices, ExporterRepository exporterRepository, BrokerHealthCheckService brokerHealthCheckService, DiskSpaceUsageMonitor diskSpaceUsageMonitor, AtomixServerTransport atomixServerTransport) {
        this.actorSchedulingService = actorSchedulingService;
        this.brokerCfg = brokerCfg;
        this.localBroker = brokerInfo;
        this.commandApiService = commandApiService;
        this.snapshotStoreFactory = fileBasedSnapshotStoreFactory;
        this.clusterServices = clusterServices;
        this.exporterRepository = exporterRepository;
        this.healthCheckService = brokerHealthCheckService;
        this.diskSpaceUsageMonitor = diskSpaceUsageMonitor;
        this.gatewayBrokerTransport = atomixServerTransport;
    }

    /**
     * Creates one {@link ZeebePartition} per Raft partition returned by
     * {@code raftPartitionGroup.getPartitionsWithMember(...)} for the local member, and registers
     * each of them with the health check service.
     *
     * @param raftPartitionGroup group that is queried for the local member's partitions
     * @param list partition listeners passed to every partition context
     * @param topologyManager topology manager passed to every partition context
     * @param featureFlags feature flags forwarded to the engine processors factory
     * @return the created partitions, in the order the partition group returned them
     */
    public List<ZeebePartition> constructPartitions(RaftPartitionGroup raftPartitionGroup, List<PartitionListener> list, TopologyManager topologyManager, FeatureFlags featureFlags) {
        final List<ZeebePartition> partitions = new ArrayList<>();
        final ClusterCommunicationService communicationService = clusterServices.getCommunicationService();
        final ClusterEventService eventService = clusterServices.getEventService();
        final ClusterMembershipService membershipService = clusterServices.getMembershipService();

        // The group hands back its generic partition type; every element is cast to RaftPartition.
        final List<RaftPartition> owningPartitions =
                raftPartitionGroup.getPartitionsWithMember(membershipService.getLocalMember().id()).stream()
                        .map(RaftPartition.class::cast)
                        .toList();

        // A single processors factory instance is shared by all partitions built below.
        final TypedRecordProcessorsFactory processorsFactory = createFactory(localBroker, eventService, featureFlags);

        for (final RaftPartition raftPartition : owningPartitions) {
            final int partitionId = raftPartition.id().id();
            final ConstructableSnapshotStore snapshotStore = snapshotStoreFactory.getConstructableSnapshotStore(partitionId);
            final StateController stateController =
                    createStateController(
                            raftPartition,
                            snapshotStore,
                            snapshotStoreFactory.getSnapshotStoreConcurrencyControl(partitionId));
            final int nodeId = localBroker.getNodeId();
            final AtomixPartitionMessagingService messagingService =
                    new AtomixPartitionMessagingService(communicationService, membershipService, raftPartition.members());

            final PartitionStartupAndTransitionContextImpl context =
                    new PartitionStartupAndTransitionContextImpl(
                            nodeId,
                            communicationService,
                            raftPartition,
                            list,
                            messagingService,
                            actorSchedulingService,
                            brokerCfg,
                            commandApiService::newCommandResponseWriter,
                            () -> commandApiService.getOnProcessedListener(partitionId),
                            snapshotStore,
                            stateController,
                            processorsFactory,
                            exporterRepository,
                            new PartitionProcessingState(raftPartition),
                            diskSpaceUsageMonitor,
                            gatewayBrokerTransport,
                            topologyManager);

            final ZeebePartition zeebePartition =
                    new ZeebePartition(context, new PartitionTransitionImpl(TRANSITION_STEPS), STARTUP_STEPS);
            healthCheckService.registerMonitoredPartition(zeebePartition.getPartitionId(), zeebePartition);
            partitions.add(zeebePartition);
        }
        return partitions;
    }

    /**
     * Creates the state controller for one Raft partition.
     *
     * <p>The RocksDB factory is configured from the experimental RocksDB and consistency-check
     * sections of the broker configuration, and the runtime path is the {@code runtime} directory
     * resolved against the partition's data directory.
     *
     * @param raftPartition partition whose data directory and Raft server are used
     * @param constructableSnapshotStore snapshot store passed to the controller
     * @param concurrencyControl concurrency control passed to the controller
     * @return a new {@link StateControllerImpl}
     */
    private StateController createStateController(RaftPartition raftPartition, ConstructableSnapshotStore constructableSnapshotStore, ConcurrencyControl concurrencyControl) {
        final ZeebeRocksDbFactory databaseFactory =
                new ZeebeRocksDbFactory(
                        brokerCfg.getExperimental().getRocksdb().createRocksDbConfiguration(),
                        brokerCfg.getExperimental().getConsistencyChecks().getSettings());
        return new StateControllerImpl(
                databaseFactory,
                constructableSnapshotStore,
                raftPartition.dataDirectory().toPath().resolve("runtime"),
                new AtomixRecordEntrySupplierImpl(raftPartition.getServer()),
                StatePositionSupplier::getHighestExportedPosition,
                concurrencyControl);
    }

    /**
     * Returns a factory that builds the engine processors for a given processor context.
     *
     * <p>Each invocation of the returned factory creates new subscription and deployment
     * distribution command senders on top of the context's partition command sender, plus a new
     * {@link LongPollingJobNotification} backed by the given event service.
     *
     * @param brokerInfo broker info supplying the partitions count
     * @param clusterEventService event service used for the long-polling job notification
     * @param featureFlags feature flags forwarded to {@link EngineProcessors}
     * @return the processors factory
     */
    private TypedRecordProcessorsFactory createFactory(BrokerInfo brokerInfo, ClusterEventService clusterEventService, FeatureFlags featureFlags) {
        return typedRecordProcessorContext -> {
            final InterPartitionCommandSender partitionCommandSender = typedRecordProcessorContext.getPartitionCommandSender();
            final SubscriptionCommandSender subscriptionCommandSender =
                    new SubscriptionCommandSender(typedRecordProcessorContext.getPartitionId(), partitionCommandSender);
            final DeploymentDistributionCommandSender deploymentDistributionCommandSender =
                    new DeploymentDistributionCommandSender(typedRecordProcessorContext.getPartitionId(), partitionCommandSender);
            final LongPollingJobNotification jobsAvailableNotification = new LongPollingJobNotification(clusterEventService);
            return EngineProcessors.createEngineProcessors(
                    typedRecordProcessorContext,
                    brokerInfo.getPartitionsCount(),
                    subscriptionCommandSender,
                    deploymentDistributionCommandSender,
                    jobsAvailableNotification::onJobsAvailable,
                    featureFlags);
        };
    }
}
