package org.apache.samza.coordinator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.samza.AzureClient;
import org.apache.samza.config.AzureConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.data.BarrierState;
import org.apache.samza.coordinator.data.ProcessorEntity;
import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlobUtils;
import org.apache.samza.util.LeaseBlobManager;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.TableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/samza/coordinator/AzureJobCoordinator.class */
public class AzureJobCoordinator implements JobCoordinator {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(AzureJobCoordinator.class);
    /**
     * TTL (ms) handed to the {@link StreamMetadataCache} created in {@code start()}. The same constant is also
     * used as the exclusive upper bound of the random poll delay in {@code onNewJobModelAvailable}.
     */
    private static final int METADATA_CACHE_TTL_MS = 5000;
    /** Job model version label used until a real job model version has been confirmed by this processor. */
    private static final String INITIAL_STATE = "UNASSIGNED";
    /** Logs the supplied message and stops this coordinator; passed to the schedulers as their error callback. */
    private final Consumer<String> errorHandler;
    private final AzureLeaderElector azureLeaderElector;
    /** Blob accessor used to read and publish the job model, its version, the barrier state and the live processor list. */
    private final BlobUtils leaderBlob;
    /** Table accessor holding one processor entity per (job model version, processor id). */
    private final TableUtils table;
    private final Config config;
    private final String processorId;
    /** Set to true by the version-upgrade listener and reset once the barrier for that version ends or times out. */
    private final AtomicBoolean versionUpgradeDetected;
    // Schedulers created in the constructor and started in start().
    private final HeartbeatScheduler heartbeat;
    private final JMVersionUpgradeScheduler versionUpgrade;
    private final LeaderLivenessCheckScheduler leaderAlive;
    // Schedulers that only exist while this processor is the leader; null until first created.
    private LivenessCheckScheduler liveness;
    private RenewLeaseScheduler renewLease;
    private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
    // Created in start(); null before that.
    private StreamMetadataCache streamMetadataCache = null;
    private SystemAdmins systemAdmins = null;
    /** Optional listener registered through setListener(); every use is null-checked. */
    private JobCoordinatorListener coordinatorListener = null;
    /** Most recent job model read from the blob in onNewJobModelAvailable(); null until then. */
    private JobModel jobModel = null;
    /** Job model version this processor currently runs with; starts at {@link #INITIAL_STATE}. */
    private final AtomicReference<String> currentJMVersion = new AtomicReference<>(INITIAL_STATE);

    /* loaded from: input_file:org/apache/samza/coordinator/AzureJobCoordinator$AzureLeaderElectorListener.class */
    public class AzureLeaderElectorListener implements LeaderElectorListener {
        public AzureLeaderElectorListener() {
        }

        public void onBecomingLeader() {
            AzureJobCoordinator.this.table.updateIsLeader((String) AzureJobCoordinator.this.currentJMVersion.get(), AzureJobCoordinator.this.processorId, true);
            AzureJobCoordinator.LOG.info("Starting scheduler to keep renewing lease held by the leader.");
            AzureJobCoordinator.this.renewLease = new RenewLeaseScheduler(str -> {
                AzureJobCoordinator.LOG.error(str);
                AzureJobCoordinator.this.table.updateIsLeader((String) AzureJobCoordinator.this.currentJMVersion.get(), AzureJobCoordinator.this.processorId, false);
                AzureJobCoordinator.this.azureLeaderElector.resignLeadership();
                AzureJobCoordinator.this.renewLease.shutdown();
                AzureJobCoordinator.this.liveness.shutdown();
            }, AzureJobCoordinator.this.azureLeaderElector.getLeaseBlobManager(), AzureJobCoordinator.this.azureLeaderElector.getLeaseId());
            AzureJobCoordinator.this.renewLease.scheduleTask();
            AzureJobCoordinator.this.doOnProcessorChange(new ArrayList());
            AzureJobCoordinator.LOG.info("Starting scheduler to check for change in list of live processors in the system.");
            AzureJobCoordinator.this.liveness = new LivenessCheckScheduler(AzureJobCoordinator.this.errorHandler, AzureJobCoordinator.this.table, AzureJobCoordinator.this.leaderBlob, AzureJobCoordinator.this.currentJMVersion, AzureJobCoordinator.this.processorId);
            AzureJobCoordinator.this.liveness.setStateChangeListener(AzureJobCoordinator.this.createLivenessListener(AzureJobCoordinator.this.liveness.getLiveProcessors()));
            AzureJobCoordinator.this.liveness.scheduleTask();
        }
    }

