package io.mantisrx.server.worker;

import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.Service;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.worker.config.ConfigurationFactory;
import io.mantisrx.server.worker.config.StaticPropertiesConfigurationFactory;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import mantis.io.reactivex.netty.RxNetty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/MantisWorker.class */
public class MantisWorker extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class);

    @Argument(alias = "p", description = "Specify a configuration file", required = false)
    private static String propFile = "worker.properties";
    private CountDownLatch blockUntilShutdown;
    private List<Service> mantisServices;

    public MantisWorker(ConfigurationFactory configurationFactory, io.mantisrx.server.master.client.config.ConfigurationFactory configurationFactory2) {
        this(configurationFactory, (Optional<Job>) Optional.empty());
    }

    public MantisWorker(ConfigurationFactory configurationFactory, final Optional<Job> optional) {
        this.blockUntilShutdown = new CountDownLatch(1);
        this.mantisServices = new LinkedList();
        System.setProperty("rx.ring-buffer.size", "1024");
        final WorkerConfiguration config = configurationFactory.getConfig();
        final HighAvailabilityServices createHAServices = HighAvailabilityServicesUtil.createHAServices(config);
        this.mantisServices.add(new Service() { // from class: io.mantisrx.server.worker.MantisWorker.1
            public void start() {
                createHAServices.startAsync().awaitRunning();
            }

            public void shutdown() {
                createHAServices.stopAsync().awaitTerminated();
            }

            public void enterActiveMode() {
            }

            public String toString() {
                return "HighAvailabilityServices Service";
            }
        });
        final MantisMasterGateway masterClientApi = createHAServices.getMasterClientApi();
        Thread thread = new Thread() { // from class: io.mantisrx.server.worker.MantisWorker.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                MantisWorker.this.shutdown();
            }
        };
        thread.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(thread);
        final PublishSubject create = PublishSubject.create();
        this.mantisServices.add(new Service() { // from class: io.mantisrx.server.worker.MantisWorker.3
            private RuntimeTaskImpl runtimeTaskImpl;
            private Subscription vmStatusSubscription;

            public void start() {
                ClassLoader contextClassLoader;
                if (Thread.currentThread().getContextClassLoader() == null) {
                    contextClassLoader = ClassLoader.getSystemClassLoader();
                    MantisWorker.logger.info("Choosing system classloader {}", contextClassLoader);
                } else {
                    contextClassLoader = Thread.currentThread().getContextClassLoader();
                    MantisWorker.logger.info("Choosing current thread classloader {}", contextClassLoader);
                }
                Observable first = create.asObservable().first();
                WorkerConfiguration workerConfiguration = config;
                MantisMasterGateway mantisMasterGateway = masterClientApi;
                ClassLoader classLoader = contextClassLoader;
                Optional optional2 = optional;
                first.subscribe(wrappedExecuteStageRequest -> {
                    try {
                        this.runtimeTaskImpl = new RuntimeTaskImpl();
                        this.runtimeTaskImpl.initialize(wrappedExecuteStageRequest, workerConfiguration, mantisMasterGateway, ClassLoaderHandle.fixed(classLoader).getOrResolveClassLoader(ImmutableList.of(), ImmutableList.of()), SinkSubscriptionStateHandler.Factory.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(mantisMasterGateway, Clock.systemDefaultZone()));
                        this.runtimeTaskImpl.setJob(optional2);
                        this.runtimeTaskImpl.startAsync();
                    } catch (Exception e) {
                        MantisWorker.logger.error("Failed to start task, request: {}", wrappedExecuteStageRequest, e);
                        throw new RuntimeException("Failed to start task", e);
                    }
                });
            }

            public void shutdown() {
                if (this.runtimeTaskImpl != null) {
                    try {
                        this.runtimeTaskImpl.stopAsync().awaitTerminated();
                    } finally {
                        this.vmStatusSubscription.unsubscribe();
                    }
                }
            }

            public void enterActiveMode() {
            }

            public String toString() {
                return "TaskService";
            }
        });
    }

    private static Properties loadProperties(String str) {
        Properties properties = new Properties();
        try {
            InputStream findResourceAsStream = findResourceAsStream(str);
            Throwable th = null;
            try {
                try {
                    properties.load(findResourceAsStream);
                    if (findResourceAsStream != null) {
                        if (0 != 0) {
                            try {
                                findResourceAsStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            findResourceAsStream.close();
                        }
                    }
                    return properties;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(String.format("Can't load properties from the given property file %s: %s", str, e.getMessage()), e);
        }
    }

    private static InputStream findResourceAsStream(String str) throws FileNotFoundException {
        File file = new File(str);
        if (file.exists()) {
            return new FileInputStream(file);
        }
        InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(str);
        if (resourceAsStream == null) {
            throw new FileNotFoundException(String.format("Can't find property file %s. Make sure the property file is either in your path or in your classpath ", str));
        }
        return resourceAsStream;
    }

    public static void main(String[] strArr) {
        try {
            Args.parse(MantisWorker.class, strArr);
        } catch (IllegalArgumentException e) {
            Args.usage(MantisWorker.class);
            System.exit(1);
        }
        try {
            new MantisWorker((ConfigurationFactory) new StaticPropertiesConfigurationFactory(loadProperties(propFile)), (io.mantisrx.server.master.client.config.ConfigurationFactory) new io.mantisrx.server.master.client.config.StaticPropertiesConfigurationFactory(loadProperties(propFile))).start();
        } catch (Exception e2) {
            logger.error("Unexpected error: " + e2.getMessage(), e2);
            System.exit(2);
        }
    }

    public void start() {
        startUp();
        awaitTerminated();
    }

    public void startUp() {
        logger.info("Starting Mantis Worker");
        RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
        for (Service service : this.mantisServices) {
            logger.info("Starting service: " + service);
            try {
                service.start();
                logger.info("Started service: " + service);
            } catch (Throwable th) {
                logger.error(String.format("Failed to start service %s: %s", service, th.getMessage()), th);
                throw th;
            }
        }
        logger.info("Started Mantis Worker successfully");
    }

    public void awaitTerminated() {
        try {
            this.blockUntilShutdown.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        logger.info("Shutting down Mantis Worker");
        Iterator<Service> it = this.mantisServices.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        this.blockUntilShutdown.countDown();
    }

    public void enterActiveMode() {
    }
}
