/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.debezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.eventuate.cdccore.AggregateTopicMapping;
import com.networknt.eventuate.cdccore.PublishedEvent;
import com.networknt.eventuate.cdccore.kafka.producer.CdcKafkaProducer;
import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.relational.history.KafkaDatabaseHistory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Relays rows captured from the event table by an embedded Debezium MySQL
 * connector to Kafka aggregate topics.
 *
 * <p>A Curator {@link LeaderSelector} on the path
 * {@code /eventuatelocal/cdc/leader} decides when this instance runs the
 * connector: capturing starts when leadership is taken and is stopped when the
 * ZooKeeper connection is reported as {@code SUSPENDED} or {@code LOST}.
 *
 * <p>The {@code producer} and {@code engine} fields are touched from several
 * threads (the leader-selector thread, the connection-state callback and the
 * engine thread), which is why they are declared {@code volatile}.
 */
public class EventTableChangesToAggregateTopicRelay {
    /** Shared JSON mapper; configured once and only used for serialization. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private volatile CdcKafkaProducer producer;
    private volatile EmbeddedEngine engine;
    /**
     * Kafka bootstrap servers used for the producer, offset storage and
     * database history. Static and overwritten by every constructor call.
     */
    public static String kafkaBootstrapServers;
    private final String dbHost;
    private final int dbPort;
    private final String dbUser;
    private final String dbPassword;
    private final LeaderSelector leaderSelector;

