/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.config.Config;
import com.networknt.eventuate.cdc.mysql.binlog.DatabaseBinlogOffsetKafkaStore;
import com.networknt.eventuate.cdc.mysql.binlog.DebeziumBinlogOffsetKafkaStore;
import com.networknt.eventuate.cdc.mysql.binlog.IWriteRowsEventDataParser;
import com.networknt.eventuate.cdc.mysql.binlog.MySQLCdcKafkaPublisher;
import com.networknt.eventuate.cdc.mysql.binlog.MySQLCdcProcessor;
import com.networknt.eventuate.cdc.mysql.binlog.MySqlBinaryLogClient;
import com.networknt.eventuate.cdc.mysql.binlog.SourceTableNameSupplier;
import com.networknt.eventuate.cdc.mysql.binlog.WriteRowsEventDataParser;
import com.networknt.eventuate.jdbc.EventuateSchema;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.CdcConfig;
import com.networknt.eventuate.server.common.CdcKafkaPublisher;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.EventTableChangesToAggregateTopicTranslator;
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.eventuate.server.common.PublishingStrategy;
import com.networknt.service.SingletonServiceFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import javax.sql.DataSource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MySqlCdcServiceInitializer {
    public static String CDC_CONFIG_NAME = "cdc";
    public static CdcConfig cdcConfig = (CdcConfig)Config.getInstance().getJsonObjectConfig(CDC_CONFIG_NAME, CdcConfig.class);
    public static String KAFKA_CONFIG_NAME = "kafka";
    public static KafkaConfig kafkaConfig = (KafkaConfig)Config.getInstance().getJsonObjectConfig(KAFKA_CONFIG_NAME, KafkaConfig.class);

    public EventuateSchema eventuateSchema() {
        return new EventuateSchema();
    }

    public SourceTableNameSupplier sourceTableNameSupplier() {
        return new SourceTableNameSupplier(cdcConfig.getSourceTableName(), "EVENTS");
    }

    public IWriteRowsEventDataParser<PublishedEvent> eventDataParser() {
        DataSource dataSource = SingletonServiceFactory.getBean(DataSource.class);
        EventuateSchema eventuateSchema = SingletonServiceFactory.getBean(EventuateSchema.class);
        SourceTableNameSupplier sourceTableNameSupplier = SingletonServiceFactory.getBean(SourceTableNameSupplier.class);
        return new WriteRowsEventDataParser(dataSource, sourceTableNameSupplier.getSourceTableName(), eventuateSchema);
    }

    public MySqlBinaryLogClient<PublishedEvent> mySqlBinaryLogClient() throws IOException, TimeoutException {
        IWriteRowsEventDataParser eventDataParser = SingletonServiceFactory.getBean(IWriteRowsEventDataParser.class);
        SourceTableNameSupplier sourceTableNameSupplier = SingletonServiceFactory.getBean(SourceTableNameSupplier.class);
        return new MySqlBinaryLogClient<PublishedEvent>(eventDataParser, cdcConfig.getDbUser(), cdcConfig.getDbPass(), cdcConfig.getDbHost(), cdcConfig.getDbPort(), cdcConfig.getBinlogClientId(), sourceTableNameSupplier.getSourceTableName(), cdcConfig.getMySqlBinLogClientName());
    }

    public EventuateKafkaProducer eventuateKafkaProducer() {
        return new EventuateKafkaProducer();
    }

    public DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore() {
        MySqlBinaryLogClient mySqlBinaryLogClient = SingletonServiceFactory.getBean(MySqlBinaryLogClient.class);
        EventuateKafkaProducer eventuateKafkaProducer = SingletonServiceFactory.getBean(EventuateKafkaProducer.class);
        return new DatabaseBinlogOffsetKafkaStore(cdcConfig.getDbHistoryTopicName(), mySqlBinaryLogClient.getName(), eventuateKafkaProducer);
    }

    public DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore() {
        return new DebeziumBinlogOffsetKafkaStore(cdcConfig.getOldDbHistoryTopicName());
    }

    public CdcProcessor<PublishedEvent> mySQLCdcProcessor() {
        MySqlBinaryLogClient mySqlBinaryLogClient = SingletonServiceFactory.getBean(MySqlBinaryLogClient.class);
        DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore = SingletonServiceFactory.getBean(DatabaseBinlogOffsetKafkaStore.class);
        DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore = SingletonServiceFactory.getBean(DebeziumBinlogOffsetKafkaStore.class);
        return new MySQLCdcProcessor<PublishedEvent>(mySqlBinaryLogClient, binlogOffsetKafkaStore, debeziumBinlogOffsetKafkaStore);
    }

    public CdcKafkaPublisher<PublishedEvent> mySQLCdcKafkaPublisher() {
        DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore = SingletonServiceFactory.getBean(DatabaseBinlogOffsetKafkaStore.class);
        PublishingStrategy publishingStrategy = SingletonServiceFactory.getBean(PublishingStrategy.class);
        return new MySQLCdcKafkaPublisher<PublishedEvent>(binlogOffsetKafkaStore, kafkaConfig.getBootstrapServers(), publishingStrategy);
    }

    public CuratorFramework curatorFramework() {
        String connectionString = cdcConfig.getZookeeper();
        return MySqlCdcServiceInitializer.makeStartedCuratorClient(connectionString);
    }

    public EventTableChangesToAggregateTopicTranslator<PublishedEvent> mySqlEventTableChangesToAggregateTopicTranslator() {
        CdcKafkaPublisher mySQLCdcKafkaPublisher = SingletonServiceFactory.getBean(CdcKafkaPublisher.class);
        CdcProcessor mySQLCdcProcessor = SingletonServiceFactory.getBean(CdcProcessor.class);
        CuratorFramework curatorFramework = SingletonServiceFactory.getBean(CuratorFramework.class);
        return new EventTableChangesToAggregateTopicTranslator<PublishedEvent>(mySQLCdcKafkaPublisher, mySQLCdcProcessor, curatorFramework, cdcConfig);
    }

    static CuratorFramework makeStartedCuratorClient(String connectionString) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(retryPolicy).connectString(connectionString).build();
        client.start();
        return client;
    }
}

