package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManager.class */
public class TaskManager {
    private final Logger log;
    private final AssignedTasks active;
    private final AssignedTasks standby;
    private final ChangelogReader changelogReader;
    private final String logPrefix;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final StreamThread.AbstractTaskCreator taskCreator;
    private final StreamThread.AbstractTaskCreator standbyTaskCreator;
    private ThreadMetadataProvider threadMetadataProvider;
    private Consumer<byte[], byte[]> consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskManager(ChangelogReader changelogReader, String str, Consumer<byte[], byte[]> consumer, StreamThread.AbstractTaskCreator abstractTaskCreator, StreamThread.AbstractTaskCreator abstractTaskCreator2, AssignedTasks assignedTasks, AssignedTasks assignedTasks2) {
        this.changelogReader = changelogReader;
        this.logPrefix = str;
        this.restoreConsumer = consumer;
        this.taskCreator = abstractTaskCreator;
        this.standbyTaskCreator = abstractTaskCreator2;
        this.active = assignedTasks;
        this.standby = assignedTasks2;
        this.log = new LogContext(str).logger(getClass());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createTasks(Collection<TopicPartition> collection) {
        if (this.threadMetadataProvider == null) {
            throw new IllegalStateException(this.logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
        }
        if (this.consumer == null) {
            throw new IllegalStateException(this.logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
        }
        this.changelogReader.reset();
        this.standby.closeNonAssignedSuspendedTasks(this.threadMetadataProvider.standbyTasks());
        this.active.closeNonAssignedSuspendedTasks(this.threadMetadataProvider.activeTasks());
        addStreamTasks(collection);
        addStandbyTasks();
        Set<TopicPartition> uninitializedPartitions = this.active.uninitializedPartitions();
        this.log.trace("pausing partitions: {}", uninitializedPartitions);
        this.consumer.pause(uninitializedPartitions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setThreadMetadataProvider(ThreadMetadataProvider threadMetadataProvider) {
        this.threadMetadataProvider = threadMetadataProvider;
    }

    private void addStreamTasks(Collection<TopicPartition> collection) {
        Map<TaskId, Set<TopicPartition>> activeTasks = this.threadMetadataProvider.activeTasks();
        if (activeTasks.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        this.log.debug("Adding assigned tasks as active: {}", activeTasks);
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : activeTasks.entrySet()) {
            TaskId key = entry.getKey();
            Set<TopicPartition> value = entry.getValue();
            if (collection.containsAll(value)) {
                try {
                    if (!this.active.maybeResumeSuspendedTask(key, value)) {
                        hashMap.put(key, value);
                    }
                } catch (StreamsException e) {
                    this.log.error("Failed to resume an active task {} due to the following error:", key, e);
                    throw e;
                }
            } else {
                this.log.warn("Task {} owned partitions {} are not contained in the assignment {}", key, value, collection);
            }
        }
        if (hashMap.isEmpty()) {
            return;
        }
        this.log.trace("New active tasks to be created: {}", hashMap);
        Iterator<Task> it = this.taskCreator.createTasks(this.consumer, hashMap).iterator();
        while (it.hasNext()) {
            this.active.addNewTask(it.next());
        }
    }

    private void addStandbyTasks() {
        Map<TaskId, Set<TopicPartition>> standbyTasks = this.threadMetadataProvider.standbyTasks();
        if (standbyTasks.isEmpty()) {
            return;
        }
        this.log.debug("Adding assigned standby tasks {}", standbyTasks);
        HashMap hashMap = new HashMap();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
            TaskId key = entry.getKey();
            Set<TopicPartition> value = entry.getValue();
            if (!this.standby.maybeResumeSuspendedTask(key, value)) {
                hashMap.put(key, value);
            }
        }
        if (hashMap.isEmpty()) {
            return;
        }
        this.log.trace("New standby tasks to be created: {}", hashMap);
        Iterator<Task> it = this.standbyTaskCreator.createTasks(this.consumer, hashMap).iterator();
        while (it.hasNext()) {
            this.standby.addNewTask(it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> activeTaskIds() {
        return this.active.allAssignedTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> standbyTaskIds() {
        return this.standby.allAssignedTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> prevActiveTaskIds() {
        return this.active.previousTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void suspendTasksAndState() {
        this.log.debug("Suspending all active tasks {} and standby tasks {}", this.active.runningTaskIds(), this.standby.runningTaskIds());
        AtomicReference atomicReference = new AtomicReference(null);
        atomicReference.compareAndSet(null, this.active.suspend());
        atomicReference.compareAndSet(null, this.standby.suspend());
        this.restoreConsumer.assign(Collections.emptyList());
        Exception exc = (Exception) atomicReference.get();
        if (exc != null) {
            throw new StreamsException(this.logPrefix + "failed to suspend stream tasks", exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown(boolean z) {
        this.log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", this.active.runningTaskIds(), this.standby.runningTaskIds(), this.active.previousTaskIds(), this.standby.previousTaskIds());
        this.active.close(z);
        this.standby.close(z);
        try {
            this.threadMetadataProvider.close();
        } catch (Throwable th) {
            this.log.error("Failed to close KafkaStreamClient due to the following error:", th);
        }
        this.restoreConsumer.assign(Collections.emptyList());
        this.taskCreator.close();
        this.standbyTaskCreator.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> suspendedActiveTaskIds() {
        return this.active.previousTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> suspendedStandbyTaskIds() {
        return this.standby.previousTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Task activeTask(TopicPartition topicPartition) {
        return this.active.runningTaskFor(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Task standbyTask(TopicPartition topicPartition) {
        return this.standby.runningTaskFor(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TaskId, Task> activeTasks() {
        return this.active.runningTaskMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TaskId, Task> standbyTasks() {
        return this.standby.runningTaskMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConsumer(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean updateNewAndRestoringTasks() {
        Set<TopicPartition> initializeNewTasks = this.active.initializeNewTasks();
        this.standby.initializeNewTasks();
        initializeNewTasks.addAll(this.active.updateRestored(this.changelogReader.restore(this.active)));
        if (!initializeNewTasks.isEmpty()) {
            this.log.trace("resuming partitions {}", initializeNewTasks);
            this.consumer.resume(initializeNewTasks);
        }
        if (!this.active.allTasksRunning()) {
            return false;
        }
        assignStandbyPartitions();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasActiveRunningTasks() {
        return this.active.hasRunningTasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasStandbyRunningTasks() {
        return this.standby.hasRunningTasks();
    }

    private void assignStandbyPartitions() {
        Collection<Task> running = this.standby.running();
        HashMap hashMap = new HashMap();
        Iterator<Task> it = running.iterator();
        while (it.hasNext()) {
            hashMap.putAll(it.next().checkpointedOffsets());
        }
        this.restoreConsumer.assign(hashMap.keySet());
        for (Map.Entry entry : hashMap.entrySet()) {
            TopicPartition topicPartition = (TopicPartition) entry.getKey();
            long longValue = ((Long) entry.getValue()).longValue();
            if (longValue >= 0) {
                this.restoreConsumer.seek(topicPartition, longValue);
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(topicPartition));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int commitAll() {
        return this.active.commit() + this.standby.commit();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int process() {
        return this.active.process();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int punctuate() {
        return this.active.punctuate();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int maybeCommitActiveTasks() {
        return this.active.maybeCommit();
    }

    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append(str).append("\tActive tasks:\n");
        sb.append(this.active.toString(str + "\t\t"));
        sb.append(str).append("\tStandby tasks:\n");
        sb.append(this.standby.toString(str + "\t\t"));
        return sb.toString();
    }
}
