/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.config.Config;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.consumer.ConsumerPropertiesFactory;
import com.networknt.eventuate.kafka.consumer.EventuateKafkaConsumer;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import com.networknt.eventuate.server.common.PublishedEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplicatePublishingDetector {
    private static final Logger logger = LoggerFactory.getLogger(DuplicatePublishingDetector.class);
    static final KafkaConfig config = (KafkaConfig)Config.getInstance().getJsonObjectConfig("kafka", KafkaConfig.class);
    private Map<String, Optional<BinlogFileOffset>> maxOffsetsForTopics = new HashMap<String, Optional<BinlogFileOffset>>();
    private boolean okToProcess = false;

    public boolean shouldBePublished(BinlogFileOffset sourceBinlogFileOffset, String destinationTopic) {
        if (this.okToProcess) {
            return true;
        }
        Optional max = this.maxOffsetsForTopics.computeIfAbsent(destinationTopic, this::fetchMaxOffsetFor);
        logger.info("For topic {} max is {}", (Object)destinationTopic, (Object)max);
        this.okToProcess = max.map(sourceBinlogFileOffset::isSameOrAfter).orElse(true);
        logger.info("max = {}, sourceBinlogFileOffset = {} okToProcess = {}", max, sourceBinlogFileOffset, this.okToProcess);
        return this.okToProcess;
    }

    private Optional<BinlogFileOffset> fetchMaxOffsetFor(String destinationTopic) {
        String subscriberId = "duplicate-checker-" + destinationTopic + "-" + System.currentTimeMillis();
        Properties consumerProperties = ConsumerPropertiesFactory.makeConsumerProperties(config, subscriberId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumerProperties);
        List<PartitionInfo> partitions = EventuateKafkaConsumer.verifyTopicExistsBeforeSubscribing(consumer, destinationTopic);
        List<TopicPartition> topicPartitionList = partitions.stream().map(p -> new TopicPartition(destinationTopic, p.partition())).collect(Collectors.toList());
        consumer.assign(topicPartitionList);
        consumer.poll(0L);
        logger.info("Seeking to end");
        try {
            consumer.seekToEnd(topicPartitionList);
        }
        catch (IllegalStateException e) {
            logger.error("Error seeking " + destinationTopic, e);
            return Optional.empty();
        }
        List<PartitionOffset> positions = topicPartitionList.stream().map(tp -> new PartitionOffset(tp.partition(), consumer.position((TopicPartition)tp) - 1L)).filter(po -> po.offset >= 0L).collect(Collectors.toList());
        logger.info("Seeking to positions=" + positions);
        positions.forEach(po -> consumer.seek(new TopicPartition(destinationTopic, po.partition), po.offset));
        logger.info("Polling for records");
        ArrayList records = new ArrayList();
        while (records.size() < positions.size()) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000L);
            consumerRecords.forEach(records::add);
        }
        logger.info("Got records: {}", (Object)records.size());
        Optional<BinlogFileOffset> max = StreamSupport.stream(records.spliterator(), false).map(record -> {
            logger.info(String.format("got record: %s %s %s", record.partition(), record.offset(), record.value()));
            return JSonMapper.fromJson((String)record.value(), PublishedEvent.class).getBinlogFileOffset();
        }).filter(binlogFileOffset -> binlogFileOffset != null).max((blfo1, blfo2) -> blfo1.isSameOrAfter((BinlogFileOffset)blfo2) ? 1 : -1);
        consumer.close();
        return max;
    }

    class PartitionOffset {
        public final int partition;
        public final long offset;

        public String toString() {
            return "PartitionOffset{partition=" + this.partition + ", offset=" + this.offset + '}';
        }

        public PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }
}

