package io.debezium.connector.cassandra;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.servlets.HealthCheckServlet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.PingServlet;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.network.BuildInfoServlet;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CassandraConnectorTask.class */
public class CassandraConnectorTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraConnectorTask.class);
    public static final MetricRegistry METRIC_REGISTRY_INSTANCE = new MetricRegistry();
    private final CassandraConnectorConfig config;
    private CassandraConnectorContext taskContext;
    private ProcessorGroup processorGroup;
    private Server httpServer;
    private JmxReporter jmxReporter;

    /* loaded from: input_file:io/debezium/connector/cassandra/CassandraConnectorTask$ProcessorGroup.class */
    public static class ProcessorGroup {
        private final String name;
        private final Set<AbstractProcessor> processors = new HashSet();
        private ExecutorService executorService;

        ProcessorGroup(String str) {
            this.name = str;
        }

        public boolean isRunning() {
            Iterator<AbstractProcessor> it = this.processors.iterator();
            while (it.hasNext()) {
                if (!it.next().isRunning()) {
                    return false;
                }
            }
            return true;
        }

        public String getName() {
            return this.name;
        }

        void addProcessor(AbstractProcessor abstractProcessor) {
            this.processors.add(abstractProcessor);
        }

        void start() {
            this.executorService = Executors.newFixedThreadPool(this.processors.size());
            CassandraConnectorTask.LOGGER.info("Starting processor group {}", getName());
            for (AbstractProcessor abstractProcessor : this.processors) {
                this.executorService.submit(() -> {
                    try {
                        abstractProcessor.initialize();
                        abstractProcessor.start();
                    } catch (Exception e) {
                        CassandraConnectorTask.LOGGER.error("Encountered exception while running {}; stopping all processors in {}", new Object[]{abstractProcessor.getName(), getName(), e});
                        try {
                            stopProcessors();
                        } catch (Exception e2) {
                            CassandraConnectorTask.LOGGER.error("Encountered exceptions while stopping all processors in {}", getName(), e2);
                        }
                    }
                });
            }
        }

        void terminate() throws Exception {
            stopProcessors();
            CassandraConnectorTask.LOGGER.info("Terminating processor group {}", getName());
            if (this.executorService.isShutdown()) {
                return;
            }
            this.executorService.shutdown();
            if (this.executorService.awaitTermination(1L, TimeUnit.SECONDS)) {
                return;
            }
            this.executorService.shutdownNow();
        }

        private void stopProcessors() throws Exception {
            for (AbstractProcessor abstractProcessor : this.processors) {
                abstractProcessor.stop();
                abstractProcessor.destroy();
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length == 0) {
            throw new CassandraConnectorConfigException("CDC config file is required");
        }
        FileInputStream fileInputStream = new FileInputStream(strArr[0]);
        Throwable th = null;
        try {
            try {
                Properties properties = new Properties();
                properties.load(fileInputStream);
                fileInputStream.close();
                new CassandraConnectorTask(new CassandraConnectorConfig(properties)).run();
                if (fileInputStream != null) {
                    if (0 == 0) {
                        fileInputStream.close();
                        return;
                    }
                    try {
                        fileInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th4;
        }
    }

    public CassandraConnectorTask(CassandraConnectorConfig cassandraConnectorConfig) {
        this.config = cassandraConnectorConfig;
    }

    void run() throws Exception {
        try {
            LOGGER.info("Initializing Cassandra connector task context ...");
            this.taskContext = new CassandraConnectorContext(this.config);
            LOGGER.info("Starting processor group ...");
            initProcessorGroup();
            this.processorGroup.start();
            LOGGER.info("Starting HTTP server ...");
            initHttpServer();
            this.httpServer.start();
            LOGGER.info("Starting JMX reporter ...");
            initJmxReporter(this.config.connectorName());
            this.jmxReporter.start();
            while (this.processorGroup.isRunning()) {
                Thread.sleep(1000L);
            }
        } finally {
            stopAll();
        }
    }

    private void stopAll() throws Exception {
        if (this.processorGroup != null) {
            this.processorGroup.terminate();
            LOGGER.info("Stopped processor group");
        }
        if (this.httpServer != null) {
            this.httpServer.stop();
            LOGGER.info("Stopped HTTP server");
        }
        if (this.jmxReporter != null) {
            this.jmxReporter.stop();
            LOGGER.info("Stopped JMX reporter");
        }
        if (this.taskContext != null) {
            this.taskContext.cleanUp();
            LOGGER.info("Cleaned up Cassandra connector task context");
        }
    }

    private void initHttpServer() {
        int httpPort = this.config.httpPort();
        LOGGER.info("HTTP port is {}", Integer.valueOf(httpPort));
        this.httpServer = new Server(httpPort);
        ServletContextHandler servletContextHandler = new ServletContextHandler(1);
        servletContextHandler.setContextPath("/");
        this.httpServer.setHandler(servletContextHandler);
        servletContextHandler.addServlet(new ServletHolder(new PingServlet()), "/ping");
        servletContextHandler.addServlet(new ServletHolder(new BuildInfoServlet(getBuildInfoMap(getClass()))), "/buildinfo");
        servletContextHandler.addServlet(new ServletHolder(new MetricsServlet(METRIC_REGISTRY_INSTANCE)), "/metrics");
        servletContextHandler.addServlet(new ServletHolder(new HealthCheckServlet(registerHealthCheck())), "/health");
    }

    private void initProcessorGroup() throws IOException {
        this.processorGroup = new ProcessorGroup("Cassandra Connector Task");
        this.processorGroup.addProcessor(new SchemaProcessor(this.taskContext));
        this.processorGroup.addProcessor(new CommitLogProcessor(this.taskContext));
        this.processorGroup.addProcessor(new SnapshotProcessor(this.taskContext));
        this.processorGroup.addProcessor(new QueueProcessor(this.taskContext));
        if (this.taskContext.getCassandraConnectorConfig().postProcessEnabled()) {
            this.processorGroup.addProcessor(new CommitLogPostProcessor(this.taskContext));
        }
    }

    private void initJmxReporter(String str) {
        this.jmxReporter = JmxReporter.forRegistry(METRIC_REGISTRY_INSTANCE).inDomain(str).build();
    }

    private HealthCheckRegistry registerHealthCheck() {
        CassandraConnectorTaskHealthCheck cassandraConnectorTaskHealthCheck = new CassandraConnectorTaskHealthCheck(this.processorGroup, this.taskContext.getCassandraClient());
        HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry();
        healthCheckRegistry.register("cassandra-cdc-health-check", cassandraConnectorTaskHealthCheck);
        return healthCheckRegistry;
    }

    private static Map<String, String> getBuildInfoMap(Class<?> cls) {
        HashMap hashMap = new HashMap();
        hashMap.put(SourceInfo.DEBEZIUM_VERSION_KEY, cls.getPackage().getImplementationVersion());
        hashMap.put("service_name", cls.getPackage().getImplementationTitle());
        return hashMap;
    }
}
