package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.consumer.ConsumerPropertiesFactory;
import com.networknt.eventuate.kafka.consumer.EventuateKafkaConsumer;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import com.networknt.eventuate.server.common.PublishedEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.cli.HelpFormatter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/DuplicatePublishingDetector.class */
public class DuplicatePublishingDetector {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private Map<String, Optional<BinlogFileOffset>> maxOffsetsForTopics = new HashMap();
    private boolean okToProcess = false;
    private String kafkaBootstrapServers;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/DuplicatePublishingDetector$PartitionOffset.class */
    public class PartitionOffset {
        public final int partition;
        public final long offset;

        public String toString() {
            return "PartitionOffset{partition=" + this.partition + ", offset=" + this.offset + '}';
        }

        public PartitionOffset(int i, long j) {
            this.partition = i;
            this.offset = j;
        }
    }

    public DuplicatePublishingDetector(String str) {
        this.kafkaBootstrapServers = str;
    }

    public boolean shouldBePublished(BinlogFileOffset binlogFileOffset, String str) {
        if (this.okToProcess) {
            return true;
        }
        Optional<BinlogFileOffset> computeIfAbsent = this.maxOffsetsForTopics.computeIfAbsent(str, this::fetchMaxOffsetFor);
        this.logger.info("For topic {} max is {}", str, computeIfAbsent);
        binlogFileOffset.getClass();
        this.okToProcess = ((Boolean) computeIfAbsent.map(binlogFileOffset::isSameOrAfter).orElse(true)).booleanValue();
        this.logger.info("max = {}, sourceBinlogFileOffset = {} okToProcess = {}", computeIfAbsent, binlogFileOffset, Boolean.valueOf(this.okToProcess));
        return this.okToProcess;
    }

    private Optional<BinlogFileOffset> fetchMaxOffsetFor(String str) {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(ConsumerPropertiesFactory.makeConsumerProperties(this.kafkaBootstrapServers, "duplicate-checker-" + str + HelpFormatter.DEFAULT_OPT_PREFIX + System.currentTimeMillis()));
        List list = (List) EventuateKafkaConsumer.verifyTopicExistsBeforeSubscribing(kafkaConsumer, str).stream().map(partitionInfo -> {
            return new TopicPartition(str, partitionInfo.partition());
        }).collect(Collectors.toList());
        kafkaConsumer.assign(list);
        kafkaConsumer.poll(0L);
        this.logger.info("Seeking to end");
        try {
            kafkaConsumer.seekToEnd(list);
            List list2 = (List) list.stream().map(topicPartition -> {
                return new PartitionOffset(topicPartition.partition(), kafkaConsumer.position(topicPartition) - 1);
            }).filter(partitionOffset -> {
                return partitionOffset.offset >= 0;
            }).collect(Collectors.toList());
            this.logger.info("Seeking to positions=" + list2);
            list2.forEach(partitionOffset2 -> {
                kafkaConsumer.seek(new TopicPartition(str, partitionOffset2.partition), partitionOffset2.offset);
            });
            this.logger.info("Polling for records");
            ArrayList arrayList = new ArrayList();
            while (arrayList.size() < list2.size()) {
                ConsumerRecords poll = kafkaConsumer.poll(1000L);
                arrayList.getClass();
                poll.forEach((v1) -> {
                    r1.add(v1);
                });
            }
            this.logger.info("Got records: {}", Integer.valueOf(arrayList.size()));
            Optional<BinlogFileOffset> max = StreamSupport.stream(arrayList.spliterator(), false).map(consumerRecord -> {
                this.logger.info(String.format("got record: %s %s %s", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.value()));
                return ((PublishedEvent) JSonMapper.fromJson((String) consumerRecord.value(), PublishedEvent.class)).getBinlogFileOffset();
            }).filter(binlogFileOffset -> {
                return binlogFileOffset != null;
            }).max((binlogFileOffset2, binlogFileOffset3) -> {
                return binlogFileOffset2.isSameOrAfter(binlogFileOffset3) ? 1 : -1;
            });
            kafkaConsumer.close();
            return max;
        } catch (IllegalStateException e) {
            this.logger.error("Error seeking " + str, (Throwable) e);
            return Optional.empty();
        }
    }
}