    /**
     * Creates the relay and its leader selector; nothing is started until
     * {@link #start()} is called.
     *
     * @param kafkaBootstrapServers Kafka bootstrap servers; stored in the static
     *                              field of the same name
     * @param dbHost                database host name passed to the connector
     * @param dbPort                database port passed to the connector
     * @param dbUser                database user passed to the connector
     * @param dbPassword            database password passed to the connector
     * @param client                Curator client used for leader election
     */
    public EventTableChangesToAggregateTopicRelay(String kafkaBootstrapServers, String dbHost, int dbPort, String dbUser, String dbPassword, CuratorFramework client) {
        EventTableChangesToAggregateTopicRelay.kafkaBootstrapServers = kafkaBootstrapServers;
        this.dbHost = dbHost;
        this.dbPort = dbPort;
        this.dbUser = dbUser;
        this.dbPassword = dbPassword;
        this.leaderSelector = new LeaderSelector(client, "/eventuatelocal/cdc/leader", new LeaderSelectorListener() {

            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                takeLeadership();
            }

            /**
             * Starts capturing changes and blocks until the engine's completion
             * future is done or the calling thread is interrupted.
             */
            private void takeLeadership() throws InterruptedException {
                logger.info("Taking leadership");
                try {
                    CompletableFuture<Object> completion = startCapturingChanges();
                    try {
                        completion.get();
                    }
                    catch (InterruptedException e) {
                        logger.error("Interrupted while taking leadership");
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                }
                catch (Throwable t) {
                    logger.error("In takeLeadership", t);
                    throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
                }
                finally {
                    logger.debug("TakeLeadership returning");
                }
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.debug("StateChanged: {}", newState);
                switch (newState) {
                    case SUSPENDED:
                    case LOST: {
                        resignLeadership();
                        break;
                    }
                    case RECONNECTED: {
                        // NOTE(review): this blocks the thread delivering the state change until
                        // the engine completes, and it starts capturing without going through the
                        // leader election - confirm this is intended.
                        try {
                            takeLeadership();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            logger.error("While handling RECONNECTED", e);
                        }
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }

            /** Stops capturing changes after the connection was suspended or lost. */
            private void resignLeadership() {
                logger.info("Resigning leadership");
                try {
                    stopCapturingChanges();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("While handling SUSPEND", e);
                }
            }
        });
    }

    /** Starts the leader selector so this instance can take part in the election. */
    @PostConstruct
    public void start() {
        logger.info("CDC initialized. Ready to become leader");
        leaderSelector.start();
    }

    /**
     * Creates a Kafka producer and an embedded Debezium engine and runs the
     * engine on a background thread.
     *
     * @return a future driven by the engine's completion callback: completed
     *         with {@code null} when the callback reports success, completed
     *         exceptionally when it reports failure or when running the engine
     *         throws
     * @throws InterruptedException declared for callers; kept for interface
     *                              compatibility
     */
    public CompletableFuture<Object> startCapturingChanges() throws InterruptedException {
        logger.debug("Starting to capture changes");
        this.producer = new CdcKafkaProducer(kafkaBootstrapServers);
        String connectorName = "my-sql-connector";
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                .with("offset.storage", KafkaOffsetBackingStore.class.getName())
                .with("bootstrap.servers", kafkaBootstrapServers)
                .with("offset.storage.topic", "eventuate.local.cdc." + connectorName + ".offset.storage")
                .with("poll.interval.ms", 50)
                .with("offset.flush.interval.ms", 6000)
                .with("name", connectorName)
                .with("database.hostname", this.dbHost)
                .with("database.port", this.dbPort)
                .with("database.user", this.dbUser)
                .with("database.password", this.dbPassword)
                .with("database.server.id", 85744)
                .with("database.server.name", "light-event-sourcing")
                .with("database.history", KafkaDatabaseHistory.class.getName())
                .with("database.history.kafka.topic", "eventuate.local.cdc." + connectorName + ".history.kafka.topic")
                .with("database.history.kafka.bootstrap.servers", kafkaBootstrapServers)
                .build();
        CompletableFuture<Object> completion = new CompletableFuture<Object>();
        EmbeddedEngine newEngine = EmbeddedEngine.create().using((success, message, throwable) -> {
            if (success) {
                completion.complete(null);
            } else {
                completion.completeExceptionally(new RuntimeException("Engine failed to start" + message, throwable));
            }
        }).using(config).notifying(this::handleEvent).build();
        this.engine = newEngine;
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(() -> {
            try {
                // Run the engine built by this call, even if the field is reassigned later.
                newEngine.run();
            }
            catch (Throwable t) {
                logger.error("Debezium engine terminated abnormally", t);
                // Unblock anyone waiting on the future; no-op if it is already completed.
                completion.completeExceptionally(t);
            }
        });
        // The executor only ever runs this one task; shutting it down lets the
        // submitted task finish and releases the pool afterwards.
        executor.shutdown();
        logger.debug("Started engine");
        return completion;
    }

    /**
     * Closes the leader selector and then stops the engine and producer, so no
     * capture resources are left running after shutdown.
     *
     * @throws InterruptedException declared by {@link #stopCapturingChanges()}
     */
    @PreDestroy
    public void stop() throws InterruptedException {
        try {
            leaderSelector.close();
        }
        finally {
            stopCapturingChanges();
        }
    }

    /**
     * Stops the Debezium engine (waiting in 30 second slices until it has shut
     * down) and then closes the Kafka producer. Does nothing for parts that were
     * never created.
     *
     * <p>If the wait is interrupted, waiting stops, the producer is still closed
     * and the thread's interrupt flag is restored before returning.
     *
     * @throws InterruptedException declared for callers; kept for interface
     *                              compatibility
     */
    public void stopCapturingChanges() throws InterruptedException {
        logger.debug("Stopping to capture changes");
        EmbeddedEngine currentEngine = this.engine;
        CdcKafkaProducer currentProducer = this.producer;
        boolean interrupted = false;
        try {
            if (currentEngine != null) {
                logger.debug("Stopping Debezium engine");
                currentEngine.stop();
                try {
                    while (!currentEngine.await(30L, TimeUnit.SECONDS)) {
                        logger.debug("Waiting another 30 seconds for the embedded engine to shut down");
                    }
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        finally {
            try {
                // Close the producer only after the engine was asked to stop, so
                // events still being handled are not sent to a closed producer.
                if (currentProducer != null) {
                    currentProducer.close();
                }
            }
            finally {
                // Restore the flag last so closing the producer is not affected by it.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Publishes one captured change record to the aggregate topic of its entity
     * type. Records from other topics, and records without a value or without an
     * {@code after} row image, are ignored.
     *
     * @param sourceRecord the change record delivered by the engine
     * @throws RuntimeException if sending fails or does not finish within 10 seconds
     */
    private void handleEvent(SourceRecord sourceRecord) {
        logger.trace("Got record");
        // NOTE(review): the connector is configured with database.server.name
        // "light-event-sourcing" while this filter expects the prefix
        // "my-app-connector" - confirm that this topic name can actually occur.
        if (!"my-app-connector.eventuate.events".equals(sourceRecord.topic())) {
            return;
        }
        Struct value = (Struct) sourceRecord.value();
        Struct after = value == null ? null : value.getStruct("after");
        if (after == null) {
            // Nothing to publish without a row image (previously a NullPointerException).
            logger.debug("Skipping record without an 'after' row image");
            return;
        }
        String eventId = after.getString("event_id");
        String eventType = after.getString("event_type");
        String eventData = after.getString("event_data");
        String entityType = after.getString("entity_type");
        String entityId = after.getString("entity_id");
        String triggeringEvent = after.getString("triggering_event");
        PublishedEvent pe = new PublishedEvent(eventId, entityId, entityType, eventData, eventType);
        String aggregateTopic = AggregateTopicMapping.aggregateTypeToTopic(entityType);
        String json = EventTableChangesToAggregateTopicRelay.toJson(pe);
        logger.debug("Publishing triggeringEvent={}, event={}", triggeringEvent, json);
        try {
            this.producer.send(aggregateTopic, entityId, json).get(10L, TimeUnit.SECONDS);
        }
        catch (RuntimeException e) {
            logger.error("error publishing to " + aggregateTopic, e);
            throw e;
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("error publishing to " + aggregateTopic, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes an event to its JSON representation.
     *
     * @param eventInfo the event to serialize
     * @return the JSON string
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if
     *                          serialization fails
     */
    public static String toJson(PublishedEvent eventInfo) {
        try {
            return OBJECT_MAPPER.writeValueAsString(eventInfo);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

