package io.mantisrx.connector.kafka.source;

import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/connector/kafka/source/KafkaConsumerRebalanceListener.class */
public class KafkaConsumerRebalanceListener<S> implements ConsumerRebalanceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerRebalanceListener.class);
    private final KafkaConsumer<?, ?> consumer;
    private final TopicPartitionStateManager partitionStateManager;
    private final CheckpointStrategy<S> checkpointStrategy;

    public KafkaConsumerRebalanceListener(KafkaConsumer<?, ?> kafkaConsumer, TopicPartitionStateManager topicPartitionStateManager, CheckpointStrategy<S> checkpointStrategy) {
        this.consumer = kafkaConsumer;
        this.partitionStateManager = topicPartitionStateManager;
        this.checkpointStrategy = checkpointStrategy;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        LOGGER.info("partitions revoked, resetting partition state: {}", collection.toString());
        collection.stream().forEach(topicPartition -> {
            this.partitionStateManager.resetCounters(topicPartition);
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOGGER.info("new partitions assigned: {}", collection.toString());
        try {
            for (TopicPartition topicPartition : collection) {
                Optional<S> filter = this.checkpointStrategy.loadCheckpoint(topicPartition).filter(obj -> {
                    return obj instanceof OffsetAndMetadata;
                });
                Class<OffsetAndMetadata> cls = OffsetAndMetadata.class;
                OffsetAndMetadata.class.getClass();
                filter.map(cls::cast).ifPresent(offsetAndMetadata -> {
                    long offset = offsetAndMetadata.offset();
                    LOGGER.info("seeking consumer to checkpoint'ed offset {} for partition {} on assignment", Long.valueOf(offset), topicPartition);
                    try {
                        this.consumer.seek(topicPartition, offset);
                    } catch (Exception e) {
                        LOGGER.error("caught exception seeking consumer to offset {} on topic partition {}", new Object[]{Long.valueOf(offset), topicPartition, e});
                    }
                });
            }
        } catch (Exception e) {
            LOGGER.error("caught exception on partition assignment {}", collection, e);
        }
    }
}
