package com.networknt.eventuate.debezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.eventuate.cdccore.AggregateTopicMapping;
import com.networknt.eventuate.cdccore.PublishedEvent;
import com.networknt.eventuate.cdccore.kafka.producer.CdcKafkaProducer;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.relational.history.KafkaDatabaseHistory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/debezium/EventTableChangesToAggregateTopicRelay.class */
/**
 * Relays rows captured from the event table by an embedded Debezium MySQL connector to
 * per-aggregate Kafka topics.
 *
 * <p>A Curator {@link LeaderSelector} decides when this instance captures changes: capture is
 * started from the leader selector callback and stopped when the ZooKeeper connection is
 * reported as suspended or lost.
 *
 * <p>The {@code producer} and {@code engine} fields are written by the thread that starts
 * capturing and read by the connection-state and engine threads, so they are declared
 * {@code volatile}.
 */
public class EventTableChangesToAggregateTopicRelay {
    /** ZooKeeper path used for leader election. */
    private static final String LEADER_PATH = "/eventuatelocal/cdc/leader";

    /**
     * Only records arriving on this Debezium topic are relayed.
     *
     * <p>NOTE(review): the connector below is configured with
     * {@code database.server.name = "light-event-sourcing"}, while this filter expects the prefix
     * {@code my-app-connector}. Debezium normally prefixes change topics with the server name, so
     * confirm that this filter can actually match.
     */
    private static final String EVENTS_TOPIC = "my-app-connector.eventuate.events";

    /** Shared mapper; avoids building a new {@link ObjectMapper} for every event. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private volatile CdcKafkaProducer producer;
    private volatile EmbeddedEngine engine;

    /**
     * Kafka bootstrap servers. Overwritten by every constructor call; kept public, static and
     * mutable for backward compatibility.
     */
    public static String kafkaBootstrapServers;
    private final String dbHost;
    private final int dbPort;
    private final String dbUser;
    private final String dbPassword;
    private final LeaderSelector leaderSelector;

    /**
     * Creates the relay and its leader selector. Nothing is started until {@link #start()}.
     *
     * @param kafkaBootstrapServers Kafka bootstrap servers; stored in the static field of the same name
     * @param dbHost                database host name passed to the connector
     * @param dbPort                database port passed to the connector
     * @param dbUser                database user passed to the connector
     * @param dbPassword            database password passed to the connector
     * @param curatorFramework      Curator client used for leader election
     */
    public EventTableChangesToAggregateTopicRelay(String kafkaBootstrapServers, String dbHost, int dbPort, String dbUser, String dbPassword, CuratorFramework curatorFramework) {
        EventTableChangesToAggregateTopicRelay.kafkaBootstrapServers = kafkaBootstrapServers;
        this.dbHost = dbHost;
        this.dbPort = dbPort;
        this.dbUser = dbUser;
        this.dbPassword = dbPassword;
        this.leaderSelector = new LeaderSelector(curatorFramework, LEADER_PATH, new LeadershipListener());
    }

    /** Starts or stops change capture in response to leader election and connection events. */
    private final class LeadershipListener implements LeaderSelectorListener {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            takeLeadership();
        }

        /**
         * Starts capturing and blocks until the engine's completion future finishes.
         *
         * <p>An interrupt is logged and the method returns normally. A {@link RuntimeException}
         * is logged and not rethrown; any other failure is logged and rethrown wrapped in a
         * {@link RuntimeException}.
         */
        private void takeLeadership() throws InterruptedException {
            logger.info("Taking leadership");
            try {
                startCapturingChanges().get();
            } catch (InterruptedException e) {
                logger.error("Interrupted while taking leadership");
            } catch (RuntimeException e) {
                logger.error("In takeLeadership", e);
            } catch (Throwable th) {
                logger.error("In takeLeadership", th);
                throw new RuntimeException(th);
            } finally {
                logger.debug("TakeLeadership returning");
            }
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            logger.debug("StateChanged: {}", newState);
            switch (newState) {
                case SUSPENDED:
                case LOST:
                    resignLeadership();
                    break;
                case RECONNECTED:
                    // NOTE(review): this blocks the calling thread until the engine completes,
                    // because takeLeadership() waits on the completion future.
                    try {
                        takeLeadership();
                    } catch (InterruptedException e) {
                        logger.error("While handling RECONNECTED", e);
                    }
                    break;
                default:
                    // Other connection states require no action.
                    break;
            }
        }

