/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.ConnectAssignor;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class IncrementalCooperativeAssignor
implements ConnectAssignor {
    private final Logger log;
    private final Time time;
    private final int maxDelay;
    private WorkerCoordinator.ConnectorsAndTasks previousAssignment;
    private final WorkerCoordinator.ConnectorsAndTasks previousRevocation;
    private boolean revokedInPrevious;
    protected final Set<String> candidateWorkersForReassignment;
    protected long scheduledRebalance;
    protected int delay;
    protected int previousGenerationId;
    protected Set<String> previousMembers;
    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
    private int numSuccessiveRevokingRebalances;

    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
        this.log = logContext.logger(IncrementalCooperativeAssignor.class);
        this.time = time;
        this.maxDelay = maxDelay;
        this.previousAssignment = WorkerCoordinator.ConnectorsAndTasks.EMPTY;
        this.previousRevocation = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();
        this.scheduledRebalance = 0L;
        this.revokedInPrevious = false;
        this.candidateWorkersForReassignment = new LinkedHashSet<String>();
        this.delay = 0;
        this.previousGenerationId = -1;
        this.previousMembers = Collections.emptySet();
        this.numSuccessiveRevokingRebalances = 0;
        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(maxDelay == 0 ? 0L : 1L, 40, maxDelay, 0.0);
    }

    @Override
    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, WorkerCoordinator coordinator) {
        this.log.debug("Performing task assignment");
        HashMap<String, ExtendedWorkerState> memberConfigs = new HashMap<String, ExtendedWorkerState>();
        for (JoinGroupResponseData.JoinGroupResponseMember member : allMemberMetadata) {
            memberConfigs.put(member.memberId(), IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(member.metadata())));
        }
        this.log.debug("Member configs: {}", (Object)memberConfigs);
        long maxOffset = memberConfigs.values().stream().map(ConnectProtocol.WorkerState::offset).max(Long::compare).get();
        this.log.debug("Max config offset root: {}, local snapshot config offsets root: {}", (Object)maxOffset, (Object)coordinator.configSnapshot().offset());
        short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
        Long leaderOffset = this.ensureLeaderConfig(maxOffset, coordinator);
        if (leaderOffset == null) {
            Map<String, ExtendedAssignment> assignments = this.fillAssignments(memberConfigs.keySet(), (short)1, leaderId, ((ExtendedWorkerState)memberConfigs.get(leaderId)).url(), maxOffset, ClusterAssignment.EMPTY, 0, protocolVersion);
            return this.serializeAssignments(assignments, protocolVersion);
        }
        return this.performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion);
    }

    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
        if (coordinator.configSnapshot().offset() < maxOffset) {
            ClusterConfigState updatedSnapshot = coordinator.configFreshSnapshot();
            if (updatedSnapshot.offset() < maxOffset) {
                this.log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
                return null;
            }
            coordinator.configSnapshot(updatedSnapshot);
            return updatedSnapshot.offset();
        }
        return maxOffset;
    }

    protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ExtendedWorkerState> memberConfigs, WorkerCoordinator coordinator, short protocolVersion) {
        this.log.debug("Performing task assignment during generation: {} with memberId: {}", (Object)coordinator.generationId(), (Object)coordinator.memberId());
        Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments = ConnectUtils.transformValues(memberConfigs, memberConfig -> new WorkerCoordinator.ConnectorsAndTasks.Builder().with(memberConfig.assignment().connectors(), memberConfig.assignment().tasks()).build());
        ClusterAssignment clusterAssignment = this.performTaskAssignment(coordinator.configSnapshot(), coordinator.lastCompletedGenerationId(), coordinator.generationId(), memberAssignments);
        coordinator.leaderState(new WorkerCoordinator.LeaderState(memberConfigs, clusterAssignment.allAssignedConnectors(), clusterAssignment.allAssignedTasks()));
        Map<String, ExtendedAssignment> assignments = this.fillAssignments(memberConfigs.keySet(), (short)0, leaderId, memberConfigs.get(leaderId).url(), maxOffset, clusterAssignment, this.delay, protocolVersion);
        this.log.debug("Actual assignments: {}", (Object)assignments);
        return this.serializeAssignments(assignments, protocolVersion);
    }

    ClusterAssignment performTaskAssignment(ClusterConfigState configSnapshot, int lastCompletedGenerationId, int currentGenerationId, Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        this.log.debug("Previous assignments: {}", (Object)this.previousAssignment);
        if (this.previousGenerationId != lastCompletedGenerationId) {
            this.log.debug("Clearing the view of previous assignments due to generation mismatch between previous generation ID {} and last completed generation ID {}. This can happen if the leader fails to sync the assignment within a rebalancing round. The following view of previous assignments might be outdated and will be ignored by the leader in the current computation of new assignments. Possibly outdated previous assignments: {}", this.previousGenerationId, lastCompletedGenerationId, this.previousAssignment);
            this.previousAssignment = WorkerCoordinator.ConnectorsAndTasks.EMPTY;
        }
        TreeSet<String> configuredConnectors = new TreeSet<String>(configSnapshot.connectors());
        Set<ConnectorTaskId> configuredTasks = ConnectUtils.combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());
        WorkerCoordinator.ConnectorsAndTasks configured = new WorkerCoordinator.ConnectorsAndTasks.Builder().with(configuredConnectors, configuredTasks).build();
        this.log.debug("Configured assignments: {}", (Object)configured);
        WorkerCoordinator.ConnectorsAndTasks activeAssignments = this.assignment(memberAssignments);
        this.log.debug("Active assignments: {}", (Object)activeAssignments);
        if (!this.previousRevocation.isEmpty()) {
            if (this.previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c)) || this.previousRevocation.tasks().stream().anyMatch(t2 -> activeAssignments.tasks().contains(t2))) {
                this.previousAssignment = activeAssignments;
            }
            this.previousRevocation.connectors().clear();
            this.previousRevocation.tasks().clear();
        }
        WorkerCoordinator.ConnectorsAndTasks deleted = IncrementalCooperativeAssignor.diff(this.previousAssignment, configured);
        this.log.debug("Deleted assignments: {}", (Object)deleted);
        WorkerCoordinator.ConnectorsAndTasks duplicated = this.duplicatedAssignments(memberAssignments);
        this.log.trace("Duplicated assignments: {}", (Object)duplicated);
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = IncrementalCooperativeAssignor.diff(this.previousAssignment, activeAssignments, deleted);
        this.log.debug("Lost assignments: {}", (Object)lostAssignments);
        WorkerCoordinator.ConnectorsAndTasks created = IncrementalCooperativeAssignor.diff(configured, this.previousAssignment, activeAssignments);
        this.log.debug("Created: {}", (Object)created);
        List<WorkerCoordinator.WorkerLoad> currentWorkerAssignment = IncrementalCooperativeAssignor.workerAssignment(memberAssignments, deleted);
        HashMap<String, WorkerCoordinator.ConnectorsAndTasks.Builder> toRevoke = new HashMap<String, WorkerCoordinator.ConnectorsAndTasks.Builder>();
        Map<String, WorkerCoordinator.ConnectorsAndTasks> deletedToRevoke = IncrementalCooperativeAssignor.intersection(deleted, memberAssignments);
        this.log.debug("Deleted connectors and tasks to revoke from each worker: {}", (Object)deletedToRevoke);
        IncrementalCooperativeAssignor.addAll(toRevoke, deletedToRevoke);
        Map<String, WorkerCoordinator.ConnectorsAndTasks> duplicatedToRevoke = IncrementalCooperativeAssignor.intersection(duplicated, memberAssignments);
        this.log.debug("Duplicated connectors and tasks to revoke from each worker: {}", (Object)duplicatedToRevoke);
        IncrementalCooperativeAssignor.addAll(toRevoke, duplicatedToRevoke);
        List<WorkerCoordinator.WorkerLoad> nextWorkerAssignment = IncrementalCooperativeAssignor.workerLoads(memberAssignments);
        IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, deletedToRevoke);
        IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, duplicatedToRevoke);
        WorkerCoordinator.ConnectorsAndTasks.Builder lostAssignmentsToReassignBuilder = new WorkerCoordinator.ConnectorsAndTasks.Builder();
        this.handleLostAssignments(lostAssignments, lostAssignmentsToReassignBuilder, nextWorkerAssignment);
        WorkerCoordinator.ConnectorsAndTasks lostAssignmentsToReassign = lostAssignmentsToReassignBuilder.build();
        if (this.delay == 0) {
            Map<String, WorkerCoordinator.ConnectorsAndTasks> loadBalancingRevocations = this.performLoadBalancingRevocations(configured, nextWorkerAssignment);
            if (this.revokedInPrevious && !loadBalancingRevocations.isEmpty()) {
                ++this.numSuccessiveRevokingRebalances;
                this.log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
                this.delay = (int)this.consecutiveRevokingRebalancesBackoff.backoff(this.numSuccessiveRevokingRebalances);
                if (this.delay != 0) {
                    this.scheduledRebalance = this.time.milliseconds() + (long)this.delay;
                    this.log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}", (Object)this.delay, (Object)this.scheduledRebalance);
                } else {
                    this.log.debug("Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0");
                    IncrementalCooperativeAssignor.addAll(toRevoke, loadBalancingRevocations);
                    IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, loadBalancingRevocations);
                }
            } else if (!loadBalancingRevocations.isEmpty()) {
                this.log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
                IncrementalCooperativeAssignor.addAll(toRevoke, loadBalancingRevocations);
                IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, loadBalancingRevocations);
                this.revokedInPrevious = true;
            } else if (this.revokedInPrevious) {
                this.log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a balanced load. Resetting the exponential backoff clock");
                this.revokedInPrevious = false;
                this.numSuccessiveRevokingRebalances = 0;
            } else {
                this.log.debug("No revocations in previous and current round.");
            }
        } else {
            this.log.debug("Delayed rebalance is active. Delaying {}ms before revoking connectors and tasks: {}", (Object)this.delay, (Object)toRevoke);
            this.revokedInPrevious = false;
        }
        WorkerCoordinator.ConnectorsAndTasks toAssign = new WorkerCoordinator.ConnectorsAndTasks.Builder().addAll(created).addAll(lostAssignmentsToReassign).build();
        this.assignConnectors(nextWorkerAssignment, toAssign.connectors());
        this.assignTasks(nextWorkerAssignment, toAssign.tasks());
        Map nextConnectorAssignments = nextWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::connectors));
        Map nextTaskAssignments = nextWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::tasks));
        Map currentConnectorAssignments = currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::connectors));
        Map currentTaskAssignments = currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::tasks));
        Map<String, Collection<String>> incrementalConnectorAssignments = IncrementalCooperativeAssignor.diff(nextConnectorAssignments, currentConnectorAssignments);
        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments = IncrementalCooperativeAssignor.diff(nextTaskAssignments, currentTaskAssignments);
        Map<String, WorkerCoordinator.ConnectorsAndTasks> revoked = IncrementalCooperativeAssignor.buildAll(toRevoke);
        this.previousAssignment = this.computePreviousAssignment(revoked, nextConnectorAssignments, nextTaskAssignments, lostAssignments);
        this.previousGenerationId = currentGenerationId;
        this.previousMembers = memberAssignments.keySet();
        this.log.debug("Incremental connector assignments: {}", (Object)incrementalConnectorAssignments);
        this.log.debug("Incremental task assignments: {}", (Object)incrementalTaskAssignments);
        Map<String, Collection<String>> revokedConnectors = ConnectUtils.transformValues(revoked, WorkerCoordinator.ConnectorsAndTasks::connectors);
        Map<String, Collection<ConnectorTaskId>> revokedTasks = ConnectUtils.transformValues(revoked, WorkerCoordinator.ConnectorsAndTasks::tasks);
        return new ClusterAssignment(incrementalConnectorAssignments, incrementalTaskAssignments, revokedConnectors, revokedTasks, IncrementalCooperativeAssignor.diff(nextConnectorAssignments, revokedConnectors), IncrementalCooperativeAssignor.diff(nextTaskAssignments, revokedTasks));
    }

    private WorkerCoordinator.ConnectorsAndTasks computePreviousAssignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> toRevoke, Map<String, Collection<String>> connectorAssignments, Map<String, Collection<ConnectorTaskId>> taskAssignments, WorkerCoordinator.ConnectorsAndTasks lostAssignments) {
        WorkerCoordinator.ConnectorsAndTasks previousAssignment = new WorkerCoordinator.ConnectorsAndTasks.Builder().with(ConnectUtils.combineCollections(connectorAssignments.values()), ConnectUtils.combineCollections(taskAssignments.values())).build();
        for (WorkerCoordinator.ConnectorsAndTasks revoked : toRevoke.values()) {
            previousAssignment.connectors().removeAll(revoked.connectors());
            previousAssignment.tasks().removeAll(revoked.tasks());
            this.previousRevocation.connectors().addAll(revoked.connectors());
            this.previousRevocation.tasks().addAll(revoked.tasks());
        }
        previousAssignment.connectors().addAll(lostAssignments.connectors());
        previousAssignment.tasks().addAll(lostAssignments.tasks());
        return previousAssignment;
    }

    private WorkerCoordinator.ConnectorsAndTasks duplicatedAssignments(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        Map connectorInstanceCounts = ConnectUtils.combineCollections(memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::connectors, Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Set<String> duplicatedConnectors = connectorInstanceCounts.entrySet().stream().filter(entry -> (Long)entry.getValue() > 1L).map(Map.Entry::getKey).collect(Collectors.toSet());
        Map taskInstanceCounts = ConnectUtils.combineCollections(memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::tasks, Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Set<ConnectorTaskId> duplicatedTasks = taskInstanceCounts.entrySet().stream().filter(entry -> (Long)entry.getValue() > 1L).map(Map.Entry::getKey).collect(Collectors.toSet());
        return new WorkerCoordinator.ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
    }

    protected void handleLostAssignments(WorkerCoordinator.ConnectorsAndTasks lostAssignments, WorkerCoordinator.ConnectorsAndTasks.Builder lostAssignmentsToReassign, List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        if (lostAssignments.isEmpty() && !this.revokedInPrevious) {
            this.resetDelay();
            return;
        }
        long now = this.time.milliseconds();
        this.log.debug("Found the following connectors and tasks missing from previous assignments: " + lostAssignments);
        Set activeMembers = completeWorkerAssignment.stream().map(WorkerCoordinator.WorkerLoad::worker).collect(Collectors.toSet());
        if (this.scheduledRebalance <= 0L && activeMembers.containsAll(this.previousMembers)) {
            this.log.debug("No worker seems to have departed the group during the rebalance. The missing assignments that the leader is detecting are probably due to some workers failing to receive the new assignments in the previous rebalance. Will reassign missing tasks as new tasks");
            lostAssignmentsToReassign.addAll(lostAssignments);
            return;
        }
        if (this.maxDelay == 0) {
            this.log.debug("Scheduled rebalance delays are disabled ({} = 0); reassigning all lost connectors and tasks immediately", (Object)"scheduled.rebalance.max.delay.ms");
            lostAssignmentsToReassign.addAll(lostAssignments);
            return;
        }
        if (this.scheduledRebalance > 0L && now >= this.scheduledRebalance) {
            this.log.debug("Delayed rebalance expired. Reassigning lost tasks");
            List<Object> candidateWorkerLoad = Collections.emptyList();
            if (!this.candidateWorkersForReassignment.isEmpty()) {
                candidateWorkerLoad = this.pickCandidateWorkerForReassignment(completeWorkerAssignment);
            }
            if (!candidateWorkerLoad.isEmpty()) {
                WorkerCoordinator.WorkerLoad worker;
                this.log.debug("Assigning lost tasks to {} candidate workers: {}", (Object)candidateWorkerLoad.size(), (Object)candidateWorkerLoad.stream().map(WorkerCoordinator.WorkerLoad::worker).collect(Collectors.joining(",")));
                Iterator<Object> candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (String connector : lostAssignments.connectors()) {
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    worker = (WorkerCoordinator.WorkerLoad)candidateWorkerIterator.next();
                    this.log.debug("Assigning connector id {} to member {}", (Object)connector, (Object)worker.worker());
                    worker.assign(connector);
                }
                candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (ConnectorTaskId task : lostAssignments.tasks()) {
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    worker = (WorkerCoordinator.WorkerLoad)candidateWorkerIterator.next();
                    this.log.debug("Assigning task id {} to member {}", (Object)task, (Object)worker.worker());
                    worker.assign(task);
                }
            } else {
                this.log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
                lostAssignmentsToReassign.addAll(lostAssignments);
            }
            this.resetDelay();
            this.revokedInPrevious = false;
        } else {
            this.candidateWorkersForReassignment.addAll(this.candidateWorkersForReassignment(completeWorkerAssignment));
            if (now < this.scheduledRebalance) {
                this.delay = this.calculateDelay(now);
                this.log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", (Object)this.delay);
            } else {
                this.delay = this.maxDelay;
                this.log.debug("Resetting rebalance delay to the max: {}. scheduledRebalance: {} now: {} diff scheduledRebalance - now: {}", this.delay, this.scheduledRebalance, now, this.scheduledRebalance - now);
            }
            this.scheduledRebalance = now + (long)this.delay;
        }
    }

    private void resetDelay() {
        this.candidateWorkersForReassignment.clear();
        this.scheduledRebalance = 0L;
        if (this.delay != 0) {
            this.log.debug("Resetting delay from previous value: {} to 0", (Object)this.delay);
        }
        this.delay = 0;
    }

    private Set<String> candidateWorkersForReassignment(List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        return completeWorkerAssignment.stream().filter(WorkerCoordinator.WorkerLoad::isEmpty).map(WorkerCoordinator.WorkerLoad::worker).collect(Collectors.toSet());
    }

    private List<WorkerCoordinator.WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        Map activeWorkers = completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, Function.identity()));
        return this.candidateWorkersForReassignment.stream().map(activeWorkers::get).filter(Objects::nonNull).collect(Collectors.toList());
    }

    private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, ClusterAssignment clusterAssignment, int delay, short protocolVersion) {
        HashMap<String, ExtendedAssignment> groupAssignment = new HashMap<String, ExtendedAssignment>();
        for (String member : members) {
            Collection<String> connectorsToStart = clusterAssignment.newlyAssignedConnectors(member);
            Collection<ConnectorTaskId> tasksToStart = clusterAssignment.newlyAssignedTasks(member);
            Collection<String> connectorsToStop = clusterAssignment.newlyRevokedConnectors(member);
            Collection<ConnectorTaskId> tasksToStop = clusterAssignment.newlyRevokedTasks(member);
            ExtendedAssignment assignment = new ExtendedAssignment(protocolVersion, error, leaderId, leaderUrl, maxOffset, connectorsToStart, tasksToStart, connectorsToStop, tasksToStop, delay);
            this.log.debug("Filling assignment: {} -> {}", (Object)member, (Object)assignment);
            groupAssignment.put(member, assignment);
        }
        this.log.debug("Finished assignment");
        return groupAssignment;
    }

    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments, short protocolVersion) {
        boolean sessioned = protocolVersion >= 2;
        return assignments.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> IncrementalCooperativeConnectProtocol.serializeAssignment((ExtendedAssignment)e.getValue(), sessioned)));
    }

    private static WorkerCoordinator.ConnectorsAndTasks diff(WorkerCoordinator.ConnectorsAndTasks base, WorkerCoordinator.ConnectorsAndTasks ... toSubtract) {
        TreeSet<String> connectors = new TreeSet<String>(base.connectors());
        TreeSet<ConnectorTaskId> tasks = new TreeSet<ConnectorTaskId>(base.tasks());
        for (WorkerCoordinator.ConnectorsAndTasks sub : toSubtract) {
            connectors.removeAll(sub.connectors());
            tasks.removeAll(sub.tasks());
        }
        return new WorkerCoordinator.ConnectorsAndTasks.Builder().with(connectors, tasks).build();
    }

    private static <T> Map<String, Collection<T>> diff(Map<String, Collection<T>> base, Map<String, Collection<T>> toSubtract) {
        HashMap<String, Collection<T>> incremental = new HashMap<String, Collection<T>>();
        for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
            ArrayList<T> values = new ArrayList<T>(entry.getValue());
            values.removeAll(toSubtract.getOrDefault(entry.getKey(), Collections.emptySet()));
            incremental.put(entry.getKey(), values);
        }
        return incremental;
    }

    private WorkerCoordinator.ConnectorsAndTasks assignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        this.log.debug("Received assignments: {}", (Object)memberAssignments);
        return new WorkerCoordinator.ConnectorsAndTasks.Builder().with(ConnectUtils.combineCollections(memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::connectors), ConnectUtils.combineCollections(memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::tasks)).build();
    }

    private Map<String, WorkerCoordinator.ConnectorsAndTasks> performLoadBalancingRevocations(WorkerCoordinator.ConnectorsAndTasks configured, Collection<WorkerCoordinator.WorkerLoad> workers) {
        if (this.log.isTraceEnabled()) {
            workers.forEach(wl -> this.log.trace("Per worker current load size; worker: {} connectors: {} tasks: {}", wl.worker(), wl.connectorsSize(), wl.tasksSize()));
        }
        if (workers.stream().allMatch(WorkerCoordinator.WorkerLoad::isEmpty)) {
            this.log.trace("No load-balancing revocations required; all workers are either new or will have all currently-assigned connectors and tasks revoked during this round");
            return Collections.emptyMap();
        }
        if (configured.isEmpty()) {
            this.log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster");
            return Collections.emptyMap();
        }
        HashMap result = new HashMap();
        Map connectorRevocations = this.loadBalancingRevocations("connector", configured.connectors().size(), workers, WorkerCoordinator.WorkerLoad::connectors);
        Map taskRevocations = this.loadBalancingRevocations("task", configured.tasks().size(), workers, WorkerCoordinator.WorkerLoad::tasks);
        connectorRevocations.forEach((worker, revoked) -> result.computeIfAbsent(worker, w -> new WorkerCoordinator.ConnectorsAndTasks.Builder()).addConnectors((Collection<String>)revoked));
        taskRevocations.forEach((worker, revoked) -> result.computeIfAbsent(worker, w -> new WorkerCoordinator.ConnectorsAndTasks.Builder()).addTasks((Collection<ConnectorTaskId>)revoked));
        return IncrementalCooperativeAssignor.buildAll(result);
    }

    private <E> Map<String, Set<E>> loadBalancingRevocations(String allocatedResourceName, int totalToAllocate, Collection<WorkerCoordinator.WorkerLoad> workers, Function<WorkerCoordinator.WorkerLoad, Collection<E>> workerAllocation) {
        int totalWorkers = workers.size();
        int minAllocatedPerWorker = totalToAllocate / totalWorkers;
        int workersToAllocateExtra = totalToAllocate % totalWorkers;
        Function<WorkerCoordinator.WorkerLoad, Integer> workerAllocationSize = workerAllocation.andThen(Collection::size);
        long workersAllocatedMinimum = workers.stream().map(workerAllocationSize).filter(n -> n == minAllocatedPerWorker).count();
        long workersAllocatedSingleExtra = workers.stream().map(workerAllocationSize).filter(n -> n == minAllocatedPerWorker + 1).count();
        if (workersAllocatedSingleExtra == (long)workersToAllocateExtra && workersAllocatedMinimum + workersAllocatedSingleExtra == (long)totalWorkers) {
            this.log.trace("No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}s, should be balanced", (Object)allocatedResourceName, (Object)allocatedResourceName);
            return Collections.emptyMap();
        }
        HashMap<String, Set<E>> result = new HashMap<String, Set<E>>();
        int allocatedExtras = 0;
        block0: for (WorkerCoordinator.WorkerLoad worker : workers) {
            int maxAllocationForWorker;
            int currentAllocationSizeForWorker = workerAllocationSize.apply(worker);
            if (currentAllocationSizeForWorker <= minAllocatedPerWorker) continue;
            if (allocatedExtras < workersToAllocateExtra) {
                ++allocatedExtras;
                if (currentAllocationSizeForWorker == minAllocatedPerWorker + 1) continue;
                maxAllocationForWorker = minAllocatedPerWorker + 1;
            } else {
                maxAllocationForWorker = minAllocatedPerWorker;
            }
            LinkedHashSet<E> revokedFromWorker = new LinkedHashSet<E>();
            result.put(worker.worker(), revokedFromWorker);
            Iterator<E> currentWorkerAllocation = workerAllocation.apply(worker).iterator();
            int numRevoked = 0;
            while (currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker) {
                if (!currentWorkerAllocation.hasNext()) {
                    this.log.warn("Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; worker appears to still be allocated {} instances, which is more than the intended allocation of {}", allocatedResourceName, worker.worker(), workerAllocationSize.apply(worker), maxAllocationForWorker);
                    continue block0;
                }
                E revocation = currentWorkerAllocation.next();
                revokedFromWorker.add(revocation);
                ++numRevoked;
            }
        }
        return result;
    }

    private int calculateDelay(long now) {
        long diff = this.scheduledRebalance - now;
        return diff > 0L ? (int)Math.min(diff, (long)this.maxDelay) : 0;
    }

    protected void assignConnectors(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<String> connectors) {
        workerAssignment.sort(WorkerCoordinator.WorkerLoad.connectorComparator());
        WorkerCoordinator.WorkerLoad first = workerAssignment.get(0);
        Iterator<String> load = connectors.iterator();
        block0: while (load.hasNext()) {
            int firstLoad = first.connectorsSize();
            int upTo = IntStream.range(0, workerAssignment.size()).filter(i -> ((WorkerCoordinator.WorkerLoad)workerAssignment.get(i)).connectorsSize() > firstLoad).findFirst().orElse(workerAssignment.size());
            for (WorkerCoordinator.WorkerLoad worker : workerAssignment.subList(0, upTo)) {
                String connector = load.next();
                this.log.debug("Assigning connector {} to {}", (Object)connector, (Object)worker.worker());
                worker.assign(connector);
                if (load.hasNext()) continue;
                continue block0;
            }
        }
    }

    protected void assignTasks(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
        workerAssignment.sort(WorkerCoordinator.WorkerLoad.taskComparator());
        WorkerCoordinator.WorkerLoad first = workerAssignment.get(0);
        Iterator<ConnectorTaskId> load = tasks.iterator();
        block0: while (load.hasNext()) {
            int firstLoad = first.tasksSize();
            int upTo = IntStream.range(0, workerAssignment.size()).filter(i -> ((WorkerCoordinator.WorkerLoad)workerAssignment.get(i)).tasksSize() > firstLoad).findFirst().orElse(workerAssignment.size());
            for (WorkerCoordinator.WorkerLoad worker : workerAssignment.subList(0, upTo)) {
                ConnectorTaskId task = load.next();
                this.log.debug("Assigning task {} to {}", (Object)task, (Object)worker.worker());
                worker.assign(task);
                if (load.hasNext()) continue;
                continue block0;
            }
        }
    }

    private static List<WorkerCoordinator.WorkerLoad> workerAssignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments, WorkerCoordinator.ConnectorsAndTasks toExclude) {
        WorkerCoordinator.ConnectorsAndTasks ignore = new WorkerCoordinator.ConnectorsAndTasks.Builder().with(toExclude.connectors(), toExclude.tasks()).build();
        return memberAssignments.entrySet().stream().map(e -> new WorkerCoordinator.WorkerLoad.Builder((String)e.getKey()).with(((WorkerCoordinator.ConnectorsAndTasks)e.getValue()).connectors().stream().filter(v -> !ignore.connectors().contains(v)).collect(Collectors.toList()), ((WorkerCoordinator.ConnectorsAndTasks)e.getValue()).tasks().stream().filter(v -> !ignore.tasks().contains(v)).collect(Collectors.toList())).build()).collect(Collectors.toList());
    }

    private static void addAll(Map<String, WorkerCoordinator.ConnectorsAndTasks.Builder> base, Map<String, WorkerCoordinator.ConnectorsAndTasks> toAdd) {
        toAdd.forEach((worker, assignment) -> base.computeIfAbsent((String)worker, w -> new WorkerCoordinator.ConnectorsAndTasks.Builder()).addAll((WorkerCoordinator.ConnectorsAndTasks)assignment));
    }

    private static <K> Map<K, WorkerCoordinator.ConnectorsAndTasks> buildAll(Map<K, WorkerCoordinator.ConnectorsAndTasks.Builder> builders) {
        return ConnectUtils.transformValues(builders, WorkerCoordinator.ConnectorsAndTasks.Builder::build);
    }

    private static List<WorkerCoordinator.WorkerLoad> workerLoads(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        return memberAssignments.entrySet().stream().map(e -> new WorkerCoordinator.WorkerLoad.Builder((String)e.getKey()).with(((WorkerCoordinator.ConnectorsAndTasks)e.getValue()).connectors(), ((WorkerCoordinator.ConnectorsAndTasks)e.getValue()).tasks()).build()).collect(Collectors.toList());
    }

    private static void removeAll(List<WorkerCoordinator.WorkerLoad> workerLoads, Map<String, WorkerCoordinator.ConnectorsAndTasks> toRemove) {
        workerLoads.forEach(workerLoad -> {
            String worker = workerLoad.worker();
            WorkerCoordinator.ConnectorsAndTasks toRemoveFromWorker = toRemove.getOrDefault(worker, WorkerCoordinator.ConnectorsAndTasks.EMPTY);
            workerLoad.connectors().removeAll(toRemoveFromWorker.connectors());
            workerLoad.tasks().removeAll(toRemoveFromWorker.tasks());
        });
    }

    private static Map<String, WorkerCoordinator.ConnectorsAndTasks> intersection(WorkerCoordinator.ConnectorsAndTasks connectorsAndTasks, Map<String, WorkerCoordinator.ConnectorsAndTasks> assignments) {
        return ConnectUtils.transformValues(assignments, assignment -> {
            HashSet<String> connectors = new HashSet<String>(assignment.connectors());
            connectors.retainAll(connectorsAndTasks.connectors());
            HashSet<ConnectorTaskId> tasks = new HashSet<ConnectorTaskId>(assignment.tasks());
            tasks.retainAll(connectorsAndTasks.tasks());
            return new WorkerCoordinator.ConnectorsAndTasks.Builder().with(connectors, tasks).build();
        });
    }

    static class ClusterAssignment {
        private final Map<String, Collection<String>> newlyAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks;
        private final Map<String, Collection<String>> newlyRevokedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks;
        private final Map<String, Collection<String>> allAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> allAssignedTasks;
        private final Set<String> allWorkers;
        public static final ClusterAssignment EMPTY = new ClusterAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());

        public ClusterAssignment(Map<String, Collection<String>> newlyAssignedConnectors, Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks, Map<String, Collection<String>> newlyRevokedConnectors, Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks, Map<String, Collection<String>> allAssignedConnectors, Map<String, Collection<ConnectorTaskId>> allAssignedTasks) {
            this.newlyAssignedConnectors = newlyAssignedConnectors;
            this.newlyAssignedTasks = newlyAssignedTasks;
            this.newlyRevokedConnectors = newlyRevokedConnectors;
            this.newlyRevokedTasks = newlyRevokedTasks;
            this.allAssignedConnectors = allAssignedConnectors;
            this.allAssignedTasks = allAssignedTasks;
            this.allWorkers = ConnectUtils.combineCollections(Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, allAssignedTasks), Map::keySet, Collectors.toSet());
        }

        public Map<String, Collection<String>> newlyAssignedConnectors() {
            return this.newlyAssignedConnectors;
        }

        public Collection<String> newlyAssignedConnectors(String worker) {
            return this.newlyAssignedConnectors.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
            return this.newlyAssignedTasks;
        }

        public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
            return this.newlyAssignedTasks.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<String>> newlyRevokedConnectors() {
            return this.newlyRevokedConnectors;
        }

        public Collection<String> newlyRevokedConnectors(String worker) {
            return this.newlyRevokedConnectors.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
            return this.newlyRevokedTasks;
        }

        public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
            return this.newlyRevokedTasks.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<String>> allAssignedConnectors() {
            return this.allAssignedConnectors;
        }

        public Map<String, Collection<ConnectorTaskId>> allAssignedTasks() {
            return this.allAssignedTasks;
        }

        public Set<String> allWorkers() {
            return this.allWorkers;
        }

        public String toString() {
            return "ClusterAssignment{newlyAssignedConnectors=" + this.newlyAssignedConnectors + ", newlyAssignedTasks=" + this.newlyAssignedTasks + ", newlyRevokedConnectors=" + this.newlyRevokedConnectors + ", newlyRevokedTasks=" + this.newlyRevokedTasks + ", allAssignedConnectors=" + this.allAssignedConnectors + ", allAssignedTasks=" + this.allAssignedTasks + ", allWorkers=" + this.allWorkers + '}';
        }
    }
}

