package com.networknt.eventuate.cdc.mysql;

import com.networknt.eventuate.cdc.common.BinLogEvent;
import com.networknt.eventuate.cdc.mysql.exception.EventuateLocalPublishingException;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/MySQLCdcKafkaPublisher.class */
public class MySQLCdcKafkaPublisher<M extends BinLogEvent> {
    private String kafkaBootstrapServers;
    private PublishingStrategy<M> publishingStrategy;
    private DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore;
    private EventuateKafkaProducer producer;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private DuplicatePublishingDetector duplicatePublishingDetector;

    public MySQLCdcKafkaPublisher(DatabaseBinlogOffsetKafkaStore databaseBinlogOffsetKafkaStore, String str, PublishingStrategy<M> publishingStrategy) {
        this.binlogOffsetKafkaStore = databaseBinlogOffsetKafkaStore;
        this.kafkaBootstrapServers = str;
        this.publishingStrategy = publishingStrategy;
        this.duplicatePublishingDetector = new DuplicatePublishingDetector(str);
    }

    public void start() {
        this.logger.debug("Starting MySQLCdcKafkaPublisher");
        this.producer = new EventuateKafkaProducer(this.kafkaBootstrapServers);
        this.logger.debug("Starting MySQLCdcKafkaPublisher");
    }

    public void handleEvent(M m) throws EventuateLocalPublishingException {
        this.logger.trace("Got record " + m.toString());
        String str = this.publishingStrategy.topicFor(m);
        String json = this.publishingStrategy.toJson(m);
        Exception exc = null;
        for (int i = 0; i < 5; i++) {
            try {
                if (!this.duplicatePublishingDetector.shouldBePublished(m.getBinlogFileOffset(), str)) {
                    this.logger.debug("Duplicate event {}", m);
                    return;
                } else {
                    this.producer.send(str, this.publishingStrategy.partitionKeyFor(m), json).get(10L, TimeUnit.SECONDS);
                    this.binlogOffsetKafkaStore.save(m.getBinlogFileOffset());
                    return;
                }
            } catch (Exception e) {
                this.logger.warn("error publishing to " + str, e);
                exc = e;
                try {
                    Thread.sleep(((int) Math.pow(2.0d, i)) * 1000);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }
        throw new EventuateLocalPublishingException("error publishing to " + str, exc);
    }

    public void stop() {
        this.logger.debug("Stopping kafka producer");
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
