/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;

class AssignedStreamsTasks
extends AssignedTasks<StreamTask>
implements RestoringTasks {
    private final Map<TaskId, StreamTask> restoring = new HashMap<TaskId, StreamTask>();
    private final Set<TopicPartition> restoredPartitions = new HashSet<TopicPartition>();
    private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<TopicPartition, StreamTask>();

    AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
    }

    @Override
    public StreamTask restoringTaskFor(TopicPartition partition) {
        return this.restoringByPartition.get(partition);
    }

    @Override
    List<StreamTask> allTasks() {
        List<StreamTask> tasks = super.allTasks();
        tasks.addAll(this.restoring.values());
        return tasks;
    }

    @Override
    Set<TaskId> allAssignedTaskIds() {
        Set<TaskId> taskIds = super.allAssignedTaskIds();
        taskIds.addAll(this.restoring.keySet());
        return taskIds;
    }

    @Override
    boolean allTasksRunning() {
        return super.allTasksRunning() && this.restoring.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    RuntimeException closeAllRestoringTasks() {
        RuntimeException exception = null;
        this.log.trace("Closing all restoring stream tasks {}", (Object)this.restoring.keySet());
        Iterator<StreamTask> restoringTaskIterator = this.restoring.values().iterator();
        while (restoringTaskIterator.hasNext()) {
            StreamTask task = restoringTaskIterator.next();
            this.log.debug("Closing restoring task {}", (Object)task.id());
            try {
                task.closeStateManager(true);
            }
            catch (RuntimeException e) {
                this.log.error("Failed to remove restoring task {} due to the following error:", (Object)task.id(), (Object)e);
                if (exception != null) continue;
                exception = e;
            }
            finally {
                restoringTaskIterator.remove();
            }
        }
        this.restoring.clear();
        this.restoredPartitions.clear();
        this.restoringByPartition.clear();
        return exception;
    }

    void updateRestored(Collection<TopicPartition> restored) {
        if (restored.isEmpty()) {
            return;
        }
        this.log.trace("Stream task changelog partitions that have completed restoring so far: {}", (Object)restored);
        this.restoredPartitions.addAll(restored);
        Iterator<Map.Entry<TaskId, StreamTask>> it = this.restoring.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, StreamTask> entry = it.next();
            StreamTask task = entry.getValue();
            if (this.restoredPartitions.containsAll(task.changelogPartitions())) {
                this.transitionToRunning(task);
                it.remove();
                this.log.trace("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state", (Object)task.id(), (Object)task.changelogPartitions());
                continue;
            }
            if (!this.log.isTraceEnabled()) continue;
            HashSet<TopicPartition> outstandingPartitions = new HashSet<TopicPartition>(task.changelogPartitions());
            outstandingPartitions.removeAll(this.restoredPartitions);
            this.log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}", (Object)task.id(), (Object)outstandingPartitions);
        }
        if (this.allTasksRunning()) {
            this.restoredPartitions.clear();
        }
    }

    void addToRestoring(StreamTask task) {
        this.restoring.put(task.id(), task);
        for (TopicPartition topicPartition : task.partitions()) {
            this.restoringByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            this.restoringByPartition.put(topicPartition, task);
        }
    }

    int maybeCommitPerUserRequested() {
        int committed = 0;
        RuntimeException firstException = null;
        Iterator it = this.running().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next();
            try {
                if (!task.commitRequested() || !task.commitNeeded()) continue;
                task.commit();
                ++committed;
                this.log.debug("Committed active task {} per user request in", (Object)task.id());
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to commit {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException t) {
                this.log.error("Failed to commit StreamTask {} due to the following error:", (Object)task.id(), (Object)t);
                if (firstException != null) continue;
                firstException = t;
            }
        }
        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    Map<TopicPartition, Long> recordsToDelete() {
        HashMap<TopicPartition, Long> recordsToDelete = new HashMap<TopicPartition, Long>();
        for (StreamTask task : this.running.values()) {
            recordsToDelete.putAll(task.purgableOffsets());
        }
        return recordsToDelete;
    }

    int process(long now) {
        int processed = 0;
        Iterator it = this.running.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next().getValue();
            try {
                if (!task.isProcessable(now) || !task.process()) continue;
                ++processed;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to process stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException e) {
                this.log.error("Failed to process stream task {} due to the following error:", (Object)task.id(), (Object)e);
                throw e;
            }
        }
        return processed;
    }

    int punctuate() {
        int punctuated = 0;
        Iterator it = this.running.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next().getValue();
            try {
                if (task.maybePunctuateStreamTime()) {
                    ++punctuated;
                }
                if (!task.maybePunctuateSystemTime()) continue;
                ++punctuated;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (KafkaException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", (Object)task.id(), (Object)e);
                throw e;
            }
        }
        return punctuated;
    }

    @Override
    void clear() {
        super.clear();
        this.restoring.clear();
        this.restoringByPartition.clear();
        this.restoredPartitions.clear();
    }

    @Override
    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        builder.append(super.toString(indent));
        this.describe(builder, this.restoring.values(), indent, "Restoring:");
        return builder.toString();
    }

    Collection<StreamTask> restoringTasks() {
        return Collections.unmodifiableCollection(this.restoring.values());
    }
}

