package org.datacleaner.spark;

import com.google.common.base.Strings;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Writer;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.metamodel.util.Action;
import org.apache.metamodel.util.FileHelper;
import org.apache.metamodel.util.HdfsResource;
import org.apache.metamodel.util.MutableRef;
import org.apache.spark.launcher.SparkLauncher;
import org.datacleaner.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/datacleaner/spark/ApplicationDriver.class */
public class ApplicationDriver {
    private static final Logger logger = LoggerFactory.getLogger(ApplicationDriver.class);
    private static final String PRIMARY_JAR_FILENAME_PREFIX = "DataCleaner-env-spark";
    private final URI _defaultFs;
    private final DistributedFileSystem _fileSystem;
    private final String _jarDirectoryPath;
    private final String _sparkHome;

    public ApplicationDriver(URI uri, String str) throws IOException {
        this(uri, str, determineSparkHome());
    }

    public ApplicationDriver(URI uri, String str, String str2) throws IOException {
        this._defaultFs = uri;
        this._fileSystem = FileSystem.newInstance(this._defaultFs, new Configuration());
        this._jarDirectoryPath = str;
        this._sparkHome = str2;
    }

    private static String determineSparkHome() {
        String property = System.getProperty("SPARK_HOME");
        if (Strings.isNullOrEmpty(property)) {
            property = System.getenv("SPARK_HOME");
        }
        if (Strings.isNullOrEmpty(property)) {
            throw new IllegalStateException("Could not determine SPARK_HOME. Please set the environment variable, system property or provide it as a " + ApplicationDriver.class.getSimpleName() + " constructor argument");
        }
        return property;
    }

    public int launch(String str, String str2) throws Exception {
        return launch(createSparkLauncher(createTemporaryHadoopConfDir(), str, str2, (String) null));
    }

