package com.networknt.eventuate.server.common;

import com.networknt.eventuate.server.common.exception.EventuateLocalPublishingException;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/server/common/EventTableChangesToAggregateTopicTranslator.class */
/**
 * Coordinates a {@link CdcProcessor} and a {@link CdcKafkaPublisher} behind a
 * Curator {@link LeaderSelector}.
 *
 * <p>The instance registers an {@code EventuateLeaderSelectorListener} (constructed
 * with a reference to this object) on the leadership lock path taken from
 * {@link CdcConfig#getLeadershipLockPath()}. Events delivered by the processor are
 * forwarded to {@code CdcKafkaPublisher.handleEvent}.
 *
 * <p>NOTE(review): {@link #startCapturingChanges()} and {@link #stopCapturingChanges()}
 * are presumably driven by the leader selector listener when leadership is gained or
 * lost; that listener is not visible here, so confirm against its implementation.
 *
 * @param <EVENT> type of the change event passed from the processor to the publisher
 */
public class EventTableChangesToAggregateTopicTranslator<EVENT> {
    private final LeaderSelector leaderSelector;
    private final CdcKafkaPublisher<EVENT> cdcKafkaPublisher;
    private final CdcProcessor<EVENT> cdcProcessor;
    private final CdcConfig cdcConfig;
    // Kept per-instance (getClass()) so that a subclass logs under its own name.
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Wires the collaborators together and creates (but does not start) the leader selector.
     *
     * @param cdcKafkaPublisher publisher that receives every event via {@code handleEvent}
     * @param cdcProcessor      source of events; started with the publisher as its callback
     * @param curatorFramework  Curator client used by the leader selector
     * @param cdcConfig         configuration supplying the leadership lock path
     */
    public EventTableChangesToAggregateTopicTranslator(CdcKafkaPublisher<EVENT> cdcKafkaPublisher, CdcProcessor<EVENT> cdcProcessor, CuratorFramework curatorFramework, CdcConfig cdcConfig) {
        this.cdcKafkaPublisher = cdcKafkaPublisher;
        this.cdcProcessor = cdcProcessor;
        this.cdcConfig = cdcConfig;
        this.leaderSelector = new LeaderSelector(curatorFramework, cdcConfig.getLeadershipLockPath(), new EventuateLeaderSelectorListener(this));
    }

    /**
     * Starts the leader selector so this instance can take part in leader election.
     */
    @PostConstruct
    public void start() {
        this.logger.info("CDC initialized. Ready to become leader");
        this.leaderSelector.start();
    }

    /**
     * Starts the Kafka publisher and then the CDC processor, routing each processed
     * event to {@code CdcKafkaPublisher.handleEvent}.
     *
     * <p>Any exception raised while starting the processor is logged and not rethrown.
     * When the exception's cause is an {@link EventuateLocalPublishingException},
     * capturing is additionally shut down through {@link #stopCapturingChanges()}.
     *
     * @throws InterruptedException declared by this method; within the body it can only
     *                              originate from {@link #stopCapturingChanges()}
     */
    public void startCapturingChanges() throws InterruptedException {
        this.logger.debug("Starting to capture changes");
        this.cdcKafkaPublisher.start();
        try {
            // A bound method reference null-checks its receiver on evaluation, so no
            // separate guard on the publisher is required here.
            this.cdcProcessor.start(this.cdcKafkaPublisher::handleEvent);
        } catch (Exception e) {
            this.logger.error("cdc process has exception:", e);
            // NOTE(review): only the wrapped cause is inspected; an
            // EventuateLocalPublishingException thrown directly (not as a cause) does
            // not trigger the shutdown below. Confirm this is intended.
            if (e.getCause() instanceof EventuateLocalPublishingException) {
                this.logger.error("Stopping capturing changes due to exception:", e);
                stopCapturingChanges();
            }
        }
        // NOTE(review): this line is reached after a failure as well, not only on success.
        this.logger.debug("Started CDC Kafka publisher");
    }

    /**
     * Closes the leader selector.
     *
     * @throws InterruptedException declared for callers; nothing in the visible body throws it
     */
    public void stop() throws InterruptedException {
        this.logger.debug("stop is called");
        this.leaderSelector.close();
    }

    /**
     * Stops the Kafka publisher and then the CDC processor.
     *
     * <p>The processor is stopped even when stopping the publisher throws, so a failing
     * publisher shutdown cannot leave the processor running. If both calls throw, the
     * processor's exception is the one that propagates.
     *
     * @throws InterruptedException declared for callers; may originate from the stop calls
     */
    public void stopCapturingChanges() throws InterruptedException {
        this.logger.debug("Stopping to capture changes");
        try {
            this.cdcKafkaPublisher.stop();
        } finally {
            this.cdcProcessor.stop();
        }
    }
}
