package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.netty.util.internal.ConcurrentSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/connector/kafka/source/TopicPartitionStateManager.class */
public class TopicPartitionStateManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicPartitionStateManager.class);
    private final int checkpointReadyCheckDelayMs;
    private final Counter waitingForAckCount;
    public static final long DEFAULT_LAST_READ_OFFSET = 0;
    private final ConcurrentMap<TopicPartition, State> partitionState = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/connector/kafka/source/TopicPartitionStateManager$State.class */
    public class State {
        private final AtomicLong lastReadOffset;
        private final ConcurrentSet<Long> unAckedOffsets;

        private State() {
            this.lastReadOffset = new AtomicLong(0L);
            this.unAckedOffsets = new ConcurrentSet<>();
        }
    }

    public TopicPartitionStateManager(Registry registry, String str, int i) {
        this.checkpointReadyCheckDelayMs = i;
        this.waitingForAckCount = registry.counter("waitingOnAck", new String[]{"client-id", str});
    }

    public void recordMessageRead(TopicPartition topicPartition, long j) {
        if (!this.partitionState.containsKey(topicPartition)) {
            this.partitionState.putIfAbsent(topicPartition, new State());
        }
        this.partitionState.get(topicPartition).unAckedOffsets.add(Long.valueOf(j));
        this.partitionState.get(topicPartition).lastReadOffset.set(j);
    }

    public void recordMessageAck(TopicPartition topicPartition, long j) {
        if (this.partitionState.containsKey(topicPartition)) {
            this.partitionState.get(topicPartition).unAckedOffsets.remove(Long.valueOf(j));
        }
    }

    public Optional<Long> getLastOffset(TopicPartition topicPartition) {
        return !this.partitionState.containsKey(topicPartition) ? Optional.empty() : Optional.of(Long.valueOf(this.partitionState.get(topicPartition).lastReadOffset.get()));
    }

    private boolean allMessagesAcked(TopicPartition topicPartition) {
        return !this.partitionState.containsKey(topicPartition) || this.partitionState.get(topicPartition).unAckedOffsets.size() == 0;
    }

    public Map<TopicPartition, OffsetAndMetadata> createCheckpoint(Collection<TopicPartition> collection) {
        if (this.partitionState.isEmpty()) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap(collection.size());
        for (TopicPartition topicPartition : collection) {
            while (!allMessagesAcked(topicPartition)) {
                try {
                    this.waitingForAckCount.increment();
                    Thread.sleep(this.checkpointReadyCheckDelayMs);
                } catch (InterruptedException e) {
                    LOGGER.info("thread interrupted when creating checkpoint for {}", topicPartition);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("thread interrupted when creating checkpoint", e);
                }
            }
            State state = this.partitionState.get(topicPartition);
            Optional ofNullable = Optional.ofNullable(state != null ? Long.valueOf(state.lastReadOffset.get()) : null);
            if (ofNullable.isPresent() && ((Long) ofNullable.get()).longValue() != 0) {
                hashMap.put(topicPartition, new OffsetAndMetadata(((Long) ofNullable.get()).longValue() + 1, String.valueOf(System.currentTimeMillis())));
            }
        }
        return hashMap;
    }

    public void resetCounters(TopicPartition topicPartition) {
        if (this.partitionState.containsKey(topicPartition)) {
            this.partitionState.get(topicPartition).unAckedOffsets.clear();
            this.partitionState.get(topicPartition).lastReadOffset.set(0L);
        }
    }

    public void resetCounters() {
        LOGGER.info("resetting all counters");
        if (this.partitionState.isEmpty()) {
            return;
        }
        this.partitionState.values().stream().forEach(state -> {
            state.unAckedOffsets.clear();
            state.lastReadOffset.set(0L);
        });
    }
}
