/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.ProcessingShard;
import io.confluent.parallelconsumer.state.ShardKey;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the {@link ProcessingShard}s that hold queued work, and of the work containers
 * that have failed and are waiting to be retried.
 *
 * <p>Each {@link WorkContainer} is routed to a shard identified by a {@link ShardKey}, which is
 * computed from the container and the configured processing order (see
 * {@link #computeShardKey(WorkContainer)}).
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ShardManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);

    private final ParallelConsumerOptions options;
    private final WorkManager<K, V> wm;
    private final Clock clock;

    /** All currently known shards, by shard key. */
    private final Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

    /**
     * Failed work, ordered by the delay until its retry is due.
     *
     * <p>A {@link TreeSet} treats two elements whose comparator result is {@code 0} as the same
     * element, so ordering by delay alone would silently drop (or remove the wrong) container
     * whenever two containers share a delay. The offset is used as a tie-breaker to avoid that.
     *
     * <p>NOTE(review): two containers with the same delay AND the same offset (e.g. from different
     * partitions) would still collide - extend the tie-breaker with the topic/partition if
     * WorkContainer exposes it. Also, the sort key is re-evaluated on every comparison; if
     * getDelayUntilRetryDue() depends on the current time, prefer a fixed "retry due at" key -
     * confirm against WorkContainer.
     */
    private final NavigableSet<WorkContainer<?, ?>> retryQueue = new TreeSet<>(
            Comparator.comparing((WorkContainer<?, ?> wc) -> wc.getDelayUntilRetryDue())
                    .thenComparingLong(wc -> wc.offset()));

    /** Shard key at which the next call to {@link #getWorkIfAvailable(int)} starts iterating. */
    private Optional<ShardKey> iterationResumePoint = Optional.empty();

    /**
     * @param options consumer options; its ordering setting determines how shard keys are computed
     * @param wm      owning work manager; its {@code getPm()} result is handed to newly created shards
     * @param clock   clock retained by this manager
     */
    public ShardManager(ParallelConsumerOptions options, WorkManager<K, V> wm, Clock clock) {
        this.options = options;
        this.wm = wm;
        this.clock = clock;
    }

    /**
     * @return the shard registered under {@code key}, or empty if there is none
     */
    Optional<ProcessingShard<K, V>> getShard(ShardKey key) {
        return Optional.ofNullable(this.processingShards.get(key));
    }

    /**
     * Creates an iterator over the shards that starts from the given resume point.
     */
    private LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> getIterator(Optional<ShardKey> iterationResumePoint) {
        return new LoopingResumingIterator<ShardKey, ProcessingShard<K, V>>(iterationResumePoint, this.processingShards);
    }

    /**
     * @return the key of the shard that {@code wc} belongs to under the configured ordering
     */
    ShardKey computeShardKey(WorkContainer<?, ?> wc) {
        return ShardKey.of(wc, this.options.getOrdering());
    }

    /**
     * @return the sum, over all shards, of each shard's count of work awaiting selection
     */
    public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
        return this.processingShards.values().stream()
                .mapToLong(ProcessingShard::getCountOfWorkAwaitingSelection)
                .sum();
    }

    /**
     * @return {@code true} if at least one shard reports work waiting to be processed
     */
    public boolean workIsWaitingToBeProcessed() {
        Collection<ProcessingShard<K, V>> allShards = this.processingShards.values();
        return allShards.parallelStream().anyMatch(ProcessingShard::workIsWaitingToBeProcessed);
    }

    /**
     * Removes every given work container from its shard (dropping shards that become empty, see
     * {@link #removeShardIfEmpty(ShardKey)}) and from the retry queue.
     *
     * @param workFromRemovedPartition work containers keyed by offset
     */
    void removeAnyShardsReferencedBy(NavigableMap<Long, WorkContainer<K, V>> workFromRemovedPartition) {
        for (WorkContainer<K, V> work : workFromRemovedPartition.values()) {
            this.removeShardFor(work);
        }
    }

    /**
     * Removes a single work container from its shard, if that shard still exists, and from the
     * retry queue.
     */
    private void removeShardFor(WorkContainer<K, V> work) {
        ShardKey shardKey = this.computeShardKey(work);
        // Single lookup: avoids a containsKey/get pair that could observe the shard disappearing in between.
        ProcessingShard<K, V> shard = this.processingShards.get(shardKey);
        if (shard != null) {
            shard.remove(work.offset());
            this.removeShardIfEmpty(shardKey);
        } else {
            log.trace("Shard referenced by WC: {} with shard key: {} already removed", work, shardKey);
        }
        this.retryQueue.remove(work);
    }

    /**
     * Adds the work container to its shard, creating the shard first if it does not exist yet.
     */
    public void addWorkContainer(WorkContainer<K, V> wc) {
        ShardKey shardKey = this.computeShardKey(wc);
        ProcessingShard<K, V> shard = this.processingShards.computeIfAbsent(
                shardKey,
                ignore -> new ProcessingShard<K, V>(shardKey, this.options, this.wm.getPm()));
        shard.addWorkContainer(wc);
    }

    /**
     * Removes the shard registered under {@code key} if it exists and is empty. Shards are only
     * removed when the configured ordering is {@code KEY}; under any other ordering this is a no-op.
     */
    void removeShardIfEmpty(ShardKey key) {
        boolean keyOrdering = this.options.getOrdering() == ParallelConsumerOptions.ProcessingOrder.KEY;
        if (!keyOrdering) {
            return;
        }
        Optional<ProcessingShard<K, V>> shardOpt = this.getShard(key);
        if (shardOpt.isPresent() && shardOpt.get().isEmpty()) {
            log.trace("Removing empty shard (key: {})", key);
            this.processingShards.remove(key);
        }
    }

    /**
     * Records a successful result: the container is taken off the retry queue and its shard is
     * notified, then dropped if it became empty. If the shard no longer exists the result is only
     * logged.
     */
    public void onSuccess(WorkContainer<?, ?> wc) {
        this.retryQueue.remove(wc);
        ShardKey key = this.computeShardKey(wc);
        Optional<ProcessingShard<K, V>> shardOptional = this.getShard(key);
        if (shardOptional.isPresent()) {
            shardOptional.get().onSuccess(wc);
            this.removeShardIfEmpty(key);
        } else {
            log.trace("Dropping successful result for revoked partition {}. Record in question was: {}", key, wc.getCr());
        }
    }

    /**
     * Records a failed result by placing the container on the retry queue.
     */
    public void onFailure(WorkContainer<?, ?> wc) {
        log.debug("Work FAILED");
        this.retryQueue.add(wc);
    }

    /**
     * @return the retry delay of the first container in the retry queue that is not in flight, or
     *         empty if there is no such container
     */
    public Optional<Duration> getLowestRetryTime() {
        for (WorkContainer<?, ?> workContainer : this.retryQueue) {
            if (workContainer.isNotInFlight()) {
                return Optional.of(workContainer.getDelayUntilRetryDue());
            }
        }
        return Optional.empty();
    }

    /**
     * Collects work from the shards, starting at the saved resume point, until the requested
     * amount has been gathered or the iterator is exhausted. Afterwards the resume point is
     * advanced to the next shard the iterator would have visited, if any.
     *
     * @param requestedMaxWorkToRetrieve upper bound on the number of containers requested
     * @return the work gathered from the visited shards
     */
    public List<WorkContainer<K, V>> getWorkIfAvailable(int requestedMaxWorkToRetrieve) {
        LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> shardQueueIterator = this.getIterator(this.iterationResumePoint);
        List<WorkContainer<K, V>> workFromAllShards = new ArrayList<>();
        while (workFromAllShards.size() < requestedMaxWorkToRetrieve && shardQueueIterator.hasNext()) {
            Map.Entry<ShardKey, ProcessingShard<K, V>> shardEntry = shardQueueIterator.next();
            ProcessingShard<K, V> shard = shardEntry.getValue();
            // Only ask each shard for what is still missing.
            int remainingToGet = requestedMaxWorkToRetrieve - workFromAllShards.size();
            List<WorkContainer<K, V>> work = shard.getWorkIfAvailable(remainingToGet);
            workFromAllShards.addAll(work);
        }
        if (workFromAllShards.size() >= requestedMaxWorkToRetrieve) {
            log.debug("Work taken is now over max (iteration resume point is {})", this.iterationResumePoint);
        }
        this.updateResumePoint(shardQueueIterator);
        return workFromAllShards;
    }

    /**
     * Saves the key of the iterator's next entry as the resume point. If the iterator has no
     * further entries the previous resume point is left untouched.
     */
    private void updateResumePoint(LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> shardQueueIterator) {
        if (shardQueueIterator.hasNext()) {
            Map.Entry<ShardKey, ProcessingShard<K, V>> shardEntry = shardQueueIterator.next();
            this.iterationResumePoint = Optional.of(shardEntry.getKey());
            log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
        }
    }

    /**
     * @return the options this manager was constructed with
     */
    public ParallelConsumerOptions getOptions() {
        return this.options;
    }

    private Clock getClock() {
        return this.clock;
    }
}

