package com.networknt.tram.cdc.server;

import com.networknt.config.Config;
import com.networknt.eventuate.cdc.polling.PollingCdcKafkaPublisher;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.CdcConfig;
import com.networknt.eventuate.server.common.EventTableChangesToAggregateTopicTranslator;
import com.networknt.eventuate.server.common.PublishingStrategy;
import com.networknt.server.StartupHookProvider;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.tram.cdc.mysql.connector.MessageWithDestination;
import com.networknt.tram.cdc.polling.connector.MessagePollingDataProvider;
import com.networknt.tram.cdc.polling.connector.PollingCdcProcessor;
import com.networknt.tram.cdc.polling.connector.PollingDao;
import java.util.Objects;
import javax.sql.DataSource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/* loaded from: input_file:com/networknt/tram/cdc/server/CdcServerStartupHookProvider.class */
/**
 * Startup hook that wires together and starts the polling-based CDC pipeline.
 *
 * <p>On startup it starts a Curator (ZooKeeper) client, optionally reconfigures the
 * {@link MessagePollingDataProvider} from the "pulling" configuration, builds an
 * {@link EventTableChangesToAggregateTopicTranslator} from a Kafka publisher and a polling
 * processor, and starts that translator. The started client and translator are published
 * through the public static fields of this class.
 */
public class CdcServerStartupHookProvider implements StartupHookProvider {
    /** Name of the configuration that is bound to {@link CdcConfig}. */
    static String CDC_CONFIG_NAME = "cdc";
    /** CDC settings; mandatory, validated in {@link #onStartup()}. */
    static CdcConfig cdcConfig = (CdcConfig) Config.getInstance().getJsonObjectConfig(CDC_CONFIG_NAME, CdcConfig.class);
    /** Name of the configuration that is bound to {@link KafkaConfig}. */
    static String KAFKA_CONFIG_NAME = KafkaConfig.CONFIG_NAME;
    /** Kafka settings; mandatory, validated in {@link #onStartup()}. */
    static KafkaConfig kafkaConfig = (KafkaConfig) Config.getInstance().getJsonObjectConfig(KAFKA_CONFIG_NAME, KafkaConfig.class);
    /** Name of the configuration that is bound to {@code PullingConfig}. */
    static String PULLING_CONFIG_NAME = "pulling";
    /** Optional table/column overrides for the polling data provider; may be {@code null}. */
    static PullingConfig pullingConfig = (PullingConfig) Config.getInstance().getJsonObjectConfig(PULLING_CONFIG_NAME, PullingConfig.class);
    /** Curator client started by {@link #onStartup()}; {@code null} until then. */
    public static CuratorFramework curatorFramework;
    /** Translator started by {@link #onStartup()}; {@code null} until then. */
    public static EventTableChangesToAggregateTopicTranslator<MessageWithDestination> translator;

    /**
     * Builds and starts the CDC pipeline.
     *
     * @throws NullPointerException if the "cdc" or the Kafka configuration could not be loaded
     */
    @Override // com.networknt.server.StartupHookProvider
    public void onStartup() {
        // Fail fast, before any client is started, so that a missing configuration is reported
        // by name and does not leave a running Curator client behind.
        Objects.requireNonNull(cdcConfig, "Configuration '" + CDC_CONFIG_NAME + "' could not be loaded");
        Objects.requireNonNull(kafkaConfig, "Configuration '" + KAFKA_CONFIG_NAME + "' could not be loaded");

        curatorFramework = makeStartedCuratorClient(cdcConfig.getZookeeper());

        MessagePollingDataProvider dataProvider =
                (MessagePollingDataProvider) SingletonServiceFactory.getBean(MessagePollingDataProvider.class);
        if (pullingConfig != null) {
            // The "pulling" configuration is optional; when present it overrides the provider's
            // table and column settings.
            dataProvider.reset(
                    pullingConfig.getTableName(),
                    pullingConfig.getIdField(),
                    pullingConfig.getPublishedField(),
                    pullingConfig.getHeaders(),
                    pullingConfig.getDestination(),
                    pullingConfig.getPayload());
        }

        // NOTE(review): this instance is discarded immediately. It is kept only in case the
        // constructor has side effects that startup relies on - confirm and remove if not.
        new EventuateKafkaProducer();

        // The components are created in the same order as in the original single expression.
        PollingCdcKafkaPublisher kafkaPublisher = new PollingCdcKafkaPublisher(
                kafkaConfig.getBootstrapServers(),
                (PublishingStrategy) SingletonServiceFactory.getBean(PublishingStrategy.class));
        PollingDao pollingDao = new PollingDao(
                dataProvider,
                (DataSource) SingletonServiceFactory.getBean(DataSource.class),
                cdcConfig.getMaxEventsPerPolling(),
                cdcConfig.getMaxAttemptsForPolling(),
                cdcConfig.getPollingRetryIntervalInMilliseconds());
        PollingCdcProcessor pollingProcessor =
                new PollingCdcProcessor(pollingDao, cdcConfig.getPollingIntervalInMilliseconds());

        translator = new EventTableChangesToAggregateTopicTranslator<>(
                kafkaPublisher, pollingProcessor, curatorFramework, cdcConfig);
        translator.start();
        System.out.println("CdcServerStartupHookProvider is called");
    }

    /**
     * Creates a Curator client for the given ZooKeeper connection string and starts it.
     *
     * @param connectString ZooKeeper connection string handed to the Curator builder
     * @return the started client
     */
    CuratorFramework makeStartedCuratorClient(String connectString) {
        // Curator's constructor takes (baseSleepTimeMs, maxRetries, maxSleepMs).
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(2000, 6, 2000);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        return client;
    }
}
