/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

public class StreamsPartitionAssignor
implements PartitionAssignor,
Configurable {
    private static final int UNKNOWN = -1;
    private static final int VERSION_ONE = 1;
    private static final int VERSION_TWO = 2;
    private static final int VERSION_THREE = 3;
    private static final int VERSION_FOUR = 4;
    private static final int EARLIEST_PROBEABLE_VERSION = 3;
    protected final Set<Integer> supportedVersions = new HashSet<Integer>();
    private Logger log;
    private String logPrefix;
    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = (p1, p2) -> {
        int result = p1.topic().compareTo(p2.topic());
        if (result != 0) {
            return result;
        }
        return Integer.compare(p1.partition(), p2.partition());
    };
    private String userEndPoint;
    private int numStandbyReplicas;
    private TaskManager taskManager;
    private PartitionGrouper partitionGrouper;
    private AtomicInteger assignmentErrorCode;
    protected int usedSubscriptionMetadataVersion = 4;
    private InternalTopicManager internalTopicManager;
    private CopartitionedTopicsValidator copartitionedTopicsValidator;

    protected String userEndPoint() {
        return this.userEndPoint;
    }

    protected TaskManager taskManger() {
        return this.taskManager;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        Object o;
        InternalStreamsConfig streamsConfig = new InternalStreamsConfig(configs);
        this.logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString("client.id"));
        LogContext logContext = new LogContext(this.logPrefix);
        this.log = logContext.logger(this.getClass());
        String upgradeFrom = streamsConfig.getString("upgrade.from");
        if (upgradeFrom != null) {
            switch (upgradeFrom) {
                case "0.10.0": {
                    this.log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", (Object)4);
                    this.usedSubscriptionMetadataVersion = 1;
                    break;
                }
                case "0.10.1": 
                case "0.10.2": 
                case "0.11.0": 
                case "1.0": 
                case "1.1": {
                    this.log.info("Downgrading metadata version from {} to 2 for upgrade from {}.x.", (Object)4, (Object)upgradeFrom);
                    this.usedSubscriptionMetadataVersion = 2;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
                }
            }
        }
        if ((o = configs.get("__task.manager.instance__")) == null) {
            KafkaException fatalException = new KafkaException("TaskManager is not specified");
            this.log.error(fatalException.getMessage(), fatalException);
            throw fatalException;
        }
        if (!(o instanceof TaskManager)) {
            KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
            this.log.error(fatalException.getMessage(), fatalException);
            throw fatalException;
        }
        this.taskManager = (TaskManager)o;
        Object ai = configs.get("__assignment.error.code__");
        if (ai == null) {
            KafkaException fatalException = new KafkaException("assignmentErrorCode is not specified");
            this.log.error(fatalException.getMessage(), fatalException);
            throw fatalException;
        }
        if (!(ai instanceof AtomicInteger)) {
            KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", ai.getClass().getName(), AtomicInteger.class.getName()));
            this.log.error(fatalException.getMessage(), fatalException);
            throw fatalException;
        }
        this.assignmentErrorCode = (AtomicInteger)ai;
        this.numStandbyReplicas = streamsConfig.getInt("num.standby.replicas");
        this.partitionGrouper = streamsConfig.getConfiguredInstance("partition.grouper", PartitionGrouper.class);
        String userEndPoint = streamsConfig.getString("application.server");
        if (userEndPoint != null && !userEndPoint.isEmpty()) {
            try {
                String host = Utils.getHost(userEndPoint);
                Integer port = Utils.getPort(userEndPoint);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair but received %s", this.logPrefix, "application.server", userEndPoint));
                }
            }
            catch (NumberFormatException nfe) {
                throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s", this.logPrefix, userEndPoint, "application.server"));
            }
            this.userEndPoint = userEndPoint;
        }
        this.internalTopicManager = new InternalTopicManager(this.taskManager.adminClient, streamsConfig);
        this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(this.logPrefix);
    }

    @Override
    public String name() {
        return "stream";
    }

    @Override
    public PartitionAssignor.Subscription subscription(Set<String> topics) {
        Set<TaskId> previousActiveTasks = this.taskManager.prevActiveTaskIds();
        Set<TaskId> standbyTasks = this.taskManager.cachedTasksIds();
        standbyTasks.removeAll(previousActiveTasks);
        SubscriptionInfo data = new SubscriptionInfo(this.usedSubscriptionMetadataVersion, this.taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
        this.taskManager.updateSubscriptionsFromMetadata(topics);
        return new PartitionAssignor.Subscription(new ArrayList<String>(topics), data.encode());
    }

    private Map<String, PartitionAssignor.Assignment> errorAssignment(Map<UUID, ClientMetadata> clientsMetadata, String topic, int errorCode) {
        this.log.error("{} is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.", (Object)topic);
        HashMap<String, PartitionAssignor.Assignment> assignment = new HashMap<String, PartitionAssignor.Assignment>();
        for (ClientMetadata clientMetadata : clientsMetadata.values()) {
            for (String consumerId : clientMetadata.consumers) {
                assignment.put(consumerId, new PartitionAssignor.Assignment(Collections.emptyList(), new AssignmentInfo(4, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), errorCode).encode()));
            }
        }
        return assignment;
    }

    /*
     * WARNING - void declaration
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public Map<String, PartitionAssignor.Assignment> assign(Cluster metadata, Map<String, PartitionAssignor.Subscription> subscriptions) {
        int numPartitions;
        boolean numPartitionsNeeded;
        boolean versionProbing;
        HashMap<UUID, ClientMetadata> clientsMetadata = new HashMap<UUID, ClientMetadata>();
        HashSet<String> futureConsumers = new HashSet<String>();
        int minReceivedMetadataVersion = 4;
        this.supportedVersions.clear();
        int futureMetadataVersion = -1;
        for (Map.Entry<String, PartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            void var13_23;
            ClientMetadata clientMetadata;
            String consumerId = entry.getKey();
            PartitionAssignor.Subscription subscription = entry.getValue();
            SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
            int usedVersion = subscriptionInfo.version();
            this.supportedVersions.add(subscriptionInfo.latestSupportedVersion());
            if (usedVersion > 4) {
                futureMetadataVersion = usedVersion;
                futureConsumers.add(consumerId);
                continue;
            }
            if (usedVersion < minReceivedMetadataVersion) {
                minReceivedMetadataVersion = usedVersion;
            }
            if ((clientMetadata = (ClientMetadata)clientsMetadata.get(subscriptionInfo.processId())) == null) {
                ClientMetadata clientMetadata2 = new ClientMetadata(subscriptionInfo.userEndPoint());
                clientsMetadata.put(subscriptionInfo.processId(), clientMetadata2);
            }
            var13_23.addConsumer(consumerId, subscriptionInfo);
        }
        if (futureMetadataVersion != -1) {
            if (minReceivedMetadataVersion < 3) throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time.");
            this.log.info("Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).", (Object)futureMetadataVersion, (Object)4);
            versionProbing = true;
        } else {
            versionProbing = false;
        }
        if (minReceivedMetadataVersion < 4) {
            this.log.info("Downgrading metadata to version {}. Latest supported version is {}.", (Object)minReceivedMetadataVersion, (Object)4);
        }
        this.log.debug("Constructed client metadata {} from the member subscriptions.", (Object)clientsMetadata);
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.taskManager.builder().topicGroups();
        HashMap<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
            for (String string : topicsInfo.sourceTopics) {
                if (topicsInfo.repartitionSourceTopics.keySet().contains(string) || metadata.topics().contains(string)) continue;
                this.log.error("Missing source topic {} durign assignment. Returning error {}.", (Object)string, (Object)Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
                return this.errorAssignment(clientsMetadata, string, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
            }
            for (InternalTopicConfig internalTopicConfig : topicsInfo.repartitionSourceTopics.values()) {
                repartitionTopicMetadata.put(internalTopicConfig.name(), new InternalTopicMetadata(internalTopicConfig));
            }
        }
        do {
            numPartitionsNeeded = false;
            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                    numPartitions = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions;
                    if (numPartitions != -1) continue;
                    for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                        Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
                        if (!otherSinkTopics.contains(topicName)) continue;
                        for (String string : otherTopicsInfo.sourceTopics) {
                            void var21_59;
                            if (repartitionTopicMetadata.containsKey(string)) {
                                Integer n = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)string)).numPartitions;
                            } else {
                                Integer n = metadata.partitionCountForTopic(string);
                            }
                            if (var21_59.intValue() <= numPartitions) continue;
                            numPartitions = var21_59.intValue();
                        }
                    }
                    if (numPartitions == -1) {
                        numPartitionsNeeded = true;
                        continue;
                    }
                    ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions = numPartitions;
                }
            }
        } while (numPartitionsNeeded);
        this.ensureCopartitioning(this.taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
        this.prepareTopic(repartitionTopicMetadata);
        HashMap<TopicPartition, PartitionInfo> hashMap = new HashMap<TopicPartition, PartitionInfo>();
        for (Map.Entry entry : repartitionTopicMetadata.entrySet()) {
            String topic = (String)entry.getKey();
            numPartitions = ((InternalTopicMetadata)entry.getValue()).numPartitions;
            for (int partition = 0; partition < numPartitions; ++partition) {
                hashMap.put(new TopicPartition(topic, partition), new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
            }
        }
        Cluster fullMetadata = metadata.withPartitions(hashMap);
        this.taskManager.setClusterMetadata(fullMetadata);
        this.log.debug("Created repartition topics {} from the parsed topology.", (Object)hashMap.values());
        HashSet<String> hashSet = new HashSet<String>();
        HashMap<Integer, Set<String>> sourceTopicsByGroup = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            hashSet.addAll(entry.getValue().sourceTopics);
            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
        HashSet allAssignedPartitions = new HashSet();
        HashMap<Integer, Set> tasksByTopicGroup = new HashMap<Integer, Set>();
        for (Map.Entry entry : partitionsForTask.entrySet()) {
            Set set = (Set)entry.getValue();
            for (TopicPartition partition : set) {
                if (!allAssignedPartitions.contains(partition)) continue;
                this.log.warn("Partition {} is assigned to more than one tasks: {}", (Object)partition, (Object)partitionsForTask);
            }
            allAssignedPartitions.addAll(set);
            TaskId taskId = (TaskId)entry.getKey();
            tasksByTopicGroup.computeIfAbsent(taskId.topicGroupId, k -> new HashSet()).add(taskId);
        }
        for (String string : hashSet) {
            List<PartitionInfo> list = fullMetadata.partitionsForTopic(string);
            if (!list.isEmpty()) {
                for (PartitionInfo partitionInfo : list) {
                    TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (allAssignedPartitions.contains(topicPartition)) continue;
                    this.log.warn("Partition {} is not assigned to any tasks: {} Possible causes of a partition not getting assigned is that another topic defined in the topology has not been created when starting your streams application, resulting in no tasks created for this topology at all.", (Object)topicPartition, (Object)partitionsForTask);
                }
                continue;
            }
            this.log.warn("No partitions found for topic {}", (Object)string);
        }
        HashMap<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            int n = entry.getKey();
            Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
            for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
                int numPartitions2 = -1;
                if (tasksByTopicGroup.get(n) != null) {
                    for (TaskId task : (Set)tasksByTopicGroup.get(n)) {
                        if (numPartitions2 >= task.partition + 1) continue;
                        numPartitions2 = task.partition + 1;
                    }
                    InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
                    topicMetadata.numPartitions = numPartitions2;
                    changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
                    continue;
                }
                this.log.debug("No tasks found for topic group {}", (Object)n);
            }
        }
        this.prepareTopic(changelogTopicMetadata);
        this.log.debug("Created state changelog topics {} from the parsed topology.", (Object)changelogTopicMetadata.values());
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            hashMap2.put(entry.getKey(), ((ClientMetadata)entry.getValue()).state);
        }
        this.log.debug("Assigning tasks {} to clients {} with number of replicas {}", partitionsForTask.keySet(), hashMap2, this.numStandbyReplicas);
        StickyTaskAssignor stickyTaskAssignor = new StickyTaskAssignor(hashMap2, partitionsForTask.keySet());
        stickyTaskAssignor.assign(this.numStandbyReplicas);
        this.log.info("Assigned tasks to clients as {}.", (Object)hashMap2);
        HashMap<HostInfo, Set<TopicPartition>> hashMap3 = new HashMap<HostInfo, Set<TopicPartition>>();
        if (minReceivedMetadataVersion >= 2) {
            for (Map.Entry entry : clientsMetadata.entrySet()) {
                HostInfo hostInfo = ((ClientMetadata)entry.getValue()).hostInfo;
                if (hostInfo == null) continue;
                HashSet topicPartitions = new HashSet();
                ClientState state = ((ClientMetadata)entry.getValue()).state;
                for (TaskId id : state.activeTasks()) {
                    topicPartitions.addAll(partitionsForTask.get(id));
                }
                hashMap3.put(hostInfo, topicPartitions);
            }
        }
        this.taskManager.setPartitionsByHostState(hashMap3);
        if (!versionProbing) return this.computeNewAssignment(clientsMetadata, partitionsForTask, hashMap3, minReceivedMetadataVersion);
        return this.versionProbingAssignment(clientsMetadata, partitionsForTask, hashMap3, futureConsumers, minReceivedMetadataVersion);
    }

    private Map<String, PartitionAssignor.Assignment> computeNewAssignment(Map<UUID, ClientMetadata> clientsMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, int minUserMetadataVersion) {
        HashMap<String, PartitionAssignor.Assignment> assignment = new HashMap<String, PartitionAssignor.Assignment>();
        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
            Set<String> consumers = entry.getValue().consumers;
            ClientState state = entry.getValue().state;
            List<List<TaskId>> interleavedActive = this.interleaveTasksByGroupId(state.activeTasks(), consumers.size());
            List<List<TaskId>> interleavedStandby = this.interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
            int consumerTaskIndex = 0;
            for (String consumer : consumers) {
                HashMap<TaskId, Set<TopicPartition>> standby = new HashMap<TaskId, Set<TopicPartition>>();
                ArrayList<AssignedPartition> assignedPartitions = new ArrayList<AssignedPartition>();
                List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
                for (TaskId taskId : assignedActiveList) {
                    for (TopicPartition topicPartition : partitionsForTask.get(taskId)) {
                        assignedPartitions.add(new AssignedPartition(taskId, topicPartition));
                    }
                }
                if (!state.standbyTasks().isEmpty()) {
                    List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
                    for (TaskId taskId2 : assignedStandbyList) {
                        standby.computeIfAbsent(taskId2, k -> new HashSet()).addAll((Collection)partitionsForTask.get(taskId2));
                    }
                }
                ++consumerTaskIndex;
                Collections.sort(assignedPartitions);
                ArrayList<TaskId> active = new ArrayList<TaskId>();
                ArrayList<TopicPartition> arrayList = new ArrayList<TopicPartition>();
                for (AssignedPartition assignedPartition : assignedPartitions) {
                    active.add(assignedPartition.taskId);
                    arrayList.add(assignedPartition.partition);
                }
                assignment.put(consumer, new PartitionAssignor.Assignment(arrayList, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState, 0).encode()));
            }
        }
        return assignment;
    }

    private Map<String, PartitionAssignor.Assignment> versionProbingAssignment(Map<UUID, ClientMetadata> clientsMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Set<String> futureConsumers, int minUserMetadataVersion) {
        HashMap<String, PartitionAssignor.Assignment> assignment = new HashMap<String, PartitionAssignor.Assignment>();
        for (ClientMetadata clientMetadata : clientsMetadata.values()) {
            for (String consumerId : clientMetadata.consumers) {
                if (futureConsumers.contains(consumerId)) continue;
                ArrayList<TaskId> activeTasks = new ArrayList<TaskId>(clientMetadata.state.prevActiveTasks());
                ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
                for (TaskId taskId : activeTasks) {
                    assignedPartitions.addAll((Collection)partitionsForTask.get(taskId));
                }
                HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
                for (TaskId taskId : clientMetadata.state.prevStandbyTasks()) {
                    standbyTasks.put(taskId, partitionsForTask.get(taskId));
                }
                assignment.put(consumerId, new PartitionAssignor.Assignment(assignedPartitions, new AssignmentInfo(minUserMetadataVersion, activeTasks, standbyTasks, partitionsByHostState, 0).encode()));
            }
        }
        for (String consumerId : futureConsumers) {
            assignment.put(consumerId, new PartitionAssignor.Assignment(Collections.emptyList(), new AssignmentInfo().encode()));
        }
        return assignment;
    }

    List<List<TaskId>> interleaveTasksByGroupId(Collection<TaskId> taskIds, int numberThreads) {
        LinkedList<TaskId> sortedTasks = new LinkedList<TaskId>(taskIds);
        Collections.sort(sortedTasks);
        ArrayList<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<List<TaskId>>(numberThreads);
        for (int i = 0; i < numberThreads; ++i) {
            taskIdsForConsumerAssignment.add(new ArrayList());
        }
        block1: while (!sortedTasks.isEmpty()) {
            for (List list : taskIdsForConsumerAssignment) {
                TaskId taskId = sortedTasks.poll();
                if (taskId == null) continue block1;
                list.add(taskId);
            }
        }
        return taskIdsForConsumerAssignment;
    }

    @Override
    public void onAssignment(PartitionAssignor.Assignment assignment) {
        Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(assignment.partitions());
        Collections.sort(partitions, PARTITION_COMPARATOR);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        if (info.errCode() != Error.NONE.code) {
            this.assignmentErrorCode.set(info.errCode());
            return;
        }
        int receivedAssignmentMetadataVersion = info.version();
        int leaderSupportedVersion = info.latestSupportedVersion();
        if (receivedAssignmentMetadataVersion > this.usedSubscriptionMetadataVersion) {
            throw new IllegalStateException("Sent a version " + this.usedSubscriptionMetadataVersion + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + ".");
        }
        if (receivedAssignmentMetadataVersion < this.usedSubscriptionMetadataVersion && receivedAssignmentMetadataVersion >= 3) {
            if (receivedAssignmentMetadataVersion == leaderSupportedVersion) {
                this.log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", (Object)this.usedSubscriptionMetadataVersion, (Object)receivedAssignmentMetadataVersion);
                this.usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion;
            } else {
                this.log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Setting subscription metadata to leaders supported version {} and trigger new rebalance.", this.usedSubscriptionMetadataVersion, receivedAssignmentMetadataVersion, leaderSupportedVersion);
                this.usedSubscriptionMetadataVersion = leaderSupportedVersion;
            }
            this.assignmentErrorCode.set(Error.VERSION_PROBING.code);
            return;
        }
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        HashMap<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<TopicPartition, PartitionInfo>();
        switch (receivedAssignmentMetadataVersion) {
            case 1: {
                this.processVersionOneAssignment(info, partitions, activeTasks);
                partitionsByHost = Collections.emptyMap();
                break;
            }
            case 2: {
                this.processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                partitionsByHost = info.partitionsByHost();
                break;
            }
            case 3: {
                if (leaderSupportedVersion > this.usedSubscriptionMetadataVersion) {
                    this.log.info("Sent a version {} subscription and group leader's latest supported version is {}. Upgrading subscription metadata version to {} for next rebalance.", this.usedSubscriptionMetadataVersion, leaderSupportedVersion, leaderSupportedVersion);
                    this.usedSubscriptionMetadataVersion = leaderSupportedVersion;
                }
                this.processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                partitionsByHost = info.partitionsByHost();
                break;
            }
            case 4: {
                if (leaderSupportedVersion > this.usedSubscriptionMetadataVersion) {
                    this.log.info("Sent a version {} subscription and group leader's latest supported version is {}. Upgrading subscription metadata version to {} for next rebalance.", this.usedSubscriptionMetadataVersion, leaderSupportedVersion, leaderSupportedVersion);
                    this.usedSubscriptionMetadataVersion = leaderSupportedVersion;
                }
                this.processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                partitionsByHost = info.partitionsByHost();
                break;
            }
            default: {
                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
            }
        }
        this.taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
        this.taskManager.setPartitionsByHostState(partitionsByHost);
        this.taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
        this.taskManager.updateSubscriptionsFromAssignment(partitions);
    }

    private void processVersionOneAssignment(AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks) {
        if (partitions.size() != info.activeTasks().size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", this.logPrefix, partitions.size(), info.activeTasks().size(), info.toString()));
        }
        for (int i = 0; i < partitions.size(); ++i) {
            TopicPartition partition = partitions.get(i);
            TaskId id = info.activeTasks().get(i);
            activeTasks.computeIfAbsent(id, k -> new HashSet()).add(partition);
        }
    }

    private void processVersionTwoAssignment(AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
        this.processVersionOneAssignment(info, partitions, activeTasks);
        Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
        for (Set<TopicPartition> value : partitionsByHost.values()) {
            for (TopicPartition topicPartition : value) {
                topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
            }
        }
    }

    private void processVersionThreeAssignment(AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
        this.processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
    }

    private void processVersionFourAssignment(AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
        this.processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
    }

    protected void processLatestVersionAssignment(AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
        this.processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
    }

    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", (Object)topicPartitions);
        HashMap<String, InternalTopicConfig> topicsToMakeReady = new HashMap<String, InternalTopicConfig>();
        for (InternalTopicMetadata metadata : topicPartitions.values()) {
            InternalTopicConfig topic = metadata.config;
            int numPartitions = metadata.numPartitions;
            if (numPartitions < 0) {
                throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", this.logPrefix, topic.name()));
            }
            topic.setNumberOfPartitions(numPartitions);
            topicsToMakeReady.put(topic.name(), topic);
        }
        if (!topicsToMakeReady.isEmpty()) {
            this.internalTopicManager.makeReady(topicsToMakeReady);
        }
        this.log.debug("Completed validating internal topics {} in partition assignor.", (Object)topicPartitions);
    }

    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
        for (Set<String> copartitionGroup : copartitionGroups) {
            this.copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
        }
    }

    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    static class CopartitionedTopicsValidator {
        private final String logPrefix;
        private final Logger log;

        CopartitionedTopicsValidator(String logPrefix) {
            this.logPrefix = logPrefix;
            LogContext logContext = new LogContext(logPrefix);
            this.log = logContext.logger(this.getClass());
        }

        void validate(Set<String> copartitionGroup, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
            int numPartitions = -1;
            for (String string : copartitionGroup) {
                if (allRepartitionTopicsNumPartitions.containsKey(string)) continue;
                Integer partitions = metadata.partitionCountForTopic(string);
                if (partitions == null) {
                    String str = String.format("%sTopic not found: %s", this.logPrefix, string);
                    this.log.error(str);
                    throw new IllegalStateException(str);
                }
                if (numPartitions == -1) {
                    numPartitions = partitions;
                    continue;
                }
                if (numPartitions == partitions) continue;
                Object[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                Arrays.sort(topics);
                throw new TopologyException(String.format("%sTopics not co-partitioned: [%s]", this.logPrefix, Utils.join(Arrays.asList(topics), ",")));
            }
            if (numPartitions == -1) {
                for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) {
                    int partitions;
                    if (!copartitionGroup.contains(entry.getKey()) || (partitions = ((InternalTopicMetadata)entry.getValue()).numPartitions) <= numPartitions) continue;
                    numPartitions = partitions;
                }
            }
            for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) {
                if (!copartitionGroup.contains(entry.getKey())) continue;
                ((InternalTopicMetadata)entry.getValue()).numPartitions = numPartitions;
            }
        }
    }

    private static final class InternalStreamsConfig
    extends StreamsConfig {
        private InternalStreamsConfig(Map<?, ?> props) {
            super(props, false);
        }
    }

    static class InternalTopicMetadata {
        public final InternalTopicConfig config;
        public int numPartitions;

        InternalTopicMetadata(InternalTopicConfig config) {
            this.config = config;
            this.numPartitions = -1;
        }

        public String toString() {
            return "InternalTopicMetadata(config=" + this.config + ", numPartitions=" + this.numPartitions + ")";
        }
    }

    private static class ClientMetadata {
        final HostInfo hostInfo;
        final Set<String> consumers;
        final ClientState state;

        ClientMetadata(String endPoint) {
            if (endPoint != null) {
                String host = Utils.getHost(endPoint);
                Integer port = Utils.getPort(endPoint);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
                }
                this.hostInfo = new HostInfo(host, port);
            } else {
                this.hostInfo = null;
            }
            this.consumers = new HashSet<String>();
            this.state = new ClientState();
        }

        void addConsumer(String consumerMemberId, SubscriptionInfo info) {
            this.consumers.add(consumerMemberId);
            this.state.addPreviousActiveTasks(info.prevTasks());
            this.state.addPreviousStandbyTasks(info.standbyTasks());
            this.state.incrementCapacity();
        }

        public String toString() {
            return "ClientMetadata{hostInfo=" + this.hostInfo + ", consumers=" + this.consumers + ", state=" + this.state + '}';
        }
    }

    private static class AssignedPartition
    implements Comparable<AssignedPartition> {
        public final TaskId taskId;
        public final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        @Override
        public int compareTo(AssignedPartition that) {
            return PARTITION_COMPARATOR.compare(this.partition, that.partition);
        }

        public boolean equals(Object o) {
            if (!(o instanceof AssignedPartition)) {
                return false;
            }
            AssignedPartition other = (AssignedPartition)o;
            return this.compareTo(other) == 0;
        }

        public int hashCode() {
            return this.partition.hashCode();
        }
    }

    public static enum Error {
        NONE(0),
        INCOMPLETE_SOURCE_TOPIC_METADATA(1),
        VERSION_PROBING(2);

        private final int code;

        private Error(int code) {
            this.code = code;
        }

        public int code() {
            return this.code;
        }

        public static Error fromCode(int code) {
            switch (code) {
                case 0: {
                    return NONE;
                }
                case 1: {
                    return INCOMPLETE_SOURCE_TOPIC_METADATA;
                }
                case 2: {
                    return VERSION_PROBING;
                }
            }
            throw new IllegalArgumentException("Unknown error code: " + code);
        }
    }
}