    public int launch(SparkLauncher sparkLauncher) throws Exception {
        Process launch = sparkLauncher.launch();
        startLogger(launch.getErrorStream());
        startLogger(launch.getInputStream());
        return launch.waitFor();
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [org.datacleaner.spark.ApplicationDriver$1] */
    private void startLogger(final InputStream inputStream) {
        new Thread() { // from class: org.datacleaner.spark.ApplicationDriver.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                    Throwable th = null;
                    try {
                        for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                            ApplicationDriver.logger.info(readLine);
                        }
                        bufferedReader.close();
                        if (bufferedReader != null) {
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                    } finally {
                    }
                } catch (Exception e) {
                    ApplicationDriver.logger.warn("Logger thread failure: " + e.getMessage(), e);
                }
            }
        }.start();
    }

    public HdfsResource createResource(String str) {
        return new HdfsResource(this._defaultFs.resolve(str).toString());
    }

    public SparkLauncher createSparkLauncher(File file, URI uri, URI uri2, URI uri3) throws Exception {
        return createSparkLauncher(file, uri.toString(), uri2.toString(), uri3 == null ? null : uri3.toString());
    }

    public SparkLauncher createSparkLauncher(File file, String str, String str2, String str3) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("HADOOP_CONF_DIR", file.getAbsolutePath());
        hashMap.put("YARN_CONF_DIR", file.getAbsolutePath());
        SparkLauncher sparkLauncher = new SparkLauncher(hashMap);
        sparkLauncher.setSparkHome(this._sparkHome);
        sparkLauncher.setMaster("yarn-cluster");
        sparkLauncher.setAppName("DataCleaner");
        MutableRef<String> mutableRef = new MutableRef<>();
        List<String> buildJarFiles = buildJarFiles(mutableRef);
        logger.info("Using JAR files: {}", buildJarFiles);
        Iterator<String> it = buildJarFiles.iterator();
        while (it.hasNext()) {
            sparkLauncher.addJar(it.next());
        }
        sparkLauncher.setMainClass(Main.class.getName());
        sparkLauncher.setConf("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkLauncher.addAppArgs(new String[]{(String) mutableRef.get()});
        sparkLauncher.addAppArgs(new String[]{toHadoopPath(str)});
        sparkLauncher.addAppArgs(new String[]{toHadoopPath(str2)});
        if (!StringUtils.isNullOrEmpty(str3)) {
            Properties properties = new Properties();
            properties.setProperty("datacleaner.result.hdfs.path", str3);
            File createTempFile = File.createTempFile("job-", ".properties");
            properties.store(new FileWriter(createTempFile), "DataCleaner Spark runner properties");
            sparkLauncher.addAppArgs(new String[]{copyFileToHdfs(createTempFile, this._fileSystem.getHomeDirectory().toUri().resolve("temp/" + createTempFile.getName()).toString()).toString()});
        }
        return sparkLauncher;
    }

    private String toHadoopPath(String str) {
        return URI.create(str).getScheme() != null ? str : this._defaultFs.resolve(str).toString();
    }

    private List<String> buildJarFiles(MutableRef<String> mutableRef) throws IOException {
        ArrayList arrayList = new ArrayList();
        RemoteIterator listFiles = this._fileSystem.listFiles(new Path(this._jarDirectoryPath), false);
        while (listFiles.hasNext()) {
            Path path = ((LocatedFileStatus) listFiles.next()).getPath();
            if (path.getName().startsWith(PRIMARY_JAR_FILENAME_PREFIX)) {
                mutableRef.set(path.toString());
            } else {
                arrayList.add(path.toString());
            }
        }
        if (mutableRef.get() == null) {
            throw new IllegalArgumentException("Failed to find primary jar (starting with 'DataCleaner-env-spark') in JAR file directory: " + this._jarDirectoryPath);
        }
        return arrayList;
    }

    public File createTemporaryHadoopConfDir() throws IOException {
        File file = new File(FileHelper.getTempDir(), "datacleaner_hadoop_conf_" + UUID.randomUUID().toString());
        file.mkdirs();
        createTemporaryHadoopConfFile(file, "core-site.xml", "core-site-template.xml");
        createTemporaryHadoopConfFile(file, "yarn-site.xml", "yarn-site-template.xml");
        logger.debug("Created temporary Hadoop conf dir: {}", file);
        return file;
    }

    private void createTemporaryHadoopConfFile(File file, String str, String str2) throws IOException {
        File file2 = new File(file, str);
        InputStream resourceAsStream = getClass().getResourceAsStream(str2);
        Throwable th = null;
        try {
            BufferedReader bufferedReader = FileHelper.getBufferedReader(resourceAsStream, "UTF-8");
            Writer writer = FileHelper.getWriter(file2);
            Throwable th2 = null;
            try {
                try {
                    for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                        writer.write(StringUtils.replaceAll(StringUtils.replaceAll(readLine, "${HDFS_HOSTNAME}", this._defaultFs.getHost()), "${HDFS_PORT}", this._defaultFs.getPort() + ""));
                    }
                    writer.flush();
                    if (writer != null) {
                        if (0 != 0) {
                            try {
                                writer.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            writer.close();
                        }
                    }
                    if (resourceAsStream != null) {
                        if (0 == 0) {
                            resourceAsStream.close();
                            return;
                        }
                        try {
                            resourceAsStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (writer != null) {
                    if (th2 != null) {
                        try {
                            writer.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        writer.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (resourceAsStream != null) {
                if (0 != 0) {
                    try {
                        resourceAsStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    resourceAsStream.close();
                }
            }
            throw th8;
        }
    }

    public URI copyFileToHdfs(File file, String str) {
        return copyFileToHdfs(file, str, true);
    }

    public URI copyFileToHdfs(final File file, String str, boolean z) {
        HdfsResource createResource = createResource(str);
        URI uri = createResource.getHadoopPath().toUri();
        boolean isExists = createResource.isExists();
        if (!z && isExists) {
            logger.debug("Skipping file-copy to {} because file already exists", str);
            return uri;
        }
        if (isExists) {
            logger.info("Overwriting file on HDFS: {}", str);
        } else {
            logger.debug("Copying file to HDFS: {}", str);
        }
        createResource.write(new Action<OutputStream>() { // from class: org.datacleaner.spark.ApplicationDriver.2
            public void run(OutputStream outputStream) throws Exception {
                FileInputStream fileInputStream = new FileInputStream(file);
                FileHelper.copy(fileInputStream, outputStream);
                fileInputStream.close();
            }
        });
        return uri;
    }
}
