package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/ShardManager.class */
public class ShardManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);
    private final ParallelConsumerOptions options;
    private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new HashMap();

    private Map<Object, NavigableMap<Long, WorkContainer<K, V>>> getShards() {
        return this.processingShards;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NavigableMap<Long, WorkContainer<K, V>> getShard(Object obj) {
        return this.processingShards.get(obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> getIterator(Optional<Object> optional) {
        return new LoopingResumingIterator<>(optional, getShards());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Object computeShardKey(ConsumerRecord<K, V> consumerRecord) {
        switch (this.options.getOrdering()) {
            case KEY:
                return consumerRecord.key();
            case PARTITION:
            case UNORDERED:
                return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    public WorkContainer<K, V> getWorkContainerForRecord(ConsumerRecord<K, V> consumerRecord) {
        return (WorkContainer) this.processingShards.get(computeShardKey(consumerRecord)).get(Long.valueOf(consumerRecord.offset()));
    }

    public int getWorkQueuedInShardsCount() {
        int i = 0;
        Iterator<Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>>> it = this.processingShards.entrySet().iterator();
        while (it.hasNext()) {
            i += it.next().getValue().size();
        }
        return i;
    }

    public boolean workIsWaitingToBeProcessed() {
        Iterator<NavigableMap<Long, WorkContainer<K, V>>> it = this.processingShards.values().iterator();
        while (it.hasNext()) {
            if (!it.next().isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeShardsFoundIn(NavigableMap<Long, WorkContainer<K, V>> navigableMap) {
        Iterator<WorkContainer<K, V>> it = navigableMap.values().iterator();
        while (it.hasNext()) {
            removeWorkFromShard(it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeWorkFromShard(WorkContainer<K, V> workContainer) {
        Object computeShardKey = computeShardKey(workContainer.getCr());
        log.debug("Removing expired work {} for shard key: {}", workContainer, computeShardKey);
        this.processingShards.remove(computeShardKey);
    }

    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        this.processingShards.computeIfAbsent(computeShardKey(workContainer.getCr()), obj -> {
            return new ConcurrentSkipListMap();
        }).put(Long.valueOf(workContainer.offset()), workContainer);
    }

    void removeShard(Object obj) {
        getShards().remove(obj);
    }

    public void onSuccess(ConsumerRecord<K, V> consumerRecord) {
        Object computeShardKey = computeShardKey(consumerRecord);
        NavigableMap<Long, WorkContainer<K, V>> shard = getShard(computeShardKey);
        if (shard == null) {
            throw new NullPointerException(StringUtils.msg("Shard is missing for key {}", computeShardKey));
        }
        shard.remove(Long.valueOf(consumerRecord.offset()));
        if (this.options.getOrdering().equals(ParallelConsumerOptions.ProcessingOrder.KEY) && shard.isEmpty()) {
            log.trace("Removing empty shard (key: {})", computeShardKey);
            removeShard(computeShardKey);
        }
    }

    public ShardManager(ParallelConsumerOptions parallelConsumerOptions) {
        this.options = parallelConsumerOptions;
    }

    public ParallelConsumerOptions getOptions() {
        return this.options;
    }
}