    /**
     * Wires up the Azure blob/table helpers, the leader elector and the always-on schedulers.
     * Nothing is scheduled here; that happens in {@code start()}.
     *
     * @param str             id of this processor
     * @param config          job configuration, also used to build the {@link AzureConfig}
     * @param metricsRegistry not used by this constructor
     */
    public AzureJobCoordinator(String str, Config config, MetricsRegistry metricsRegistry) {
        this.processorId = str;
        this.config = config;
        this.versionUpgradeDetected = new AtomicBoolean(false);

        // Leader-only schedulers are created later, once leadership is acquired.
        this.leaderBarrierScheduler = null;
        this.renewLease = null;
        this.liveness = null;

        final AzureConfig azureSettings = new AzureConfig(config);
        final AzureClient client = new AzureClient(azureSettings.getAzureConnectionString());
        this.leaderBlob = new BlobUtils(
            client,
            azureSettings.getAzureContainerName(),
            azureSettings.getAzureBlobName(),
            azureSettings.getAzureBlobLength());
        this.table = new TableUtils(client, azureSettings.getAzureTableName(), INITIAL_STATE);

        // Any scheduler failure is logged and shuts the coordinator down.
        this.errorHandler = message -> {
            LOG.error(message);
            stop();
        };

        final LeaseBlobManager leaseManager = new LeaseBlobManager(this.leaderBlob.getBlob());
        this.azureLeaderElector = new AzureLeaderElector(leaseManager);
        this.azureLeaderElector.setLeaderElectorListener(new AzureLeaderElectorListener());

        this.heartbeat = new HeartbeatScheduler(this.errorHandler, this.table, this.currentJMVersion, str);
        this.versionUpgrade = new JMVersionUpgradeScheduler(
            this.errorHandler, this.leaderBlob, this.currentJMVersion, this.versionUpgradeDetected, str);
        this.leaderAlive = new LeaderLivenessCheckScheduler(
            this.errorHandler, this.table, this.leaderBlob, this.currentJMVersion, INITIAL_STATE);
    }

    /**
     * Starts the system admins and metadata cache, registers this processor in the table under the initial
     * state, then starts heartbeating, attempts leader election and starts the version-upgrade and
     * leader-liveness schedulers.
     */
    public void start() {
        LOG.info("Starting Azure job coordinator.");

        // The admins must be running before the metadata cache built on top of them is used.
        final SystemAdmins admins = new SystemAdmins(this.config);
        this.systemAdmins = admins;
        admins.start();
        this.streamMetadataCache = new StreamMetadataCache(admins, METADATA_CACHE_TTL_MS, SystemClock.instance());

        this.table.addProcessorEntity(INITIAL_STATE, this.processorId, false);

        LOG.info("Starting scheduler for heartbeating.");
        this.heartbeat.scheduleTask();

        this.azureLeaderElector.tryBecomeLeader();

        LOG.info("Starting scheduler to check for job model version upgrades.");
        final SchedulerStateChangeListener upgradeListener = createJMVersionUpgradeListener();
        this.versionUpgrade.setStateChangeListener(upgradeListener);
        this.versionUpgrade.scheduleTask();

        LOG.info("Starting scheduler to check for leader liveness.");
        final SchedulerStateChangeListener leaderLivenessListener = createLeaderLivenessListener();
        this.leaderAlive.setStateChangeListener(leaderLivenessListener);
        this.leaderAlive.scheduleTask();
    }

