package org.apache.pulsar.functions.worker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.shade.io.netty.handler.codec.http2.Http2CodecUtil;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager.class */
public class SchedulerManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SchedulerManager.class);
    private final WorkerConfig workerConfig;
    private final ErrorNotifier errorNotifier;
    private final WorkerStatsManager workerStatsManager;
    private ThreadPoolExecutor executorService;
    private final PulsarClient pulsarClient;
    private FunctionMetaDataManager functionMetaDataManager;
    private LeaderService leaderService;
    private MembershipManager membershipManager;
    private FunctionRuntimeManager functionRuntimeManager;
    private final IScheduler scheduler;
    private Producer<byte[]> exclusiveProducer;
    private ScheduledExecutorService scheduledExecutorService;
    private final PulsarAdmin admin;
    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60;
    public static final String HEARTBEAT_TENANT = "pulsar-function";
    public static final String HEARTBEAT_NAMESPACE = "heartbeat";
    private Future<?> currentRebalanceFuture;
    private Future<?> currentDrainFuture;
    private List<Function.Assignment> assignmentsMovedInLastDrain;
    private Lock schedulerLock = new ReentrantLock(true);
    private volatile boolean isRunning = false;
    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
    private MessageId lastMessageProduced = null;
    private MessageId metadataTopicLastMessage = MessageId.earliest;
    private AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
    private AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
    private ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$DrainInProgressException.class */
    public static class DrainInProgressException extends RuntimeException {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$DrainOpStatus.class */
    public enum DrainOpStatus {
        DrainNotInProgress,
        DrainInProgress,
        DrainCompleted
    }

    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$RebalanceInProgressException.class */
    public static class RebalanceInProgressException extends RuntimeException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$SchedulerStats.class */
    public static class SchedulerStats {
        private Map<String, WorkerStats> workerStatsMap = new HashMap();
        private Map<String, String> instanceToWorkerId = new HashMap();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$SchedulerStats$WorkerStats.class */
        public static class WorkerStats {
            private int originalNumAssignments;
            private int finalNumAssignments;
            private int instancesAdded;
            private int instancesRemoved;
            private int instancesUpdated;
            private boolean alive;

            /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$SchedulerStats$WorkerStats$WorkerStatsBuilder.class */
            public static class WorkerStatsBuilder {
                private int originalNumAssignments;
                private int finalNumAssignments;
                private int instancesAdded;
                private int instancesRemoved;
                private int instancesUpdated;
                private boolean alive;

                WorkerStatsBuilder() {
                }

                public WorkerStatsBuilder originalNumAssignments(int i) {
                    this.originalNumAssignments = i;
                    return this;
                }

                public WorkerStatsBuilder finalNumAssignments(int i) {
                    this.finalNumAssignments = i;
                    return this;
                }

                public WorkerStatsBuilder instancesAdded(int i) {
                    this.instancesAdded = i;
                    return this;
                }

                public WorkerStatsBuilder instancesRemoved(int i) {
                    this.instancesRemoved = i;
                    return this;
                }

                public WorkerStatsBuilder instancesUpdated(int i) {
                    this.instancesUpdated = i;
                    return this;
                }

                public WorkerStatsBuilder alive(boolean z) {
                    this.alive = z;
                    return this;
                }

                public WorkerStats build() {
                    return new WorkerStats(this.originalNumAssignments, this.finalNumAssignments, this.instancesAdded, this.instancesRemoved, this.instancesUpdated, this.alive);
                }

                public String toString() {
                    return "SchedulerManager.SchedulerStats.WorkerStats.WorkerStatsBuilder(originalNumAssignments=" + this.originalNumAssignments + ", finalNumAssignments=" + this.finalNumAssignments + ", instancesAdded=" + this.instancesAdded + ", instancesRemoved=" + this.instancesRemoved + ", instancesUpdated=" + this.instancesUpdated + ", alive=" + this.alive + DefaultExpressionEngine.DEFAULT_INDEX_END;
                }
            }

            WorkerStats(int i, int i2, int i3, int i4, int i5, boolean z) {
                this.originalNumAssignments = i;
                this.finalNumAssignments = i2;
                this.instancesAdded = i3;
                this.instancesRemoved = i4;
                this.instancesUpdated = i5;
                this.alive = z;
            }

            public static WorkerStatsBuilder builder() {
                return new WorkerStatsBuilder();
            }

            public int getOriginalNumAssignments() {
                return this.originalNumAssignments;
            }

            public int getFinalNumAssignments() {
                return this.finalNumAssignments;
            }

            public int getInstancesAdded() {
                return this.instancesAdded;
            }

            public int getInstancesRemoved() {
                return this.instancesRemoved;
            }

            public int getInstancesUpdated() {
                return this.instancesUpdated;
            }

            public boolean isAlive() {
                return this.alive;
            }

            public void setOriginalNumAssignments(int i) {
                this.originalNumAssignments = i;
            }

            public void setFinalNumAssignments(int i) {
                this.finalNumAssignments = i;
            }

            public void setInstancesAdded(int i) {
                this.instancesAdded = i;
            }

            public void setInstancesRemoved(int i) {
                this.instancesRemoved = i;
            }

            public void setInstancesUpdated(int i) {
                this.instancesUpdated = i;
            }

            public void setAlive(boolean z) {
                this.alive = z;
            }

            public boolean equals(Object obj) {
                if (obj == this) {
                    return true;
                }
                if (!(obj instanceof WorkerStats)) {
                    return false;
                }
                WorkerStats workerStats = (WorkerStats) obj;
                return workerStats.canEqual(this) && getOriginalNumAssignments() == workerStats.getOriginalNumAssignments() && getFinalNumAssignments() == workerStats.getFinalNumAssignments() && getInstancesAdded() == workerStats.getInstancesAdded() && getInstancesRemoved() == workerStats.getInstancesRemoved() && getInstancesUpdated() == workerStats.getInstancesUpdated() && isAlive() == workerStats.isAlive();
            }

            protected boolean canEqual(Object obj) {
                return obj instanceof WorkerStats;
            }

            public int hashCode() {
                return (((((((((((1 * 59) + getOriginalNumAssignments()) * 59) + getFinalNumAssignments()) * 59) + getInstancesAdded()) * 59) + getInstancesRemoved()) * 59) + getInstancesUpdated()) * 59) + (isAlive() ? 79 : 97);
            }

            public String toString() {
                return "SchedulerManager.SchedulerStats.WorkerStats(originalNumAssignments=" + getOriginalNumAssignments() + ", finalNumAssignments=" + getFinalNumAssignments() + ", instancesAdded=" + getInstancesAdded() + ", instancesRemoved=" + getInstancesRemoved() + ", instancesUpdated=" + getInstancesUpdated() + ", alive=" + isAlive() + DefaultExpressionEngine.DEFAULT_INDEX_END;
            }

            static /* synthetic */ int access$008(WorkerStats workerStats) {
                int i = workerStats.instancesRemoved;
                workerStats.instancesRemoved = i + 1;
                return i;
            }

            static /* synthetic */ int access$110(WorkerStats workerStats) {
                int i = workerStats.finalNumAssignments;
                workerStats.finalNumAssignments = i - 1;
                return i;
            }

            static /* synthetic */ int access$208(WorkerStats workerStats) {
                int i = workerStats.instancesAdded;
                workerStats.instancesAdded = i + 1;
                return i;
            }

            static /* synthetic */ int access$108(WorkerStats workerStats) {
                int i = workerStats.finalNumAssignments;
                workerStats.finalNumAssignments = i + 1;
                return i;
            }

            static /* synthetic */ int access$308(WorkerStats workerStats) {
                int i = workerStats.instancesUpdated;
                workerStats.instancesUpdated = i + 1;
                return i;
            }
        }

        public SchedulerStats(Map<String, Map<String, Function.Assignment>> map, Set<String> set) {
            for (String str : set) {
                WorkerStats.WorkerStatsBuilder alive = WorkerStats.builder().alive(true);
                Map<String, Function.Assignment> map2 = map.get(str);
                if (map2 != null) {
                    alive.originalNumAssignments(map2.size());
                    alive.finalNumAssignments(map2.size());
                    Iterator<String> it = map2.keySet().iterator();
                    while (it.hasNext()) {
                        this.instanceToWorkerId.put(it.next(), str);
                    }
                } else {
                    alive.originalNumAssignments(0);
                    alive.finalNumAssignments(0);
                }
                this.workerStatsMap.put(str, alive.build());
            }
            for (Map.Entry<String, Map<String, Function.Assignment>> entry : map.entrySet()) {
                String key = entry.getKey();
                Map<String, Function.Assignment> value = entry.getValue();
                if (!set.contains(key)) {
                    this.workerStatsMap.put(key, WorkerStats.builder().alive(false).originalNumAssignments(value.size()).finalNumAssignments(value.size()).build());
                }
            }
        }

        public void removedAssignment(Function.Assignment assignment) {
            WorkerStats workerStats = this.workerStatsMap.get(assignment.getWorkerId());
            Preconditions.checkNotNull(workerStats);
            WorkerStats.access$008(workerStats);
            WorkerStats.access$110(workerStats);
        }

        public void newAssignment(Function.Assignment assignment) {
            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            String workerId = assignment.getWorkerId();
            String str = this.instanceToWorkerId.get(fullyQualifiedInstanceId);
            if (str != null) {
                WorkerStats workerStats = this.workerStatsMap.get(str);
                Preconditions.checkNotNull(workerStats);
                WorkerStats.access$008(workerStats);
                WorkerStats.access$110(workerStats);
            }
            WorkerStats workerStats2 = this.workerStatsMap.get(workerId);
            Preconditions.checkNotNull(workerStats2);
            WorkerStats.access$208(workerStats2);
            WorkerStats.access$108(workerStats2);
        }

        public void updatedAssignment(Function.Assignment assignment) {
            WorkerStats workerStats = this.workerStatsMap.get(assignment.getWorkerId());
            Preconditions.checkNotNull(workerStats);
            WorkerStats.access$308(workerStats);
        }

        public String getSummary() {
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            Iterator<Map.Entry<String, WorkerStats>> it = this.workerStatsMap.entrySet().iterator();
            while (it.hasNext()) {
                WorkerStats value = it.next().getValue();
                i += value.instancesAdded;
                i2 += value.instancesUpdated;
                i3 += value.instancesRemoved;
            }
            return String.format("{\"Added\": %d, \"Updated\": %d, \"removed\": %d}", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3));
        }

        public String toString() {
            try {
                return ObjectMapperFactory.getThreadLocal().writerWithDefaultPrettyPrinter().writeValueAsString(this.workerStatsMap);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$TooFewWorkersException.class */
    public static class TooFewWorkersException extends RuntimeException {
    }

    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$UnknownWorkerException.class */
    public static class UnknownWorkerException extends RuntimeException {
    }

    /* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager$WorkerNotRemovedAfterPriorDrainException.class */
    public static class WorkerNotRemovedAfterPriorDrainException extends RuntimeException {
    }

    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, WorkerStatsManager workerStatsManager, ErrorNotifier errorNotifier) {
        this.workerConfig = workerConfig;
        this.pulsarClient = pulsarClient;
        this.admin = pulsarAdmin;
        this.scheduler = (IScheduler) Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader());
        this.workerStatsManager = workerStatsManager;
        this.errorNotifier = errorNotifier;
    }

    public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> supplier) throws WorkerUtils.NotLeaderAnymore {
        return WorkerUtils.createExclusiveProducerWithRetry(this.pulsarClient, this.workerConfig.getFunctionAssignmentTopic(), this.workerConfig.getWorkerId() + "-scheduler-manager", supplier, Http2CodecUtil.DEFAULT_MAX_QUEUED_CONTROL_FRAMES);
    }

    public synchronized void initialize(Producer<byte[]> producer) {
        if (this.isRunning) {
            log.error("Scheduler Manager entered invalid state");
            this.errorNotifier.triggerError(new IllegalStateException());
            return;
        }
        log.info("Initializing scheduler manager");
        this.exclusiveProducer = producer;
        this.executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(5));
        this.executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor"));
        if (this.workerConfig.getTopicCompactionFrequencySec() > 0) {
            scheduleCompaction(this.scheduledExecutorService, this.workerConfig.getTopicCompactionFrequencySec());
        }
        this.isRunning = true;
        this.lastMessageProduced = null;
    }

    private Future<?> scheduleInternal(Runnable runnable, String str) {
        if (!this.leaderService.isLeader()) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            return this.executorService.submit(() -> {
                this.schedulerLock.lock();
                try {
                    if (this.leaderService.isLeader()) {
                        try {
                            runnable.run();
                        } catch (Throwable th) {
                            log.error("Encountered error when invoking scheduler [{}]", str);
                            this.errorNotifier.triggerError(th);
                        }
                    }
                } finally {
                    this.schedulerLock.unlock();
                }
            });
        } catch (RejectedExecutionException e) {
            log.debug("Rejected task to invoke scheduler since task queue is already full");
            return CompletableFuture.completedFuture(null);
        }
    }

    public Future<?> schedule() {
        return scheduleInternal(() -> {
            this.workerStatsManager.scheduleTotalExecTimeStart();
            invokeScheduler();
            this.workerStatsManager.scheduleTotalExecTimeEnd();
        }, "Encountered error when invoking scheduler");
    }

    private Future<?> rebalance() {
        return scheduleInternal(() -> {
            this.workerStatsManager.rebalanceTotalExecTimeStart();
            invokeRebalance();
            this.workerStatsManager.rebalanceTotalExecTimeEnd();
        }, "Encountered error when invoking rebalance");
    }

    public Future<?> rebalanceIfNotInprogress() {
        if (!this.rebalanceInProgress.compareAndSet(false, true)) {
            throw new RebalanceInProgressException();
        }
        if (getCurrentAvailableNumWorkers() <= 1) {
            this.rebalanceInProgress.set(false);
            throw new TooFewWorkersException();
        }
        this.currentRebalanceFuture = rebalance();
        return this.currentRebalanceFuture;
    }

    private Future<?> drain(String str) {
        return scheduleInternal(() -> {
            this.workerStatsManager.drainTotalExecTimeStart();
            this.assignmentsMovedInLastDrain = invokeDrain(str);
            this.workerStatsManager.drainTotalExecTimeEnd();
        }, "Encountered error when invoking drain");
    }

    public Future<?> drainIfNotInProgress(String str) {
        if (!this.drainInProgressFlag.compareAndSet(false, true)) {
            throw new DrainInProgressException();
        }
        try {
            Set<String> currentAvailableWorkers = getCurrentAvailableWorkers();
            if (currentAvailableWorkers.size() <= 1) {
                throw new TooFewWorkersException();
            }
            Preconditions.checkNotNull(str);
            if (this.drainOpStatusMap.containsKey(str)) {
                log.warn("Worker " + str + " was not removed yet from SchedulerManager after previous drain op");
                throw new WorkerNotRemovedAfterPriorDrainException();
            }
            if (!currentAvailableWorkers.contains(str)) {
                log.info("invokeDrain was called for a worker={} which is not currently active", str);
                throw new UnknownWorkerException();
            }
            this.currentDrainFuture = drain(str);
            Future<?> future = this.currentDrainFuture;
            this.drainInProgressFlag.set(false);
            return future;
        } catch (Throwable th) {
            this.drainInProgressFlag.set(false);
            throw th;
        }
    }

    public LongRunningProcessStatus getDrainStatus(String str) {
        long nanoTime = System.nanoTime();
        LongRunningProcessStatus longRunningProcessStatus = new LongRunningProcessStatus();
        try {
            DrainOpStatus drainOpStatus = this.drainOpStatusMap.get(str);
            if (drainOpStatus != null) {
                switch (drainOpStatus) {
                    case DrainCompleted:
                        longRunningProcessStatus = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
                        break;
                    case DrainInProgress:
                        longRunningProcessStatus = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
                        break;
                    case DrainNotInProgress:
                        longRunningProcessStatus = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                        break;
                    default:
                        longRunningProcessStatus = LongRunningProcessStatus.forError("getDrainStatus: Unexpected status " + drainOpStatus + " found for worker " + str);
                        break;
                }
            } else {
                longRunningProcessStatus = LongRunningProcessStatus.forError("Worker " + str + " not found in drain records");
            }
            log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}", new Object[]{str, Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d)), longRunningProcessStatus.status, longRunningProcessStatus.lastError});
            return longRunningProcessStatus;
        } catch (Throwable th) {
            log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}", new Object[]{str, Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d)), longRunningProcessStatus.status, longRunningProcessStatus.lastError});
            return longRunningProcessStatus;
        }
    }

    @VisibleForTesting
    void clearDrainOpsStatus() {
        this.drainOpStatusMap.clear();
        log.warn("Cleared drain op status map");
    }

    @VisibleForTesting
    void setDrainOpsStatus(String str, DrainOpStatus drainOpStatus) {
        this.drainOpStatusMap.put(str, drainOpStatus);
        log.warn("setDrainOpsStatus: updated drain status of worker {} to {}", str, drainOpStatus);
    }

    @VisibleForTesting
    ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
        return new ConcurrentHashMap<>(this.drainOpStatusMap);
    }

    private synchronized int getCurrentAvailableNumWorkers() {
        return getCurrentAvailableWorkers().size();
    }

    private synchronized Set<String> getCurrentAvailableWorkers() {
        Set<String> set = (Set) this.membershipManager.getCurrentMembership().stream().map(workerInfo -> {
            return workerInfo.getWorkerId();
        }).collect(Collectors.toSet());
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            if (this.drainOpStatusMap.containsKey(it.next())) {
                it.remove();
            }
        }
        return set;
    }

    void invokeScheduler() {
        long nanoTime = System.nanoTime();
        Set<String> currentAvailableWorkers = getCurrentAvailableWorkers();
        Map<String, Function.Instance> computeAllInstances = computeAllInstances(this.functionMetaDataManager.getAllFunctionMetaData(), this.functionRuntimeManager.getRuntimeFactory().externallyManaged());
        Map<String, Map<String, Function.Assignment>> currentAssignments = this.functionRuntimeManager.getCurrentAssignments();
        SchedulerStats schedulerStats = new SchedulerStats(currentAssignments, currentAvailableWorkers);
        Iterator<Map.Entry<String, Map<String, Function.Assignment>>> it = currentAssignments.entrySet().iterator();
        while (it.hasNext()) {
            Map<String, Function.Assignment> value = it.next().getValue();
            value.entrySet().removeIf(entry -> {
                String str = (String) entry.getKey();
                boolean z = !computeAllInstances.containsKey(str);
                if (z) {
                    Function.Assignment assignment = (Function.Assignment) entry.getValue();
                    MessageId publishNewAssignment = publishNewAssignment(assignment.toBuilder().build(), true);
                    log.info("Deleting assignment: {}", assignment);
                    this.functionRuntimeManager.deleteAssignment(str);
                    this.lastMessageProduced = publishNewAssignment;
                    schedulerStats.removedAssignment(assignment);
                }
                return z;
            });
            for (Map.Entry<String, Function.Assignment> entry2 : value.entrySet()) {
                String key = entry2.getKey();
                Function.Assignment value2 = entry2.getValue();
                Function.Instance instance = computeAllInstances.get(key);
                if (!value2.getInstance().equals(instance)) {
                    value.put(key, value2.toBuilder().setInstance(instance).build());
                    Function.Assignment build = value2.toBuilder().setInstance(instance).build().toBuilder().build();
                    MessageId publishNewAssignment = publishNewAssignment(build, false);
                    log.info("Updating assignment: {}", build);
                    this.functionRuntimeManager.processAssignment(build);
                    this.lastMessageProduced = publishNewAssignment;
                    schedulerStats.updatedAssignment(build);
                }
                if (value.isEmpty()) {
                    it.remove();
                }
            }
        }
        List<Function.Assignment> list = (List) currentAssignments.entrySet().stream().filter(entry3 -> {
            return currentAvailableWorkers.contains((String) entry3.getKey());
        }).flatMap(entry4 -> {
            return ((Map) entry4.getValue()).values().stream();
        }).collect(Collectors.toList());
        Pair<List<Function.Instance>, List<Function.Assignment>> unassignedFunctionInstances = getUnassignedFunctionInstances(currentAssignments, computeAllInstances);
        this.workerStatsManager.scheduleStrategyExecTimeStartStart();
        List<Function.Assignment> schedule = this.scheduler.schedule(unassignedFunctionInstances.getLeft(), list, currentAvailableWorkers);
        this.workerStatsManager.scheduleStrategyExecTimeStartEnd();
        schedule.addAll(unassignedFunctionInstances.getRight());
        if (log.isDebugEnabled()) {
            log.debug("New assignments computed: {}", schedule);
        }
        this.isCompactionNeeded.set(!schedule.isEmpty());
        for (Function.Assignment assignment : schedule) {
            MessageId publishNewAssignment2 = publishNewAssignment(assignment, false);
            log.info("Adding assignment: {}", assignment);
            this.functionRuntimeManager.processAssignment(assignment);
            this.lastMessageProduced = publishNewAssignment2;
            schedulerStats.newAssignment(assignment);
        }
        log.info("Schedule summary - execution time: {} sec | total unassigned: {} | stats: {}\n{}", new Object[]{Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d)), Integer.valueOf(unassignedFunctionInstances.getLeft().size()), schedulerStats.getSummary(), schedulerStats});
    }

    private void invokeRebalance() {
        long nanoTime = System.nanoTime();
        Set<String> currentAvailableWorkers = getCurrentAvailableWorkers();
        Map<String, Map<String, Function.Assignment>> currentAssignments = this.functionRuntimeManager.getCurrentAssignments();
        SchedulerStats schedulerStats = new SchedulerStats(currentAssignments, currentAvailableWorkers);
        List<Function.Assignment> list = (List) currentAssignments.entrySet().stream().filter(entry -> {
            return currentAvailableWorkers.contains((String) entry.getKey());
        }).flatMap(entry2 -> {
            return ((Map) entry2.getValue()).values().stream();
        }).collect(Collectors.toList());
        this.workerStatsManager.rebalanceStrategyExecTimeStart();
        List<Function.Assignment> rebalance = this.scheduler.rebalance(list, currentAvailableWorkers);
        this.workerStatsManager.rebalanceStrategyExecTimeEnd();
        for (Function.Assignment assignment : rebalance) {
            MessageId publishNewAssignment = publishNewAssignment(assignment, false);
            log.info("Rebalance - new assignment: {}", assignment);
            this.functionRuntimeManager.processAssignment(assignment);
            this.lastMessageProduced = publishNewAssignment;
            schedulerStats.newAssignment(assignment);
        }
        log.info("Rebalance summary - execution time: {} sec | stats: {}\n{}", new Object[]{Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d)), schedulerStats.getSummary(), schedulerStats});
        this.rebalanceInProgress.set(false);
    }

    private void scheduleCompaction(ScheduledExecutorService scheduledExecutorService, long j) {
        if (scheduledExecutorService != null) {
            scheduledExecutorService.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(() -> {
                if (this.leaderService.isLeader() && this.isCompactionNeeded.get()) {
                    compactAssignmentTopic();
                    this.isCompactionNeeded.set(false);
                }
            }), j, j, TimeUnit.SECONDS);
            scheduledExecutorService.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(() -> {
                if (!this.leaderService.isLeader() || this.metadataTopicLastMessage.compareTo(this.functionMetaDataManager.getLastMessageSeen()) == 0) {
                    return;
                }
                this.metadataTopicLastMessage = this.functionMetaDataManager.getLastMessageSeen();
                compactFunctionMetadataTopic();
            }), j, j, TimeUnit.SECONDS);
        }
    }

    @VisibleForTesting
    List<Function.Assignment> getAssignmentsMovedInLastDrain() {
        return this.assignmentsMovedInLastDrain;
    }

    @VisibleForTesting
    void clearAssignmentsMovedInLastDrain() {
        this.assignmentsMovedInLastDrain = null;
    }

    List<Function.Assignment> invokeDrain(String str) {
        long nanoTime = System.nanoTime();
        Set<String> currentAvailableWorkers = getCurrentAvailableWorkers();
        Map<String, Map<String, Function.Assignment>> currentAssignments = this.functionRuntimeManager.getCurrentAssignments();
        SchedulerStats schedulerStats = new SchedulerStats(currentAssignments, currentAvailableWorkers);
        boolean z = false;
        List<Function.Assignment> list = null;
        try {
            this.drainOpStatusMap.put(str, DrainOpStatus.DrainInProgress);
            currentAvailableWorkers.remove(str);
            Map<String, Function.Instance> computeAllInstances = computeAllInstances(this.functionMetaDataManager.getAllFunctionMetaData(), this.functionRuntimeManager.getRuntimeFactory().externallyManaged());
            HashMap hashMap = new HashMap();
            List<Function.Assignment> list2 = (List) currentAssignments.entrySet().stream().filter(entry -> {
                if (((String) entry.getKey()).compareTo(str) == 0) {
                    return false;
                }
                hashMap.put((String) entry.getKey(), (Map) entry.getValue());
                return true;
            }).flatMap(entry2 -> {
                return ((Map) entry2.getValue()).values().stream();
            }).collect(Collectors.toList());
            Pair<List<Function.Instance>, List<Function.Assignment>> unassignedFunctionInstances = getUnassignedFunctionInstances(hashMap, computeAllInstances);
            this.workerStatsManager.drainTotalExecTimeStart();
            try {
                list = this.scheduler.schedule(unassignedFunctionInstances.getLeft(), list2, currentAvailableWorkers);
            } catch (Exception e) {
                log.info("invokeDrain: Got exception from schedule: ", e);
            }
            this.workerStatsManager.drainTotalExecTimeEnd();
            if (list != null) {
                for (Function.Assignment assignment : list) {
                    MessageId publishNewAssignment = publishNewAssignment(assignment, false);
                    this.functionRuntimeManager.processAssignment(assignment);
                    this.lastMessageProduced = publishNewAssignment;
                    schedulerStats.newAssignment(assignment);
                }
            }
            this.drainOpStatusMap.put(str, DrainOpStatus.DrainCompleted);
            z = true;
            Logger logger = log;
            Object[] objArr = new Object[5];
            objArr[0] = str;
            objArr[1] = 1 != 0 ? "" : "un";
            objArr[2] = Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d));
            objArr[3] = schedulerStats.getSummary();
            objArr[4] = schedulerStats;
            logger.info("Draining worker {} was {}successful; summary [] - execution time: {} sec | stats: {}\n{}", objArr);
            return list;
        } catch (Throwable th) {
            Logger logger2 = log;
            Object[] objArr2 = new Object[5];
            objArr2[0] = str;
            objArr2[1] = z ? "" : "un";
            objArr2[2] = Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d));
            objArr2[3] = schedulerStats.getSummary();
            objArr2[4] = schedulerStats;
            logger2.info("Draining worker {} was {}successful; summary [] - execution time: {} sec | stats: {}\n{}", objArr2);
            return list;
        }
    }

    private void compactAssignmentTopic() {
        if (this.admin != null) {
            try {
                this.admin.topics().triggerCompaction(this.workerConfig.getFunctionAssignmentTopic());
            } catch (PulsarAdminException e) {
                log.error("Failed to trigger compaction", e);
                this.scheduledExecutorService.schedule(() -> {
                    compactAssignmentTopic();
                }, DEFAULT_ADMIN_API_BACKOFF_SEC, TimeUnit.SECONDS);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized int updateWorkerDrainMap() {
        long nanoTime = System.nanoTime();
        int i = 0;
        if (this.drainOpStatusMap.size() > 0) {
            Set set = (Set) this.membershipManager.getCurrentMembership().stream().map(workerInfo -> {
                return workerInfo.getWorkerId();
            }).collect(Collectors.toSet());
            ArrayList arrayList = new ArrayList();
            Iterator it = this.drainOpStatusMap.keySet().iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                if (!set.contains(str)) {
                    arrayList.add(str);
                }
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                this.drainOpStatusMap.remove((String) it2.next());
            }
            i = arrayList.size();
        }
        if (i > 0) {
            log.info("cleanupWorkerDrainMap removed {} stale workerIds in {} sec", Integer.valueOf(i), Double.valueOf((System.nanoTime() - nanoTime) / Math.pow(10.0d, 9.0d)));
        }
        return i;
    }

    private void compactFunctionMetadataTopic() {
        if (this.admin != null) {
            try {
                this.admin.topics().triggerCompaction(this.workerConfig.getFunctionMetadataTopic());
            } catch (PulsarAdminException e) {
                log.error("Failed to trigger compaction", e);
                this.scheduledExecutorService.schedule(() -> {
                    compactFunctionMetadataTopic();
                }, DEFAULT_ADMIN_API_BACKOFF_SEC, TimeUnit.SECONDS);
            }
        }
    }

    private MessageId publishNewAssignment(Function.Assignment assignment, boolean z) {
        try {
            return this.exclusiveProducer.newMessage().key(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())).value(z ? "".getBytes() : assignment.toByteArray()).send();
        } catch (Exception e) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = assignment;
            objArr[1] = z ? "send" : "deleted";
            objArr[2] = e;
            logger.error("Failed to {} assignment update {}", objArr);
            throw new RuntimeException(e);
        }
    }

    private static Map<String, Function.Instance> computeAllInstances(List<Function.FunctionMetaData> list, boolean z) {
        HashMap hashMap = new HashMap();
        Iterator<Function.FunctionMetaData> it = list.iterator();
        while (it.hasNext()) {
            for (Function.Instance instance : computeInstances(it.next(), z)) {
                hashMap.put(FunctionCommon.getFullyQualifiedInstanceId(instance), instance);
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static List<Function.Instance> computeInstances(Function.FunctionMetaData functionMetaData, boolean z) {
        LinkedList linkedList = new LinkedList();
        if (z) {
            linkedList.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(-1).build());
        } else {
            int parallelism = functionMetaData.getFunctionDetails().getParallelism();
            for (int i = 0; i < parallelism; i++) {
                linkedList.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(i).build());
            }
        }
        return linkedList;
    }

    private Pair<List<Function.Instance>, List<Function.Assignment>> getUnassignedFunctionInstances(Map<String, Map<String, Function.Assignment>> map, Map<String, Function.Instance> map2) {
        LinkedList linkedList = new LinkedList();
        ArrayList newArrayList = Lists.newArrayList();
        HashMap hashMap = new HashMap();
        if (map != null) {
            Iterator<Map<String, Function.Assignment>> it = map.values().iterator();
            while (it.hasNext()) {
                hashMap.putAll(it.next());
            }
        }
        for (Map.Entry<String, Function.Instance> entry : map2.entrySet()) {
            String key = entry.getKey();
            Function.Instance value = entry.getValue();
            String checkHeartBeatFunction = checkHeartBeatFunction(value);
            if (checkHeartBeatFunction != null) {
                newArrayList.add(Function.Assignment.newBuilder().setInstance(value).setWorkerId(checkHeartBeatFunction).build());
            } else if (!hashMap.containsKey(key)) {
                linkedList.add(value);
            }
        }
        return ImmutablePair.of(linkedList, newArrayList);
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        log.info("Closing scheduler manager");
        this.schedulerLock.lock();
        try {
            this.isRunning = false;
            if (this.scheduledExecutorService != null) {
                this.scheduledExecutorService.shutdown();
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
            if (this.exclusiveProducer != null) {
                try {
                    this.exclusiveProducer.close();
                } catch (PulsarClientException e) {
                    log.warn("Failed to shutdown scheduler manager assignment producer", e);
                }
            }
        } finally {
            this.schedulerLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String checkHeartBeatFunction(Function.Instance instance) {
        if (instance.getFunctionMetaData() == null || instance.getFunctionMetaData().getFunctionDetails() == null) {
            return null;
        }
        Function.FunctionDetails functionDetails = instance.getFunctionMetaData().getFunctionDetails();
        if (HEARTBEAT_TENANT.equals(functionDetails.getTenant()) && HEARTBEAT_NAMESPACE.equals(functionDetails.getNamespace())) {
            return functionDetails.getName();
        }
        return null;
    }

    public void setFunctionMetaDataManager(FunctionMetaDataManager functionMetaDataManager) {
        this.functionMetaDataManager = functionMetaDataManager;
    }

    public void setLeaderService(LeaderService leaderService) {
        this.leaderService = leaderService;
    }

    public void setMembershipManager(MembershipManager membershipManager) {
        this.membershipManager = membershipManager;
    }

    public void setFunctionRuntimeManager(FunctionRuntimeManager functionRuntimeManager) {
        this.functionRuntimeManager = functionRuntimeManager;
    }

    public Lock getSchedulerLock() {
        return this.schedulerLock;
    }

    public MessageId getLastMessageProduced() {
        return this.lastMessageProduced;
    }
}
