/*
 * Decompiled with CFR 0.152.
 */
package zipkin2.dependencies.elasticsearch;

import com.google.common.base.Preconditions;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.MalformedJsonException;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.net.URI;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TimeZone;
import javax.annotation.Nullable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import zipkin2.DependencyLink;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.dependencies.elasticsearch.TraceIdAndJsonToDependencyLinks;
import zipkin2.internal.DateUtil;

public final class ElasticsearchDependenciesJob {
    /** UTF-8 charset constant (package-private). */
    static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchDependenciesJob.class);
    /** Index name prefix; {@link #run()} combines it with {@link #dateStamp} to form resource names. */
    final String index;
    /** Epoch-millisecond value copied from the builder's {@code day} field. */
    final long day;
    /**
     * {@link #day} formatted in UTC as year, month and day, joined by the value of the
     * {@code ES_DATE_SEPARATOR} environment variable ("-" when unset or empty).
     */
    final String dateStamp;
    /** Spark configuration assembled in the constructor; each run creates a JavaSparkContext from it. */
    final SparkConf conf;
    /** Optional hook passed to TraceIdAndJsonToDependencyLinks; null when the builder was not given one. */
    @Nullable
    final Runnable logInitializer;
    /**
     * Turns a {@link DependencyLink} into the field map that is saved to Elasticsearch.
     *
     * <p>Entries are inserted in a fixed order: {@code id}, {@code parent}, {@code child},
     * {@code callCount}, {@code errorCount}. The {@code id} entry is the parent and child names
     * joined by {@code "|"}.
     */
    static final Function<DependencyLink, Map<String, Object>> DEPENDENCY_LINK_JSON = new Function<DependencyLink, Map<String, Object>>() {

        @Override
        public Map<String, Object> call(DependencyLink link) {
            String parentName = link.parent();
            String childName = link.child();

            // LinkedHashMap keeps the insertion order of the fields.
            Map<String, Object> document = new LinkedHashMap<String, Object>();
            document.put("id", parentName + "|" + childName);
            document.put("parent", parentName);
            document.put("child", childName);
            document.put("callCount", link.callCount());
            document.put("errorCount", link.errorCount());
            return document;
        }
    };
    /**
     * Reads the {@code traceId} field out of the JSON text held in {@code pair._2}.
     *
     * <p>Only the top-level object is scanned; other fields are skipped. When the value is longer
     * than 16 characters, only its last 16 characters are returned. A document without a
     * {@code traceId} field results in a {@link MalformedJsonException}.
     */
    static final Function<Tuple2<String, String>, String> JSON_TRACE_ID = new Function<Tuple2<String, String>, String>() {

        @Override
        public String call(Tuple2<String, String> pair) throws IOException {
            JsonReader json = new JsonReader(new StringReader(pair._2));
            json.beginObject();
            while (json.hasNext()) {
                if (!"traceId".equals(json.nextName())) {
                    json.skipValue();
                    continue;
                }
                String id = json.nextString();
                int excess = id.length() - 16;
                if (excess > 0) {
                    return id.substring(excess);
                }
                return id;
            }
            throw new MalformedJsonException("no traceId in " + pair);
        }

        @Override
        public String toString() {
            return "pair._2.traceId";
        }
    };
    /**
     * Pairs each link with a {@code (parent, child)} key, so that links describing the same edge
     * share a key when they are reduced.
     */
    static final PairFunction<DependencyLink, Tuple2<String, String>, DependencyLink> LINK_TO_PAIR = new PairFunction<DependencyLink, Tuple2<String, String>, DependencyLink>() {

        @Override
        public Tuple2<Tuple2<String, String>, DependencyLink> call(DependencyLink link) {
            Tuple2<String, String> edge = new Tuple2<String, String>(link.parent(), link.child());
            return new Tuple2<Tuple2<String, String>, DependencyLink>(edge, link);
        }

        @Override
        public String toString() {
            return "(link.parent(), link.child()), link)";
        }
    };
    /**
     * Combines two links into one whose call and error counts are the sums of both inputs.
     * The parent and child names are taken from the first argument.
     */
    static final Function2<DependencyLink, DependencyLink, DependencyLink> MERGE_LINK = new Function2<DependencyLink, DependencyLink, DependencyLink>() {

        @Override
        public DependencyLink call(DependencyLink left, DependencyLink right) {
            long totalCalls = left.callCount() + right.callCount();
            long totalErrors = left.errorCount() + right.errorCount();
            return DependencyLink.newBuilder()
                .parent(left.parent())
                .child(left.child())
                .callCount(totalCalls)
                .errorCount(totalErrors)
                .build();
        }

        @Override
        public String toString() {
            return "DependencyLink.sum(callCount, errorCount)";
        }
    };

    /** Returns a new {@link Builder} holding its default settings. */
    public static Builder builder() {
        Builder fresh = new Builder();
        return fresh;
    }

    /**
     * Reads a system property and, when it has a non-empty value, prefixes it with {@code "file:"}.
     *
     * @param key name of the system property to read
     * @return {@code "file:" + value} when the property is set and non-empty, otherwise the
     *     value as read (the empty string when the property is unset)
     */
    private static String getSystemPropertyAsFileResource(String key) {
        String location = System.getProperty(key, "");
        if (location == null || location.isEmpty()) {
            return location;
        }
        return "file:" + location;
    }

    /**
     * Copies the builder's settings and assembles the Spark configuration used by every run.
     *
     * <p>Settings are applied in this order: master and application name, jars, Elasticsearch
     * credentials, {@code es.nodes}, {@code es.net.ssl}, and finally every entry of the builder's
     * {@code sparkProperties}, which therefore override any earlier value for the same key.
     *
     * @param builder source of all settings
     */
    ElasticsearchDependenciesJob(Builder builder) {
        this.index = builder.index;
        this.day = builder.day;
        this.logInitializer = builder.logInitializer;

        // The date stamp is always rendered in UTC; only the separator is configurable.
        String separator = ElasticsearchDependenciesJob.getEnv("ES_DATE_SEPARATOR", "-");
        SimpleDateFormat stampFormat = new SimpleDateFormat("yyyy-MM-dd".replace("-", separator));
        stampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.dateStamp = stampFormat.format(new Date(this.day));

        SparkConf sparkConf = new SparkConf(true);
        sparkConf.setMaster(builder.sparkMaster);
        sparkConf.setAppName(this.getClass().getName());
        if (builder.jars != null) {
            sparkConf.setJars(builder.jars);
        }
        if (builder.username != null) {
            sparkConf.set("es.net.http.auth.user", builder.username);
        }
        if (builder.password != null) {
            sparkConf.set("es.net.http.auth.pass", builder.password);
        }
        sparkConf.set("es.nodes", ElasticsearchDependenciesJob.parseHosts(builder.hosts));
        if (builder.hosts.contains("https")) {
            sparkConf.set("es.net.ssl", "true");
        }
        // Applied last so that explicitly configured properties win.
        for (Map.Entry<String, String> property : builder.sparkProperties.entrySet()) {
            sparkConf.set(property.getKey(), property.getValue());
        }
        this.conf = sparkConf;
    }

    /**
     * Processes the configured day twice, once per index layout.
     *
     * <p>The first pass reads {@code <index>-<dateStamp>/span} with the JSON_V1 decoder and
     * writes to {@code <index>-<dateStamp>/dependencylink}. The second pass reads
     * {@code <index>:span-<dateStamp>/span} with the JSON_V2 decoder and writes to
     * {@code <index>:dependency-<dateStamp>/dependency}.
     */
    public void run() {
        String singleIndex = this.index + "-" + this.dateStamp;
        this.run(singleIndex + "/span", singleIndex + "/dependencylink", SpanBytesDecoder.JSON_V1);

        String spanIndex = this.index + ":span-" + this.dateStamp;
        String dependencyIndex = this.index + ":dependency-" + this.dateStamp;
        this.run(spanIndex + "/span", dependencyIndex + "/dependency", SpanBytesDecoder.JSON_V2);

        log.info("Done");
    }

    /**
     * Aggregates the spans stored in one resource into dependency links and saves them.
     *
     * <p>Span documents are grouped by {@link #JSON_TRACE_ID}, converted to links by
     * {@code TraceIdAndJsonToDependencyLinks}, keyed with {@link #LINK_TO_PAIR}, merged with
     * {@link #MERGE_LINK} and mapped with {@link #DEPENDENCY_LINK_JSON}. Nothing is written when
     * no links result. Saved documents use their {@code id} field as the Elasticsearch id
     * ({@code es.mapping.id}). The Spark context created here is stopped whether or not the
     * processing succeeds.
     *
     * @param spanResource resource the span documents are read from
     * @param dependencyLinkResource resource the resulting links are saved to
     * @param decoder decoder handed to {@code TraceIdAndJsonToDependencyLinks}
     */
    void run(String spanResource, String dependencyLinkResource, SpanBytesDecoder decoder) {
        log.info("Processing spans from {}", spanResource);
        JavaSparkContext sparkContext = new JavaSparkContext(this.conf);
        try {
            JavaRDD dependencyLinks = JavaEsSpark.esJsonRDD(sparkContext, spanResource)
                .groupBy(JSON_TRACE_ID)
                .flatMapValues((Function) new TraceIdAndJsonToDependencyLinks(this.logInitializer, decoder))
                .values()
                .mapToPair(LINK_TO_PAIR)
                .reduceByKey(MERGE_LINK)
                .values()
                .map(DEPENDENCY_LINK_JSON);
            if (!dependencyLinks.isEmpty()) {
                log.info("Saving dependency links to {}", dependencyLinkResource);
                JavaEsSpark.saveToEs(dependencyLinks, dependencyLinkResource, Collections.singletonMap("es.mapping.id", "id"));
            } else {
                log.info("No dependency links could be processed from spans in index {}", spanResource);
            }
        } finally {
            sparkContext.stop();
        }
    }

    /**
     * Looks up an environment variable, falling back to a default.
     *
     * @param key name of the environment variable
     * @param defaultValue value to return when the variable is unset or empty; may be null
     * @return the variable's value when it is set and non-empty, otherwise {@code defaultValue}
     */
    private static String getEnv(String key, String defaultValue) {
        String value = System.getenv(key);
        if (value == null || value.isEmpty()) {
            return defaultValue;
        }
        return value;
    }

    /**
     * Converts a comma-separated host list into the comma-separated form set as {@code es.nodes}.
     *
     * <p>Entries that start with {@code http://} or {@code https://} are reduced to
     * {@code host:port}; when the URL has no port, 80 is used for http and 443 for https. All
     * other entries are copied unchanged, including plain host names that merely begin with the
     * letters "http" (for example {@code httpd-01:9200}).
     *
     * @param hosts comma-separated hosts or http(s) URLs
     * @return the converted entries, joined by commas in their original order
     * @throws IllegalArgumentException if an http(s) entry is not a valid URI or has no parsable host
     */
    static String parseHosts(String hosts) {
        StringBuilder to = new StringBuilder();
        String[] hostParts = hosts.split(",");
        for (int i = 0; i < hostParts.length; ++i) {
            if (i > 0) {
                to.append(',');
            }
            to.append(toHostAndPort(hostParts[i]));
        }
        return to.toString();
    }

    /** Reduces one http(s) URL to {@code host:port}; returns any other entry unchanged. */
    private static String toHostAndPort(String host) {
        boolean https = host.startsWith("https://");
        // Require the full scheme separator: a bare "http" prefix also matches ordinary host names.
        if (!https && !host.startsWith("http://")) {
            return host;
        }
        URI httpUri = URI.create(host);
        String name = httpUri.getHost();
        if (name == null) {
            // Fail here rather than emit a "null:<port>" node address.
            throw new IllegalArgumentException("Cannot determine host of " + host);
        }
        int port = httpUri.getPort();
        if (port == -1) {
            port = https ? 443 : 80;
        }
        return name + ":" + port;
    }

    /**
     * Fluent builder for {@link ElasticsearchDependenciesJob}.
     *
     * <p>Defaults are read from environment variables when the builder is created:
     * {@code ES_INDEX} ("zipkin"), {@code ES_HOSTS} ("127.0.0.1"), {@code ES_USERNAME} and
     * {@code ES_PASSWORD} (no default), {@code SPARK_MASTER} ("local[*]") and
     * {@code ES_NODES_WAN_ONLY} ("false"). Key store and trust store settings are taken from the
     * {@code javax.net.ssl.*} system properties.
     */
    public static final class Builder {
        String index = ElasticsearchDependenciesJob.getEnv("ES_INDEX", "zipkin");
        String hosts = ElasticsearchDependenciesJob.getEnv("ES_HOSTS", "127.0.0.1");
        /** Null when {@code ES_USERNAME} is unset or empty and {@link #username(String)} was not called. */
        String username = ElasticsearchDependenciesJob.getEnv("ES_USERNAME", null);
        /** Null when {@code ES_PASSWORD} is unset or empty and {@link #password(String)} was not called. */
        String password = ElasticsearchDependenciesJob.getEnv("ES_PASSWORD", null);
        /** Applied to the Spark configuration after every other setting, in insertion order. */
        final Map<String, String> sparkProperties = new LinkedHashMap<String, String>();
        String sparkMaster = ElasticsearchDependenciesJob.getEnv("SPARK_MASTER", "local[*]");
        /** Null unless {@link #jars(String...)} was called. */
        String[] jars;
        /** Null unless {@link #logInitializer(Runnable)} was called. */
        Runnable logInitializer;
        /** Defaults to the current time passed through {@code DateUtil.midnightUTC}. */
        long day = DateUtil.midnightUTC(System.currentTimeMillis());

        Builder() {
            this.sparkProperties.put("spark.ui.enabled", "false");
            this.sparkProperties.put("es.index.read.missing.as.empty", "true");
            this.sparkProperties.put("es.nodes.wan.only", ElasticsearchDependenciesJob.getEnv("ES_NODES_WAN_ONLY", "false"));
            this.sparkProperties.put("es.net.ssl.keystore.location", ElasticsearchDependenciesJob.getSystemPropertyAsFileResource("javax.net.ssl.keyStore"));
            this.sparkProperties.put("es.net.ssl.keystore.pass", System.getProperty("javax.net.ssl.keyStorePassword", ""));
            this.sparkProperties.put("es.net.ssl.truststore.location", ElasticsearchDependenciesJob.getSystemPropertyAsFileResource("javax.net.ssl.trustStore"));
            this.sparkProperties.put("es.net.ssl.truststore.pass", System.getProperty("javax.net.ssl.trustStorePassword", ""));
        }

        /** Sets the jars handed to the Spark configuration; when never called, no jars are set. */
        public Builder jars(String... jars) {
            this.jars = jars;
            return this;
        }

        /**
         * Sets the index name prefix.
         *
         * @throws NullPointerException if {@code index} is null
         */
        public Builder index(String index) {
            this.index = Preconditions.checkNotNull(index, "index");
            return this;
        }

        /**
         * Sets the comma-separated Elasticsearch hosts and also forces
         * {@code es.nodes.wan.only} to {@code "true"}.
         *
         * @throws NullPointerException if {@code hosts} is null
         */
        public Builder hosts(String hosts) {
            this.hosts = Preconditions.checkNotNull(hosts, "hosts");
            this.sparkProperties.put("es.nodes.wan.only", "true");
            return this;
        }

        /** Sets the user name for HTTP authentication; null leaves the setting unset. */
        public Builder username(String username) {
            this.username = username;
            return this;
        }

        /** Sets the password for HTTP authentication; null leaves the setting unset. */
        public Builder password(String password) {
            this.password = password;
            return this;
        }

        /** Sets the day to process; the value is passed through {@code DateUtil.midnightUTC}. */
        public Builder day(long day) {
            this.day = DateUtil.midnightUTC(day);
            return this;
        }

        /**
         * Sets the hook that is passed on to {@code TraceIdAndJsonToDependencyLinks}.
         *
         * @throws NullPointerException if {@code logInitializer} is null
         */
        public Builder logInitializer(Runnable logInitializer) {
            this.logInitializer = Preconditions.checkNotNull(logInitializer, "logInitializer");
            return this;
        }

        /** Creates the job from the current settings. */
        public ElasticsearchDependenciesJob build() {
            return new ElasticsearchDependenciesJob(this);
        }
    }
}