        /** Stops capturing changes, logging (not propagating) an interrupt. */
        private void resignLeadership() {
            logger.info("Resigning leadership");
            try {
                stopCapturingChanges();
            } catch (InterruptedException e) {
                logger.error("While handling SUSPEND", e);
            }
        }
    }

    /** Starts the leader selector so this instance can take part in leader election. */
    @PostConstruct
    public void start() {
        logger.info("CDC initialized. Ready to become leader");
        leaderSelector.start();
    }

    /**
     * Creates the Kafka producer and the embedded Debezium engine, and runs the engine on a
     * background thread.
     *
     * @return a future that completes with {@code null} when the engine reports successful
     *         completion, and completes exceptionally when the engine reports failure or its
     *         {@code run()} call throws
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public CompletableFuture<Object> startCapturingChanges() throws InterruptedException {
        logger.debug("Starting to capture changes");
        producer = new CdcKafkaProducer(kafkaBootstrapServers);

        Configuration config = Configuration.create()
                .with(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "io.debezium.connector.mysql.MySqlConnector")
                .with("offset.storage", KafkaOffsetBackingStore.class.getName())
                .with("bootstrap.servers", kafkaBootstrapServers)
                .with(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "eventuate.local.cdc.my-sql-connector.offset.storage")
                .with("poll.interval.ms", 50)
                .with(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, 6000)
                .with("name", "my-sql-connector")
                .with("database.hostname", dbHost)
                .with("database.port", dbPort)
                .with("database.user", dbUser)
                .with("database.password", dbPassword)
                .with("database.server.id", 85744)
                .with("database.server.name", "light-event-sourcing")
                .with("database.history", KafkaDatabaseHistory.class.getName())
                .with("database.history.kafka.topic", "eventuate.local.cdc.my-sql-connector.history.kafka.topic")
                .with("database.history.kafka.bootstrap.servers", kafkaBootstrapServers)
                .build();

        CompletableFuture<Object> completion = new CompletableFuture<>();
        EmbeddedEngine newEngine = EmbeddedEngine.create()
                .using((success, message, error) -> {
                    if (success) {
                        completion.complete(null);
                    } else {
                        completion.completeExceptionally(new RuntimeException("Engine failed to start" + message, error));
                    }
                })
                .using(config)
                .notifying(this::handleEvent)
                .build();
        engine = newEngine;

        // The lambda captures the local reference so it always runs the engine created by this
        // call, even if the field is reassigned later.
        Executors.newCachedThreadPool().execute(() -> {
            try {
                newEngine.run();
            } catch (Throwable th) {
                logger.error("Debezium engine terminated abnormally", th);
                // Unblock anyone waiting on the future; this is a no-op if it already completed.
                completion.completeExceptionally(th);
            }
        });
        logger.debug("Started engine");
        return completion;
    }

    /**
     * Closes the leader selector.
     *
     * @throws InterruptedException declared for compatibility with existing callers
     */
    @PreDestroy
    public void stop() throws InterruptedException {
        leaderSelector.close();
    }

    /**
     * Closes the Kafka producer (if any), then stops the Debezium engine (if any) and waits for
     * it to shut down, polling in 30-second intervals.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored and the method
     * returns without waiting further.
     *
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public void stopCapturingChanges() throws InterruptedException {
        logger.debug("Stopping to capture changes");
        CdcKafkaProducer currentProducer = producer;
        if (currentProducer != null) {
            currentProducer.close();
        }
        EmbeddedEngine currentEngine = engine;
        if (currentEngine == null) {
            return;
        }
        logger.debug("Stopping Debezium engine");
        currentEngine.stop();
        try {
            while (!currentEngine.await(30L, TimeUnit.SECONDS)) {
                logger.debug("Waiting another 30 seconds for the embedded engine to shut down");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack instead of clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes one captured event-table row to the Kafka topic of its aggregate type and waits
     * up to 10 seconds for the send to finish.
     *
     * <p>Records from other topics, and records without an {@code after} state, are ignored.
     *
     * @param sourceRecord the change record delivered by the Debezium engine
     * @throws RuntimeException if publishing fails or times out; non-runtime failures are wrapped
     */
    private void handleEvent(SourceRecord sourceRecord) {
        logger.trace("Got record");
        if (!EVENTS_TOPIC.equals(sourceRecord.topic())) {
            return;
        }

        Struct envelope = (Struct) sourceRecord.value();
        Struct after = envelope == null ? null : envelope.getStruct(Envelope.FieldName.AFTER);
        if (after == null) {
            // No row state to publish (for example a delete or a tombstone record).
            logger.debug("Ignoring record without an 'after' state");
            return;
        }

        String eventId = after.getString("event_id");
        String eventType = after.getString("event_type");
        String eventData = after.getString("event_data");
        String entityType = after.getString("entity_type");
        String entityId = after.getString("entity_id");
        String triggeringEvent = after.getString("triggering_event");

        PublishedEvent publishedEvent = new PublishedEvent(eventId, entityId, entityType, eventData, eventType);
        String aggregateTopic = AggregateTopicMapping.aggregateTypeToTopic(entityType);
        String json = toJson(publishedEvent);
        logger.debug("Publishing triggeringEvent={}, event={}", triggeringEvent, json);

        try {
            producer.send(aggregateTopic, entityId, json).get(10L, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            logger.error("error publishing to " + aggregateTopic, e);
            throw e;
        } catch (Throwable th) {
            logger.error("error publishing to " + aggregateTopic, th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Serializes an event to JSON.
     *
     * @param publishedEvent the event to serialize
     * @return the JSON representation of the event
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    public static String toJson(PublishedEvent publishedEvent) {
        try {
            return OBJECT_MAPPER.writeValueAsString(publishedEvent);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