    /**
     * Stops the coordinator: resigns leadership, removes this processor's table entity for the current job
     * model version, shuts down all schedulers, notifies the listener (if any) and stops the system admins.
     */
    public void stop() {
        LOG.info("Shutting down Azure job coordinator.");
        this.azureLeaderElector.resignLeadership();
        this.table.deleteProcessorEntity(this.currentJMVersion.get(), this.processorId, true);
        shutdownSchedulers();
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onJobModelExpired();
        }
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onCoordinatorStop();
        }
        // systemAdmins is only created in start(); guard so that stop() before start() does not throw.
        if (this.systemAdmins != null) {
            this.systemAdmins.stop();
        }
    }

    /**
     * @return the processor id this coordinator was constructed with
     */
    public String getProcessorId() {
        return processorId;
    }

    /**
     * Registers the listener that receives job model and coordinator lifecycle callbacks.
     *
     * @param jobCoordinatorListener listener to notify; replaces any previously set listener
     */
    public void setListener(JobCoordinatorListener jobCoordinatorListener) {
        coordinatorListener = jobCoordinatorListener;
    }

    /**
     * @return the job model most recently read from the blob, or {@code null} if none has been read yet
     */
    public JobModel getJobModel() {
        return jobModel;
    }

    /**
     * Shuts down every scheduler owned by this coordinator. The leader-only schedulers are skipped when
     * they were never created.
     */
    private void shutdownSchedulers() {
        final RenewLeaseScheduler leaseRenewal = this.renewLease;
        if (leaseRenewal != null) {
            leaseRenewal.shutdown();
        }

        final LeaderBarrierCompleteScheduler barrierCheck = this.leaderBarrierScheduler;
        if (barrierCheck != null) {
            barrierCheck.shutdown();
        }

        final LivenessCheckScheduler livenessCheck = this.liveness;
        if (livenessCheck != null) {
            livenessCheck.shutdown();
        }

        // These three are always created by the constructor.
        this.heartbeat.shutdown();
        this.leaderAlive.shutdown();
        this.versionUpgrade.shutdown();
    }

    /**
     * Builds the listener the leader runs once the barrier for a job model version is resolved.
     * It publishes {@code TIMEOUT <version>} or {@code END <version>} to the blob depending on the timeout
     * flag, stops the coordinator if that write fails, and shuts the barrier scheduler down.
     *
     * @param str           job model version the barrier belongs to
     * @param atomicBoolean flag that is true when the barrier timed out
     * @return listener to attach to the {@link LeaderBarrierCompleteScheduler}
     */
    private SchedulerStateChangeListener createLeaderBarrierCompleteListener(String str, AtomicBoolean atomicBoolean) {
        return () -> {
            this.versionUpgradeDetected.set(false);

            final String barrierState;
            if (atomicBoolean.get()) {
                LOG.error("Barrier timed out for version {}", str);
                barrierState = BarrierState.TIMEOUT.name() + " " + str;
            } else {
                LOG.info("Leader detected barrier completion.");
                barrierState = BarrierState.END.name() + " " + str;
            }

            if (!this.leaderBlob.publishBarrierState(barrierState, this.azureLeaderElector.getLeaseId().get())) {
                // The original message had one placeholder for two arguments (the pid was never printed)
                // and named the job model although the failed write is the barrier state.
                LOG.error("Leader failed to publish the barrier state {}. Stopping the processor with PID: {}.",
                    barrierState, this.processorId);
                stop();
            }
            this.leaderBarrierScheduler.shutdown();
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the listener run when the set of live processors changes.
     *
     * @param atomicReference holder of the current live processor ids; it is read each time the listener fires
     * @return listener that recomputes and republishes the job model for the current processor list
     */
    public SchedulerStateChangeListener createLivenessListener(AtomicReference<List<String>> atomicReference) {
        return () -> {
            LOG.info("Leader detected change in list of live processors.");
            final List<String> liveProcessorIds = atomicReference.get();
            doOnProcessorChange(liveProcessorIds);
        };
    }

    /**
     * Builds the listener run when a new job model version shows up on the blob: it raises the
     * upgrade flag and processes the version currently published there.
     *
     * @return listener to attach to the {@link JMVersionUpgradeScheduler}
     */
    private SchedulerStateChangeListener createJMVersionUpgradeListener() {
        return () -> {
            LOG.info("Job model version upgrade detected.");
            versionUpgradeDetected.set(true);
            final String publishedVersion = leaderBlob.getJobModelVersion();
            onNewJobModelAvailable(publishedVersion);
        };
    }

    /**
     * Builds the listener run when the current leader is found dead; this processor then tries to
     * take over leadership.
     *
     * @return listener to attach to the {@link LeaderLivenessCheckScheduler}
     */
    private SchedulerStateChangeListener createLeaderLivenessListener() {
        return () -> {
            LOG.info("Existing leader died.");
            azureLeaderElector.tryBecomeLeader();
        };
    }

    /**
     * Resolves every partition of every configured input stream.
     *
     * @return the system stream partitions of all input streams named in the task config, according to the
     *         stream metadata cache
     */
    private Set<SystemStreamPartition> getInputStreamPartitions() {
        final Set<SystemStream> configuredInputs = new TaskConfig(this.config).getInputStreams();

        // The metadata cache exposes a Scala API, so convert on the way in and on the way out.
        final scala.collection.immutable.Set<SystemStream> scalaInputs =
            JavaConverters.asScalaSetConverter(configuredInputs).asScala().toSet();
        final Map<SystemStream, SystemStreamMetadata> metadataByStream =
            JavaConverters.mapAsJavaMapConverter(this.streamMetadataCache.getStreamMetadata(scalaInputs, true)).asJava();

        return metadataByStream.entrySet()
            .stream()
            .flatMap(this::mapSSMToSSP)
            .collect(Collectors.toSet());
    }

    /**
     * Expands one stream's metadata into its system stream partitions.
     *
     * @param entry a system stream together with its metadata
     * @return one {@link SystemStreamPartition} per partition key found in the metadata
     */
    private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> entry) {
        final SystemStream stream = entry.getKey();
        return entry.getValue()
            .getSystemStreamPartitionMetadata()
            .keySet()
            .stream()
            .map(partition -> new SystemStreamPartition(stream, partition));
    }

    /**
     * Instantiates the grouper factory named in the job config and asks it for a grouper.
     *
     * @return the {@link SystemStreamPartitionGrouper} produced by the configured factory
     */
    private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
        final JobConfig jobSettings = new JobConfig(this.config);
        final String factoryClassName = jobSettings.getSystemStreamPartitionGrouperFactory();
        final SystemStreamPartitionGrouperFactory factory =
            (SystemStreamPartitionGrouperFactory) ReflectionUtil.getObj(factoryClassName, SystemStreamPartitionGrouperFactory.class);
        return factory.getSystemStreamPartitionGrouper(jobSettings);
    }

    /**
     * Computes how many tasks the configured grouper produces for the current input partitions.
     *
     * @return number of task groups returned by the {@link SystemStreamPartitionGrouper}
     */
    private int getMaxNumTasks() {
        final Set<SystemStreamPartition> inputPartitions = getInputStreamPartitions();
        final SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper();
        final Map<?, ?> partitionsByTask = grouper.group(inputPartitions);
        final int taskCount = partitionsByTask.size();

        // Fully parameterized: the original concatenated the grouper's toString() into the format string,
        // so a "{}" in that text would have swallowed the task-name argument.
        LOG.info("SystemStreamPartitionGrouper {} has grouped the SystemStreamPartitions into {} tasks with the following taskNames: {}",
            grouper, taskCount, partitionsByTask.keySet());
        return taskCount;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Leader-side reaction to a change in the processor set: works out the next job model version,
     * calculates a new job model, publishes it together with a {@code START} barrier and the processor
     * list, and starts the scheduler that waits for the barrier to complete.
     *
     * <p>If any of the three publishes fails the coordinator is stopped and no barrier scheduler is started.
     *
     * @param list current live processor ids; an empty list means "called on becoming leader", in which case
     *             the active processors are read from the table instead. The list is trimmed in place when it
     *             holds more processors than there are tasks.
     */
    public void doOnProcessorChange(List<String> list) {
        // Keep the untrimmed list: that is what gets published and what the barrier waits on.
        List<String> initialProcessorIds = new ArrayList<>(list);
        dropExcessProcessors(list, getMaxNumTasks());
        LOG.info("currentProcessorIds = {}", list);
        LOG.info("initialProcessorIds = {}", initialProcessorIds);

        String prevJMVersion = this.currentJMVersion.get();
        JobModel prevJobModel = this.jobModel;
        final AtomicBoolean barrierTimeout = new AtomicBoolean(false);
        String nextJMVersion;

        if (list.isEmpty()) {
            // First job model generated by this leader.
            nextJMVersion = INITIAL_STATE.equals(prevJMVersion)
                ? "1"
                : Integer.toString(Integer.parseInt(prevJMVersion) + 1);
            initialProcessorIds = new ArrayList<>(this.table.getActiveProcessorsList(this.currentJMVersion));
        } else {
            final String blobJMVersion = this.leaderBlob.getJobModelVersion();
            // NOTE(review): this parse fails if the current version is still INITIAL_STATE - confirm the
            // liveness scheduler cannot fire before the first version has been confirmed.
            final int prevVersionNumber = Integer.parseInt(prevJMVersion);
            nextJMVersion = Integer.toString(prevVersionNumber + 1);
            if (blobJMVersion != null && Integer.parseInt(blobJMVersion) > prevVersionNumber) {
                // A newer version is already on the blob, so its barrier never completed: time it out and
                // build on top of that version instead.
                prevJMVersion = blobJMVersion;
                prevJobModel = this.leaderBlob.getJobModel();
                nextJMVersion = Integer.toString(Integer.parseInt(blobJMVersion) + 1);
                this.versionUpgradeDetected.set(false);
                if (this.leaderBarrierScheduler != null) {
                    this.leaderBarrierScheduler.shutdown();
                }
                this.leaderBlob.publishBarrierState(
                    BarrierState.TIMEOUT.name() + " " + blobJMVersion, this.azureLeaderElector.getLeaseId().get());
            }
        }

        final JobModel newJobModel = JobModelCalculator.INSTANCE.calculateJobModel(
            this.config,
            Collections.emptyMap(),
            this.streamMetadataCache,
            new GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()));
        LOG.info("pid=" + this.processorId + "Generated new Job Model. Version = " + nextJMVersion);

        final boolean jobModelWritten = this.leaderBlob.publishJobModel(
            prevJobModel, newJobModel, prevJMVersion, nextJMVersion, this.azureLeaderElector.getLeaseId().get());
        final boolean barrierWritten = this.leaderBlob.publishBarrierState(
            BarrierState.START.name() + " " + nextJMVersion, this.azureLeaderElector.getLeaseId().get());
        final boolean processorsWritten = this.leaderBlob.publishLiveProcessorList(
            initialProcessorIds, this.azureLeaderElector.getLeaseId().get());

        if (!jobModelWritten || !barrierWritten || !processorsWritten) {
            LOG.error("Leader failed to publish the job model {}. Stopping the processor with PID: {}.",
                this.jobModel, this.processorId);
            stop();
            // Do not start a new barrier scheduler on a coordinator that has just been stopped.
            return;
        }
        LOG.info("pid=" + this.processorId + "Published new Job Model. Version = " + nextJMVersion);

        this.leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(
            this.errorHandler, this.table, nextJMVersion, initialProcessorIds, System.currentTimeMillis(),
            barrierTimeout, this.currentJMVersion, this.processorId);
        this.leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout));
        this.leaderBarrierScheduler.scheduleTask();
    }

    /**
     * Removes processors other than this one from the front of {@code processorIds} until at most
     * {@code maxProcessors} remain or no removable entry is left.
     */
    private void dropExcessProcessors(List<String> processorIds, int maxProcessors) {
        int index = 0;
        while (processorIds.size() > maxProcessors && index < processorIds.size()) {
            if (this.processorId.equals(processorIds.get(index))) {
                // Never evict ourselves; step over this entry so the loop cannot spin on it.
                index++;
            } else {
                // After a removal the next candidate slides into the same index, so do not advance.
                processorIds.remove(index);
            }
        }
    }

    /**
     * Worker-side handling of a newly published job model: reads it from the blob, stops the processor if
     * it is not part of the model, otherwise expires the old model, registers under the new version and
     * polls the barrier state until it ends (model confirmed), times out, or is superseded by a newer
     * version.
     *
     * @param str job model version that became available
     */
    private void onNewJobModelAvailable(String str) {
        LOG.info("pid=" + this.processorId + "new JobModel available with job model version {}", str);
        this.jobModel = this.leaderBlob.getJobModel();
        LOG.info("pid=" + this.processorId + ": new JobModel available. ver=" + str + "; jm = " + this.jobModel);

        if (!this.jobModel.getContainers().containsKey(this.processorId)) {
            LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", this.jobModel, this.processorId);
            stop();
            return;
        }
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onJobModelExpired();
        }
        this.table.addProcessorEntity(str, this.processorId, this.azureLeaderElector.amILeader());

        final String barrierEnd = BarrierState.END.name() + " " + str;
        final String barrierTimedOut = BarrierState.TIMEOUT.name() + " " + str;
        final Random random = new Random();

        while (true) {
            // Constant-first equals() so that a missing (null) barrier state just means "keep waiting".
            final String barrierState = this.leaderBlob.getBarrierState();
            if (barrierEnd.equals(barrierState)) {
                LOG.info("Barrier completion detected by the worker for barrier version {}.", str);
                this.versionUpgradeDetected.set(false);
                onNewJobModelConfirmed(str);
                return;
            }
            if (barrierTimedOut.equals(barrierState) || isNewerJobModelVersionPublished(str)) {
                break;
            }
            try {
                // Randomized delay so that workers do not all hit the blob at the same time.
                Thread.sleep(random.nextInt(METADATA_CACHE_TTL_MS));
            } catch (InterruptedException e) {
                // With the interrupt flag restored every further sleep would throw immediately and turn
                // this loop into a hot poll of the blob, so give up waiting instead.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for the barrier of version {}; giving up.", str);
                this.versionUpgradeDetected.set(false);
                return;
            }
            LOG.info("Checking for barrier state on the blob again...");
        }

        LOG.info("Barrier timed out for version number {}", str);
        this.versionUpgradeDetected.set(false);
    }

    /**
     * Tells whether the blob already holds a job model version newer than {@code version}; a missing
     * published version counts as "not newer".
     */
    private boolean isNewerJobModelVersionPublished(String version) {
        final String publishedVersion = this.leaderBlob.getJobModelVersion();
        return publishedVersion != null && Integer.parseInt(publishedVersion) > Integer.parseInt(version);
    }

    /**
     * Switches this processor to a confirmed job model version: records the new version, deletes its table
     * entities for the previous version and for the initial state, and hands the job model to the listener.
     *
     * @param str job model version whose barrier completed
     */
    private void onNewJobModelConfirmed(String str) {
        LOG.info("pid=" + this.processorId + "new version " + str + " of the job model got confirmed");

        final String previousVersion = this.currentJMVersion.get();
        this.currentJMVersion.set(str);

        // Clean up the rows this processor registered under versions it no longer runs with.
        for (String staleVersion : new String[] {previousVersion, INITIAL_STATE}) {
            final ProcessorEntity staleEntity = this.table.getEntity(staleVersion, this.processorId);
            if (staleEntity == null) {
                continue;
            }
            staleEntity.setEtag("*");
            this.table.deleteProcessorEntity(staleEntity);
        }

        if (this.coordinatorListener != null) {
            this.coordinatorListener.onNewJobModel(this.processorId, this.jobModel);
        }
    }
}
