/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.polling;

import com.networknt.config.Config;
import com.networknt.eventuate.cdc.polling.EventPollingDataProvider;
import com.networknt.eventuate.cdc.polling.PollingCdcKafkaPublisher;
import com.networknt.eventuate.cdc.polling.PollingCdcProcessor;
import com.networknt.eventuate.cdc.polling.PollingDao;
import com.networknt.eventuate.jdbc.EventuateSchema;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.CdcConfig;
import com.networknt.eventuate.server.common.CdcKafkaPublisher;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.EventTableChangesToAggregateTopicTranslator;
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.eventuate.server.common.PublishingStrategy;
import com.networknt.service.SingletonServiceFactory;
import javax.sql.DataSource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class PollingCdcServiceInitializer {
    public static String CDC_CONFIG_NAME = "cdc";
    public static CdcConfig cdcConfig = (CdcConfig)Config.getInstance().getJsonObjectConfig(CDC_CONFIG_NAME, CdcConfig.class);
    public static String KAFKA_CONFIG_NAME = "kafka";
    public static KafkaConfig kafkaConfig = (KafkaConfig)Config.getInstance().getJsonObjectConfig(KAFKA_CONFIG_NAME, KafkaConfig.class);

    public EventuateSchema eventuateSchema() {
        return new EventuateSchema();
    }

    public PollingDao pollingDao() {
        DataSource ds = SingletonServiceFactory.getBean(DataSource.class);
        EventPollingDataProvider pollingDataProvider = SingletonServiceFactory.getBean(EventPollingDataProvider.class);
        return new PollingDao(pollingDataProvider, ds, cdcConfig.getMaxEventsPerPolling(), cdcConfig.getMaxAttemptsForPolling(), cdcConfig.getPollingRetryIntervalInMilliseconds());
    }

    public EventuateKafkaProducer eventuateKafkaProducer() {
        return new EventuateKafkaProducer();
    }

    public CdcProcessor<PublishedEvent> pollingCdcProcessor() {
        PollingDao pollingDao = SingletonServiceFactory.getBean(PollingDao.class);
        return new PollingCdcProcessor(pollingDao, cdcConfig.getPollingIntervalInMilliseconds());
    }

    public CdcKafkaPublisher<PublishedEvent> pollingCdcKafkaPublisher() {
        PublishingStrategy publishingStrategy = SingletonServiceFactory.getBean(PublishingStrategy.class);
        return new PollingCdcKafkaPublisher<PublishedEvent>(kafkaConfig.getBootstrapServers(), publishingStrategy);
    }

    public CuratorFramework curatorFramework() {
        String connectionString = cdcConfig.getZookeeper();
        return PollingCdcServiceInitializer.makeStartedCuratorClient(connectionString);
    }

    public EventTableChangesToAggregateTopicTranslator<PublishedEvent> pollingEventTableChangesToAggregateTopicTranslator() {
        CdcKafkaPublisher mySQLCdcKafkaPublisher = SingletonServiceFactory.getBean(CdcKafkaPublisher.class);
        CdcProcessor mySQLCdcProcessor = SingletonServiceFactory.getBean(CdcProcessor.class);
        CuratorFramework curatorFramework = SingletonServiceFactory.getBean(CuratorFramework.class);
        return new EventTableChangesToAggregateTopicTranslator<PublishedEvent>(mySQLCdcKafkaPublisher, mySQLCdcProcessor, curatorFramework, cdcConfig);
    }

    static CuratorFramework makeStartedCuratorClient(String connectionString) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(retryPolicy).connectString(connectionString).build();
        client.start();
        return client;
    }
}

