/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.server.common;

import com.networknt.eventuate.server.common.CdcConfig;
import com.networknt.eventuate.server.common.CdcKafkaPublisher;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.EventuateLeaderSelectorListener;
import com.networknt.eventuate.server.common.exception.EventuateLocalPublishingException;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connects a {@link CdcProcessor} to a {@link CdcKafkaPublisher} and registers the pair with a
 * Curator {@link LeaderSelector}.
 *
 * <p>The constructor builds the selector on the lock path returned by
 * {@link CdcConfig#getLeadershipLockPath()} and hands it an
 * {@link EventuateLeaderSelectorListener} that wraps this instance. That listener is presumably
 * what calls {@link #startCapturingChanges()} and {@link #stopCapturingChanges()} as leadership
 * changes; its implementation is not part of this file, so confirm against the listener.
 *
 * @param <EVENT> the event type passed from the processor to the publisher
 */
public class EventTableChangesToAggregateTopicTranslator<EVENT> {
    private final LeaderSelector leaderSelector;
    private final CdcKafkaPublisher<EVENT> cdcKafkaPublisher;
    private final CdcProcessor<EVENT> cdcProcessor;
    private final CdcConfig cdcConfig;
    // Kept per-instance (not static) so the logger name still follows the runtime class.
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Stores the collaborators and creates the leader selector. The selector is not started
     * here; see {@link #start()}.
     *
     * @param cdcKafkaPublisher publisher that receives every event emitted by the processor
     * @param cdcProcessor      source of events
     * @param client            Curator client used to build the {@link LeaderSelector}
     * @param cdcConfig         configuration supplying the leadership lock path
     */
    public EventTableChangesToAggregateTopicTranslator(CdcKafkaPublisher<EVENT> cdcKafkaPublisher, CdcProcessor<EVENT> cdcProcessor, CuratorFramework client, CdcConfig cdcConfig) {
        this.cdcKafkaPublisher = cdcKafkaPublisher;
        this.cdcProcessor = cdcProcessor;
        this.cdcConfig = cdcConfig;
        // NOTE: "this" is handed to the listener before construction finishes; all other
        // fields are assigned above so the listener never observes them unset.
        this.leaderSelector = new LeaderSelector(client, cdcConfig.getLeadershipLockPath(), new EventuateLeaderSelectorListener(this));
    }

    /**
     * Starts the leader selector. Annotated with {@link PostConstruct} so a container that
     * honours the annotation calls it once the instance is built.
     */
    @PostConstruct
    public void start() {
        this.logger.info("CDC initialized. Ready to become leader");
        this.leaderSelector.start();
    }

    /**
     * Starts the publisher, then starts the processor with the publisher's
     * {@code handleEvent} as its event callback.
     *
     * <p>Any exception thrown while starting the processor is logged and not rethrown. When
     * the exception's cause is an {@link EventuateLocalPublishingException}, capturing is
     * stopped again via {@link #stopCapturingChanges()}; for any other failure the publisher
     * is left running.
     *
     * @throws InterruptedException declared for callers; it can propagate from
     *                              {@link #stopCapturingChanges()} on the failure path
     */
    public void startCapturingChanges() throws InterruptedException {
        this.logger.debug("Starting to capture changes");
        this.cdcKafkaPublisher.start();
        try {
            this.cdcProcessor.start(this.cdcKafkaPublisher::handleEvent);
        } catch (Exception e) {
            this.logger.error("cdc process has exception:", e);
            if (e.getCause() instanceof EventuateLocalPublishingException) {
                this.logger.error("Stopping capturing changes due to exception:", e);
                this.stopCapturingChanges();
                // Everything was just stopped, so do not report the publisher as started.
                return;
            }
        }
        this.logger.debug("Started CDC Kafka publisher");
    }

    /**
     * Closes the leader selector.
     *
     * @throws InterruptedException declared for callers; nothing in this method's own body
     *                              is shown to throw it
     */
    public void stop() throws InterruptedException {
        this.logger.debug("stop is called");
        this.leaderSelector.close();
    }

    /**
     * Stops the publisher and then the processor. The processor is stopped even when stopping
     * the publisher throws, so a publisher failure cannot leave the processor running.
     *
     * @throws InterruptedException declared for callers of this method
     */
    public void stopCapturingChanges() throws InterruptedException {
        this.logger.debug("Stopping to capture changes");
        try {
            this.cdcKafkaPublisher.stop();
        } finally {
            this.cdcProcessor.stop();
        }
    }
}

