package com.spotify.hype;

import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import com.google.common.base.Preconditions;
import com.spotify.hype.gcs.RunManifest;
import com.spotify.hype.gcs.RunManifestBuilder;
import com.spotify.hype.gcs.StagingUtil;
import com.spotify.hype.model.ContainerEngineCluster;
import com.spotify.hype.model.DockerCluster;
import com.spotify.hype.model.RunEnvironment;
import com.spotify.hype.model.StagedContinuation;
import com.spotify.hype.runner.DockerRunner;
import com.spotify.hype.runner.KubernetesDockerRunner;
import com.spotify.hype.runner.RunSpec;
import com.spotify.hype.runner.VolumeRepository;
import com.spotify.hype.util.Fn;
import com.spotify.hype.util.SerializationUtil;
import com.spotify.hype.util.Util;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.netty.handler.codec.http.multipart.DiskFileUpload;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/hype/Submitter.class */
public class Submitter implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Submitter.class);
    private static final String STAGING_PREFIX = "spotify-hype-staging";
    private final ClasspathInspector classpathInspector;
    private final URI stagingLocation;
    private final VolumeRepository volumeRepository;
    private final DockerRunner runner;
    private static KubernetesClient client;

    public static Submitter createLocal() throws IOException {
        return createLocal(DockerCluster.dockerCluster());
    }

    public static Submitter createLocal(DockerCluster dockerCluster) throws IOException {
        Path resolve = new File(System.getProperty("user.home")).toPath().resolve(DiskFileUpload.postfix).resolve(STAGING_PREFIX);
        LOG.info("Local staging location is " + resolve);
        Files.createDirectories(resolve, new FileAttribute[0]);
        return new Submitter(ClasspathInspector.forLoader(Submitter.class.getClassLoader()), resolve.toString(), dockerCluster);
    }

    public static Submitter create(String str, ContainerEngineCluster containerEngineCluster) {
        return create(ClasspathInspector.forLoader(Submitter.class.getClassLoader()), str, containerEngineCluster);
    }

    public static Submitter create(ClasspathInspector classpathInspector, String str, ContainerEngineCluster containerEngineCluster) {
        return new Submitter(classpathInspector, str, containerEngineCluster);
    }

    private URI getStagingURI(String str) {
        Preconditions.checkNotNull(str);
        URI create = URI.create(str);
        return !create.isAbsolute() ? URI.create("file://" + create.toString()) : create;
    }

    private Submitter(ClasspathInspector classpathInspector, String str, ContainerEngineCluster containerEngineCluster) {
        this.stagingLocation = getStagingURI(str);
        Preconditions.checkState(Objects.equals(this.stagingLocation.getScheme(), CloudStorageFileSystem.URI_SCHEME));
        this.classpathInspector = (ClasspathInspector) Objects.requireNonNull(classpathInspector);
        KubernetesClient client2 = getClient(containerEngineCluster);
        this.volumeRepository = new VolumeRepository(client2);
        this.runner = DockerRunner.kubernetes(client2, this.volumeRepository);
    }

    private Submitter(ClasspathInspector classpathInspector, String str, DockerCluster dockerCluster) {
        this.stagingLocation = getStagingURI(str);
        if (!Objects.equals(this.stagingLocation.getScheme(), "file")) {
            LOG.warn("You are using non local staging location for local cluster");
        }
        this.classpathInspector = (ClasspathInspector) Objects.requireNonNull(classpathInspector);
        this.volumeRepository = null;
        this.runner = DockerRunner.local(DockerRunner.createDockerClient(), dockerCluster);
    }

    public <T> T runOnCluster(Fn<T> fn, RunEnvironment runEnvironment, String str) {
        StagedContinuation stageContinuation = stageContinuation(fn);
        RunSpec runSpec = RunSpec.runSpec(runEnvironment, stageContinuation, str);
        LOG.info("Submitting {} to {}", stageContinuation.manifestPath().toUri(), runEnvironment);
        Optional<URI> run = this.runner.run(runSpec);
        if (!run.isPresent()) {
            throw new RuntimeException("Failed to get return value");
        }
        try {
            InputStream newInputStream = Files.newInputStream(Paths.get(run.get()), new OpenOption[0]);
            Throwable th = null;
            try {
                try {
                    T t = (T) SerializationUtil.readObject(newInputStream);
                    if (newInputStream != null) {
                        if (0 != 0) {
                            try {
                                newInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newInputStream.close();
                        }
                    }
                    waitForDetach(runEnvironment);
                    return t;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public StagedContinuation stageContinuation(Fn<?> fn) {
        List<Path> classpathJars = this.classpathInspector.classpathJars();
        Path serializeContinuation = SerializationUtil.serializeContinuation(fn);
        Path resolve = Paths.get(this.stagingLocation).resolve("manifest-" + Util.randomAlphaNumeric(8) + ".txt");
        String nameWithoutExtension = com.google.common.io.Files.getNameWithoutExtension(serializeContinuation.toAbsolutePath().toString());
        classpathJars.add(serializeContinuation.toAbsolutePath());
        List<StagingUtil.StagedPackage> stageClasspathElements = StagingUtil.stageClasspathElements((List) classpathJars.stream().map((v0) -> {
            return v0.toAbsolutePath();
        }).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList()), this.stagingLocation.toString());
        Optional<StagingUtil.StagedPackage> findFirst = stageClasspathElements.stream().filter(stagedPackage -> {
            return stagedPackage.name().contains(nameWithoutExtension);
        }).findFirst();
        if (!findFirst.isPresent()) {
            throw new RuntimeException();
        }
        RunManifest build = new RunManifestBuilder().continuation(Paths.get(URI.create(findFirst.get().location())).getFileName().toString()).classPathFiles((List<? extends String>) stageClasspathElements.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList())).build();
        try {
            RunManifest.write(build, resolve);
            return StagedContinuation.stagedContinuation(resolve, build);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.volumeRepository != null) {
            this.volumeRepository.close();
        }
    }

    private void waitForDetach(RunEnvironment runEnvironment) {
        if ((this.runner instanceof KubernetesDockerRunner) && runEnvironment.volumeMounts().stream().anyMatch(volumeMount -> {
            return !volumeMount.readOnly();
        })) {
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
            }
        }
    }

    private static synchronized KubernetesClient getClient(ContainerEngineCluster containerEngineCluster) {
        if (client == null) {
            client = DockerRunner.createKubernetesClient(containerEngineCluster);
        }
        return client;
    }
}
