package zipkin.dependencies.elasticsearch;

import java.io.IOException;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TimeZone;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.Nullable;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.internal.Util;
import zipkin.internal.gson.stream.JsonReader;
import zipkin.internal.gson.stream.MalformedJsonException;

/* loaded from: input_file:zipkin/dependencies/elasticsearch/ElasticsearchDependenciesJob.class */
public final class ElasticsearchDependenciesJob {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchDependenciesJob.class);
    final String index;
    final long day;
    final String dateStamp;
    final SparkConf conf;

    @Nullable
    final Runnable logInitializer;

    /* loaded from: input_file:zipkin/dependencies/elasticsearch/ElasticsearchDependenciesJob$Builder.class */
    public static final class Builder {
        String[] jars;
        Runnable logInitializer;
        String index = ElasticsearchDependenciesJob.getEnv("ES_INDEX", "zipkin");
        final Map<String, String> sparkProperties = new LinkedHashMap();
        String sparkMaster = ElasticsearchDependenciesJob.getEnv("SPARK_MASTER", "local[*]");
        long day = Util.midnightUTC(System.currentTimeMillis());

        Builder() {
            this.sparkProperties.put("spark.ui.enabled", "false");
            this.sparkProperties.put("es.index.read.missing.as.empty", "true");
            this.sparkProperties.put("es.nodes.wan.only", ElasticsearchDependenciesJob.getEnv("ES_NODES_WAN_ONLY", "false"));
            this.sparkProperties.put("es.nodes", ElasticsearchDependenciesJob.getEnv("ES_HOSTS", "127.0.0.1"));
        }

        public Builder jars(String... strArr) {
            this.jars = strArr;
            return this;
        }

        public Builder index(String str) {
            this.index = (String) Util.checkNotNull(str, "index");
            return this;
        }

        public Builder esNodes(String str) {
            this.sparkProperties.put("es.nodes", (String) Util.checkNotNull(str, "esNodes"));
            this.sparkProperties.put("es.nodes.wan.only", "true");
            return this;
        }

        public Builder day(long j) {
            this.day = Util.midnightUTC(j);
            return this;
        }

        public Builder logInitializer(Runnable runnable) {
            this.logInitializer = (Runnable) Util.checkNotNull(runnable, "logInitializer");
            return this;
        }

        public ElasticsearchDependenciesJob build() {
            return new ElasticsearchDependenciesJob(this);
        }
    }

    /* loaded from: input_file:zipkin/dependencies/elasticsearch/ElasticsearchDependenciesJob$Span2Decoder.class */
    enum Span2Decoder implements Function<byte[], Span> {
        INSTANCE;

        public Span call(byte[] bArr) throws Exception {
            return Span2Converter.toSpan(Span2Codec.JSON.readSpan(bArr));
        }
    }

    /* loaded from: input_file:zipkin/dependencies/elasticsearch/ElasticsearchDependenciesJob$SpanDecoder.class */
    enum SpanDecoder implements Function<byte[], Span> {
        INSTANCE;

        public Span call(byte[] bArr) throws Exception {
            return Codec.JSON.readSpan(bArr);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    ElasticsearchDependenciesJob(Builder builder) {
        this.index = builder.index;
        this.day = builder.day;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd".replace("-", getEnv("ES_DATE_SEPARATOR", "-")));
        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.dateStamp = simpleDateFormat.format(new Date(builder.day));
        this.conf = new SparkConf(true).setMaster(builder.sparkMaster).setAppName(getClass().getName());
        if (builder.jars != null) {
            this.conf.setJars(builder.jars);
        }
        for (Map.Entry<String, String> entry : builder.sparkProperties.entrySet()) {
            this.conf.set(entry.getKey(), entry.getValue());
        }
        this.logInitializer = builder.logInitializer;
    }

    public void run() {
        run(this.index + "-" + this.dateStamp + "/span", this.index + "-" + this.dateStamp + "/dependencylink", SpanDecoder.INSTANCE);
        run(this.index + ":span-" + this.dateStamp + "/span", this.index + ":dependency-" + this.dateStamp + "/dependency", Span2Decoder.INSTANCE);
        log.info("Done");
    }

    void run(String str, String str2, Function<byte[], Span> function) {
        Function function2;
        PairFunction pairFunction;
        Function2 function22;
        Function function3;
        log.info("Processing spans from {}", str);
        JavaSparkContext javaSparkContext = new JavaSparkContext(this.conf);
        try {
            JavaPairRDD esJsonRDD = JavaEsSpark.esJsonRDD(javaSparkContext, str);
            function2 = ElasticsearchDependenciesJob$$Lambda$1.instance;
            JavaRDD values = esJsonRDD.groupBy(function2).flatMapValues(new TraceIdAndJsonToDependencyLinks(this.logInitializer, function)).values();
            pairFunction = ElasticsearchDependenciesJob$$Lambda$4.instance;
            JavaPairRDD mapToPair = values.mapToPair(pairFunction);
            function22 = ElasticsearchDependenciesJob$$Lambda$5.instance;
            JavaRDD values2 = mapToPair.reduceByKey(function22).values();
            function3 = ElasticsearchDependenciesJob$$Lambda$6.instance;
            JavaRDD map = values2.map(function3);
            if (map.isEmpty()) {
                log.info("No spans found at {}", str);
            } else {
                log.info("Saving dependency links to {}", str2);
                JavaEsSpark.saveToEs(map, str2, Collections.singletonMap("es.mapping.id", "id"));
            }
        } finally {
            javaSparkContext.stop();
        }
    }

    public static Map<String, Object> dependencyLinkJson(DependencyLink dependencyLink) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("id", dependencyLink.parent + "|" + dependencyLink.child);
        linkedHashMap.put("parent", dependencyLink.parent);
        linkedHashMap.put("child", dependencyLink.child);
        linkedHashMap.put("callCount", Long.valueOf(dependencyLink.callCount));
        linkedHashMap.put("errorCount", Long.valueOf(dependencyLink.errorCount));
        return linkedHashMap;
    }

    public static String getEnv(String str, String str2) {
        String str3 = System.getenv(str);
        return str3 != null ? str3 : str2;
    }

    public static String traceId(String str) throws IOException {
        JsonReader jsonReader = new JsonReader(new StringReader(str));
        jsonReader.beginObject();
        while (jsonReader.hasNext()) {
            if (jsonReader.nextName().equals("traceId")) {
                String nextString = jsonReader.nextString();
                return nextString.length() > 16 ? nextString.substring(nextString.length() - 16) : nextString;
            }
            jsonReader.skipValue();
        }
        throw new MalformedJsonException("no traceId in " + str);
    }

    public static <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
        return new Tuple2<>(t1, t2);
    }

    public static /* synthetic */ String lambda$run$b0dd6fce$1(Tuple2 tuple2) throws Exception {
        return traceId((String) tuple2._2);
    }
}
