package com.networknt.eventuate.cdc.mysql;

import com.networknt.eventuate.cdc.common.BinLogEvent;
import com.networknt.eventuate.cdc.common.CdcConfig;
import com.networknt.eventuate.cdc.mysql.exception.EventuateLocalPublishingException;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/EventTableChangesToAggregateTopicTranslator.class */
public class EventTableChangesToAggregateTopicTranslator<M extends BinLogEvent> {
    private final LeaderSelector leaderSelector;
    private MySQLCdcKafkaPublisher<M> mySQLCdcKafkaPublisher;
    private MySQLCdcProcessor<M> mySQLCdcProcessor;
    private CdcConfig cdcConfig;
    private Logger logger = LoggerFactory.getLogger(getClass());

    /* renamed from: com.networknt.eventuate.cdc.mysql.EventTableChangesToAggregateTopicTranslator$2, reason: invalid class name */
    /* loaded from: input_file:com/networknt/eventuate/cdc/mysql/EventTableChangesToAggregateTopicTranslator$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$state$ConnectionState = new int[ConnectionState.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.SUSPENDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.RECONNECTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.LOST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public EventTableChangesToAggregateTopicTranslator(MySQLCdcKafkaPublisher<M> mySQLCdcKafkaPublisher, MySQLCdcProcessor<M> mySQLCdcProcessor, CuratorFramework curatorFramework, CdcConfig cdcConfig) {
        this.mySQLCdcKafkaPublisher = mySQLCdcKafkaPublisher;
        this.mySQLCdcProcessor = mySQLCdcProcessor;
        this.cdcConfig = cdcConfig;
        this.leaderSelector = new LeaderSelector(curatorFramework, "/eventuatelocal/cdc/leader", new LeaderSelectorListener() { // from class: com.networknt.eventuate.cdc.mysql.EventTableChangesToAggregateTopicTranslator.1
            public void takeLeadership(CuratorFramework curatorFramework2) throws Exception {
                takeLeadership();
            }

            private void takeLeadership() throws InterruptedException {
                EventTableChangesToAggregateTopicTranslator.this.logger.info("Taking leadership");
                try {
                    try {
                        EventTableChangesToAggregateTopicTranslator.this.startCapturingChanges();
                        EventTableChangesToAggregateTopicTranslator.this.logger.debug("TakeLeadership returning");
                    } catch (Throwable th) {
                        EventTableChangesToAggregateTopicTranslator.this.logger.error("In takeLeadership", th);
                        if (!(th instanceof RuntimeException)) {
                            throw new RuntimeException(th);
                        }
                    }
                } catch (Throwable th2) {
                    EventTableChangesToAggregateTopicTranslator.this.logger.debug("TakeLeadership returning");
                    throw th2;
                }
            }

            public void stateChanged(CuratorFramework curatorFramework2, ConnectionState connectionState) {
                EventTableChangesToAggregateTopicTranslator.this.logger.debug("StateChanged: {}", connectionState);
                switch (AnonymousClass2.$SwitchMap$org$apache$curator$framework$state$ConnectionState[connectionState.ordinal()]) {
                    case 1:
                        resignLeadership();
                        return;
                    case 2:
                        try {
                            takeLeadership();
                            return;
                        } catch (InterruptedException e) {
                            EventTableChangesToAggregateTopicTranslator.this.logger.error("While handling RECONNECTED", e);
                            return;
                        }
                    case 3:
                        resignLeadership();
                        return;
                    default:
                        return;
                }
            }

            private void resignLeadership() {
                EventTableChangesToAggregateTopicTranslator.this.logger.info("Resigning leadership");
                try {
                    EventTableChangesToAggregateTopicTranslator.this.stopCapturingChanges();
                } catch (InterruptedException e) {
                    EventTableChangesToAggregateTopicTranslator.this.logger.error("While handling SUSPEND", e);
                }
            }
        });
    }

    @PostConstruct
    public void start() {
        this.logger.info("CDC initialized. Ready to become leader");
        this.leaderSelector.start();
    }

    public void startCapturingChanges() throws InterruptedException {
        this.logger.debug("Starting to capture changes");
        new CdcStartupValidator(this.cdcConfig.getJdbcUrl(), this.cdcConfig.getDbUser(), this.cdcConfig.getDbPass(), this.cdcConfig.getKafka()).validateEnvironment();
        this.mySQLCdcKafkaPublisher.start();
        try {
            this.mySQLCdcProcessor.start(binLogEvent -> {
                try {
                    this.mySQLCdcKafkaPublisher.handleEvent(binLogEvent);
                } catch (EventuateLocalPublishingException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            if (e.getCause() instanceof EventuateLocalPublishingException) {
                this.logger.error("Stopping capturing changes due to exception:", e);
                stopCapturingChanges();
            }
        }
        this.logger.debug("Started CDC Kafka publisher");
    }

    public void stop() throws InterruptedException {
        this.leaderSelector.close();
    }

    public void stopCapturingChanges() throws InterruptedException {
        this.logger.debug("Stopping to capture changes");
        this.mySQLCdcKafkaPublisher.stop();
        this.mySQLCdcProcessor.stop();
    }
}
