/*
 * Decompiled with CFR 0.152.
 */
package zipkin.storage.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.Dependencies;
import zipkin.internal.MergeById;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
import zipkin.internal.Util;
import zipkin.storage.QueryRequest;
import zipkin.storage.cassandra.CassandraUtil;
import zipkin.storage.cassandra.TimestampCodec;
import zipkin.storage.guava.GuavaSpanStore;

public final class CassandraSpanStore
implements GuavaSpanStore {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
    static final ListenableFuture<List<String>> EMPTY_LIST = Futures.immediateFuture(Collections.emptyList());
    static final Ordering<List<Span>> TRACE_DESCENDING = Ordering.from((Comparator)new Comparator<List<Span>>(){

        @Override
        public int compare(List<Span> left, List<Span> right) {
            return right.get(0).compareTo(left.get(0));
        }
    });
    private final int indexTtl;
    private final int maxTraceCols;
    private final Session session;
    private final TimestampCodec timestampCodec;
    private final Set<Integer> buckets;
    private final PreparedStatement selectTraces;
    private final PreparedStatement selectDependencies;
    private final PreparedStatement selectServiceNames;
    private final PreparedStatement selectSpanNames;
    private final PreparedStatement selectTraceIdsByServiceName;
    private final PreparedStatement selectTraceIdsByServiceNames;
    private final PreparedStatement selectTraceIdsBySpanName;
    private final PreparedStatement selectTraceIdsByAnnotation;
    private final PreparedStatement selectTraceIdsBySpanDuration;
    private final Function<ResultSet, Map<Long, Long>> traceIdToTimestamp;

    CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols) {
        this.session = session;
        this.indexTtl = indexTtl;
        this.maxTraceCols = maxTraceCols;
        ProtocolVersion protocolVersion = session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
        this.timestampCodec = new TimestampCodec(protocolVersion);
        this.buckets = ContiguousSet.create((Range)Range.closedOpen((Comparable)Integer.valueOf(0), (Comparable)Integer.valueOf(bucketCount)), (DiscreteDomain)DiscreteDomain.integers());
        this.selectTraces = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"trace_id", "span"}).from("traces").where(QueryBuilder.in((String)"trace_id", (Object[])new Object[]{QueryBuilder.bindMarker((String)"trace_id")})).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectDependencies = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"dependencies"}).from("dependencies").where(QueryBuilder.in((String)"day", (Object[])new Object[]{QueryBuilder.bindMarker((String)"days")})));
        this.selectServiceNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"service_name"}).from("service_names"));
        this.selectSpanNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"span_name"}).from("span_names").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.eq((String)"bucket", (Object)QueryBuilder.bindMarker((String)"bucket"))).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectTraceIdsByServiceName = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_name_index").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        this.selectTraceIdsBySpanName = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_span_name_index").where(QueryBuilder.eq((String)"service_span_name", (Object)QueryBuilder.bindMarker((String)"service_span_name"))).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        this.selectTraceIdsByAnnotation = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("annotations_index").where(QueryBuilder.eq((String)"annotation", (Object)QueryBuilder.bindMarker((String)"annotation"))).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        this.selectTraceIdsBySpanDuration = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"duration", "ts", "trace_id"}).from("span_duration_index").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.eq((String)"span_name", (Object)QueryBuilder.bindMarker((String)"span_name"))).and(QueryBuilder.eq((String)"bucket", (Object)QueryBuilder.bindMarker((String)"time_bucket"))).and(QueryBuilder.lte((String)"duration", (Object)QueryBuilder.bindMarker((String)"max_duration"))).and(QueryBuilder.gte((String)"duration", (Object)QueryBuilder.bindMarker((String)"min_duration"))).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"duration")}));
        if (protocolVersion.compareTo((Enum)ProtocolVersion.V4) < 0) {
            LOG.warn("Please update Cassandra to 2.2 or later, as some features may fail");
            this.selectTraceIdsByServiceNames = null;
        } else {
            this.selectTraceIdsByServiceNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_name_index").where(QueryBuilder.in((String)"service_name", (Object[])new Object[]{QueryBuilder.bindMarker((String)"service_name")})).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        }
        this.traceIdToTimestamp = new Function<ResultSet, Map<Long, Long>>(){

            public Map<Long, Long> apply(ResultSet input) {
                LinkedHashMap<Long, Long> traceIdsToTimestamps = new LinkedHashMap<Long, Long>();
                for (Row row : input) {
                    traceIdsToTimestamps.put(row.getLong("trace_id"), CassandraSpanStore.this.timestampCodec.deserialize(row, "ts"));
                }
                return traceIdsToTimestamps;
            }
        };
    }

    public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request) {
        ListenableFuture traceIds;
        ListenableFuture<Map<Long, Long>> traceIdToTimestamp;
        if (request.minDuration != null || request.maxDuration != null) {
            traceIdToTimestamp = this.getTraceIdsByDuration(request);
        } else if (request.spanName != null) {
            traceIdToTimestamp = this.getTraceIdsBySpanName(request.serviceName, request.spanName, request.endTs * 1000L, request.lookback * 1000L, request.limit);
        } else if (request.serviceName != null) {
            traceIdToTimestamp = this.getTraceIdsByServiceNames(Collections.singletonList(request.serviceName), request.endTs * 1000L, request.lookback * 1000L, request.limit);
        } else {
            Preconditions.checkArgument((this.selectTraceIdsByServiceNames != null ? 1 : 0) != 0, (Object)"getTraces without serviceName requires Cassandra 2.2 or later");
            traceIdToTimestamp = Futures.transform(this.getServiceNames(), (AsyncFunction)new AsyncFunction<List<String>, Map<Long, Long>>(){

                public ListenableFuture<Map<Long, Long>> apply(List<String> serviceNames) {
                    return CassandraSpanStore.this.getTraceIdsByServiceNames(serviceNames, request.endTs * 1000L, request.lookback * 1000L, request.limit);
                }
            });
        }
        List<String> annotationKeys = CassandraUtil.annotationKeys(request);
        if (annotationKeys.isEmpty()) {
            traceIds = Futures.transform(traceIdToTimestamp, CassandraUtil.keyset());
        } else {
            ArrayList<ListenableFuture<Map<Long, Long>>> futureKeySetsToIntersect = new ArrayList<ListenableFuture<Map<Long, Long>>>();
            futureKeySetsToIntersect.add(traceIdToTimestamp);
            for (String annotationKey : annotationKeys) {
                futureKeySetsToIntersect.add(this.getTraceIdsByAnnotation(annotationKey, request.endTs * 1000L, request.lookback * 1000L, request.limit));
            }
            traceIds = Futures.transform((ListenableFuture)Futures.allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets());
        }
        return Futures.transform((ListenableFuture)traceIds, (AsyncFunction)new AsyncFunction<Set<Long>, List<List<Span>>>(){

            public ListenableFuture<List<List<Span>>> apply(Set<Long> traceIds) {
                return Futures.transform(CassandraSpanStore.this.getSpansByTraceIds(traceIds, CassandraSpanStore.this.maxTraceCols), (Function)AdjustTraces.INSTANCE);
            }

            public String toString() {
                return "getSpansByTraceIds";
            }
        });
    }

    static String spanName(String nullableSpanName) {
        return nullableSpanName != null ? nullableSpanName : "";
    }

    public ListenableFuture<List<Span>> getRawTrace(long traceId) {
        return Futures.transform(this.getSpansByTraceIds(Collections.singleton(traceId), this.maxTraceCols), (Function)new Function<Collection<List<Span>>, List<Span>>(){

            public List<Span> apply(Collection<List<Span>> encodedTraces) {
                if (encodedTraces.isEmpty()) {
                    return null;
                }
                return encodedTraces.iterator().next();
            }
        });
    }

    public ListenableFuture<List<Span>> getTrace(long traceId) {
        return Futures.transform(this.getRawTrace(traceId), (Function)new Function<List<Span>, List<Span>>(){

            public List<Span> apply(List<Span> input) {
                if (input == null || input.isEmpty()) {
                    return null;
                }
                return ImmutableList.copyOf((Collection)CorrectForClockSkew.apply((List)MergeById.apply(input)));
            }
        });
    }

    public ListenableFuture<List<String>> getServiceNames() {
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectServiceNames, "select-service-names");
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.selectServiceNames.getQueryString());
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    HashSet<String> serviceNames = new HashSet<String>();
                    for (Row row : input) {
                        serviceNames.add(row.getString("service_name"));
                    }
                    return Ordering.natural().sortedCopy(serviceNames);
                }
            });
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.selectServiceNames.getQueryString(), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    public ListenableFuture<List<String>> getSpanNames(String serviceName) {
        if (serviceName == null || serviceName.isEmpty()) {
            return EMPTY_LIST;
        }
        serviceName = ((String)Preconditions.checkNotNull((Object)serviceName, (Object)"serviceName")).toLowerCase();
        int bucket = 0;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectSpanNames, "select-span-names").setString("service_name", serviceName).setInt("bucket", bucket).setInt("limit_", 1000);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectSpanNames(bucket, serviceName));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    HashSet<String> spanNames = new HashSet<String>();
                    for (Row row : input) {
                        spanNames.add(row.getString("span_name"));
                    }
                    return Ordering.natural().sortedCopy(spanNames);
                }
            });
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.debugSelectSpanNames(bucket, serviceName), (Throwable)ex);
            throw ex;
        }
    }

    public ListenableFuture<List<DependencyLink>> getDependencies(long endTs, @Nullable Long lookback) {
        long endEpochDayMillis = Util.midnightUTC((long)endTs);
        long startEpochDayMillis = Util.midnightUTC((long)(endTs - (lookback != null ? lookback : endTs)));
        List<Date> days = CassandraSpanStore.getDays(startEpochDayMillis, endEpochDayMillis);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectDependencies, "select-dependencies").setList("days", days);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectDependencies(days));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)ConvertDependenciesResponse.INSTANCE);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.debugSelectDependencies(days), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Collection<List<Span>>> getSpansByTraceIds(Set<Long> traceIds, int limit) {
        Preconditions.checkNotNull(traceIds, (Object)"traceIds");
        if (traceIds.isEmpty()) {
            List result = Collections.emptyList();
            return Futures.immediateFuture(result);
        }
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraces, "select-traces").setSet("trace_id", traceIds).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectTraces(traceIds, limit));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, Collection<List<Span>>>(){

                public Collection<List<Span>> apply(ResultSet input) {
                    LinkedHashMap spans = new LinkedHashMap();
                    for (Row row : input) {
                        long traceId = row.getLong("trace_id");
                        if (!spans.containsKey(traceId)) {
                            spans.put(traceId, new ArrayList());
                        }
                        ((List)spans.get(traceId)).add(Codec.THRIFT.readSpan(row.getBytes("span")));
                    }
                    return spans.values();
                }
            });
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.debugSelectTraces(traceIds, limit), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    private String debugSelectTraces(Set<Long> traceIds, int limit) {
        return this.selectTraces.getQueryString().replace(":trace_id", traceIds.toString()).replace(":limit_", String.valueOf(limit));
    }

    private String debugSelectDependencies(List<Date> days) {
        StringBuilder dates = new StringBuilder(CassandraUtil.iso8601(days.get(0).getTime() * 1000L));
        if (days.size() > 1) {
            dates.append(" until ").append(CassandraUtil.iso8601(days.get(days.size() - 1).getTime() * 1000L));
        }
        return this.selectDependencies.getQueryString().replace(":days", dates.toString());
    }

    private String debugSelectSpanNames(int bucket, String serviceName) {
        return this.selectSpanNames.getQueryString().replace(":bucket", String.valueOf(bucket)).replace(":service_name", serviceName);
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsByServiceNames(List<String> serviceNames, long endTs, long lookback, int limit) {
        if (serviceNames.isEmpty()) {
            return Futures.immediateFuture(Collections.emptyMap());
        }
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = serviceNames.size() == 1 ? CassandraUtil.bindWithName(this.selectTraceIdsByServiceName, "select-trace-ids-by-service-name").setString("service_name", serviceNames.get(0)).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit) : CassandraUtil.bindWithName(this.selectTraceIdsByServiceNames, "select-trace-ids-by-service-names").setList("service_name", serviceNames).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectTraceIdsByServiceNames(serviceNames, this.buckets, startTs, endTs, limit));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), this.traceIdToTimestamp);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.debugSelectTraceIdsByServiceNames(serviceNames, this.buckets, startTs, endTs, limit), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    private String debugSelectTraceIdsByServiceNames(List<String> serviceNames, Set<Integer> buckets, long startTs, long endTs, int limit) {
        return serviceNames.size() == 1 ? this.selectTraceIdsByServiceName.getQueryString().replace(":service_name", serviceNames.get(0)).replace(":bucket", buckets.toString()).replace(":start_ts", CassandraUtil.iso8601(startTs)).replace(":end_ts", CassandraUtil.iso8601(endTs)).replace(":limit_", String.valueOf(limit)) : this.selectTraceIdsByServiceNames.getQueryString().replace(":service_name", serviceNames.toString()).replace(":bucket", buckets.toString()).replace(":start_ts", CassandraUtil.iso8601(startTs)).replace(":end_ts", CassandraUtil.iso8601(endTs)).replace(":limit_", String.valueOf(limit));
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsBySpanName(String serviceName, String spanName, long endTs, long lookback, int limit) {
        Preconditions.checkArgument((serviceName != null ? 1 : 0) != 0, (Object)"serviceName required on spanName query");
        Preconditions.checkArgument((spanName != null ? 1 : 0) != 0, (Object)"spanName required on spanName query");
        String serviceSpanName = serviceName + "." + spanName;
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsBySpanName, "select-trace-ids-by-span-name").setString("service_span_name", serviceSpanName).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectTraceIdsBySpanName(serviceSpanName, startTs, endTs, limit));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), this.traceIdToTimestamp);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + this.debugSelectTraceIdsBySpanName(serviceSpanName, startTs, endTs, limit), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    private String debugSelectTraceIdsBySpanName(String serviceSpanName, long startTs, long endTs, int limit) {
        return this.selectTraceIdsBySpanName.getQueryString().replace(":service_span_name", serviceSpanName).replace(":start_ts", CassandraUtil.iso8601(startTs)).replace(":end_ts", CassandraUtil.iso8601(endTs)).replace(":limit_", String.valueOf(limit));
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsByAnnotation(String annotationKey, long endTs, long lookback, int limit) {
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsByAnnotation, "select-trace-ids-by-annotation").setBytes("annotation", CassandraUtil.toByteBuffer(annotationKey)).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugSelectTraceIdsByAnnotation(annotationKey, this.buckets, startTs, endTs, limit));
            }
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, Map<Long, Long>>(){

                public Map<Long, Long> apply(ResultSet input) {
                    LinkedHashMap<Long, Long> traceIdsToTimestamps = new LinkedHashMap<Long, Long>();
                    for (Row row : input) {
                        traceIdsToTimestamps.put(row.getLong("trace_id"), CassandraSpanStore.this.timestampCodec.deserialize(row, "ts"));
                    }
                    return traceIdsToTimestamps;
                }
            });
        }
        catch (RuntimeException | CharacterCodingException ex) {
            LOG.error("failed " + this.debugSelectTraceIdsByAnnotation(annotationKey, this.buckets, startTs, endTs, limit), (Throwable)ex);
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    private String debugSelectTraceIdsByAnnotation(String annotationKey, Set<Integer> buckets, long startTs, long endTs, int limit) {
        return this.selectTraceIdsByAnnotation.getQueryString().replace(":annotation", annotationKey).replace(":bucket", buckets.toString()).replace(":start_ts", CassandraUtil.iso8601(startTs)).replace(":end_ts", CassandraUtil.iso8601(endTs)).replace(":limit_", String.valueOf(limit));
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsByDuration(QueryRequest request) {
        Preconditions.checkArgument((request.serviceName != null ? 1 : 0) != 0, (Object)"serviceName required on duration query");
        long oldestData = (System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(this.indexTtl)) * 1000L;
        long startTs = Math.max((request.endTs - request.lookback) * 1000L, oldestData);
        long endTs = Math.max(request.endTs * 1000L, oldestData);
        int startBucket = CassandraUtil.durationIndexBucket(startTs);
        int endBucket = CassandraUtil.durationIndexBucket(endTs);
        if (startBucket > endBucket) {
            throw new IllegalArgumentException("Start bucket (" + startBucket + ") > end bucket (" + endBucket + ")");
        }
        ArrayList<ListenableFuture<List<DurationRow>>> futures = new ArrayList<ListenableFuture<List<DurationRow>>>();
        for (int i = startBucket; i <= endBucket; ++i) {
            futures.add(this.oneBucketDurationQuery(request, i, startTs, endTs));
        }
        return Futures.transform((ListenableFuture)Futures.allAsList(futures), (Function)new Function<List<List<DurationRow>>, Map<Long, Long>>(){

            public Map<Long, Long> apply(List<List<DurationRow>> input) {
                LinkedHashMap<Long, Long> result = new LinkedHashMap<Long, Long>();
                for (DurationRow row : Iterables.concat(input)) {
                    Long oldValue = (Long)result.get(row.trace_id);
                    if (oldValue != null && oldValue <= row.timestamp) continue;
                    result.put(row.trace_id, row.timestamp);
                }
                return Collections.unmodifiableMap(result);
            }
        });
    }

    ListenableFuture<List<DurationRow>> oneBucketDurationQuery(QueryRequest request, int bucket, final long startTs, final long endTs) {
        String serviceName = request.serviceName;
        String spanName = CassandraSpanStore.spanName(request.spanName);
        long minDuration = request.minDuration;
        long maxDuration = request.maxDuration != null ? request.maxDuration : Long.MAX_VALUE;
        int limit = request.limit;
        BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsBySpanDuration, "select-trace-ids-by-span-duration").setInt("time_bucket", bucket).setString("service_name", serviceName).setString("span_name", spanName).setLong("min_duration", minDuration).setLong("max_duration", maxDuration);
        bound.setFetchSize(limit);
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.debugSelectTraceIdsByDuration(bucket, request.serviceName, request.spanName, request.minDuration, request.maxDuration, request.limit));
        }
        return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<DurationRow>>(){

            public List<DurationRow> apply(ResultSet rs) {
                ImmutableList.Builder result = ImmutableList.builder();
                for (Row input : rs) {
                    DurationRow row = new DurationRow(input);
                    if (row.timestamp < startTs || row.timestamp > endTs) continue;
                    result.add((Object)row);
                }
                return result.build();
            }
        });
    }

    private String debugSelectTraceIdsByDuration(int bucket, String serviceName, String spanName, long minDuration, long maxDuration, int limit) {
        return this.selectTraceIdsBySpanDuration.getQueryString().replace(":time_bucket", String.valueOf(bucket)).replace(":service_name", serviceName).replace(":span_name", spanName).replace(":min_duration", String.valueOf(minDuration)).replace(":max_duration", String.valueOf(maxDuration)).replace(":limit_", String.valueOf(limit));
    }

    private static List<Date> getDays(long from, long to) {
        ArrayList<Date> days = new ArrayList<Date>();
        for (long time = from; time <= to; time += TimeUnit.DAYS.toMillis(1L)) {
            days.add(new Date(time));
        }
        return days;
    }

    class DurationRow {
        Long trace_id;
        Long duration;
        Long timestamp;

        DurationRow(Row row) {
            this.trace_id = row.getLong("trace_id");
            this.duration = row.getLong("duration");
            this.timestamp = CassandraSpanStore.this.timestampCodec.deserialize(row, "ts");
        }

        public String toString() {
            return String.format("trace_id=%d, duration=%d, timestamp=%d", this.trace_id, this.duration, this.timestamp);
        }
    }

    static enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyLink>>
    {
        INSTANCE;


        public List<DependencyLink> apply(ResultSet rs) {
            LinkedHashMap<Pair, Long> links = new LinkedHashMap<Pair, Long>();
            for (Row row : rs) {
                ByteBuffer encodedDayOfDependencies = row.getBytes("dependencies");
                for (DependencyLink link : Dependencies.fromThrift((ByteBuffer)encodedDayOfDependencies).links) {
                    Pair parentChild = Pair.create((Object)link.parent, (Object)link.child);
                    long callCount = links.containsKey(parentChild) ? (Long)links.get(parentChild) : 0L;
                    links.put(parentChild, callCount += link.callCount);
                }
            }
            ArrayList<DependencyLink> result = new ArrayList<DependencyLink>(links.size());
            for (Map.Entry link : links.entrySet()) {
                result.add(DependencyLink.create((String)((String)((Pair)link.getKey())._1), (String)((String)((Pair)link.getKey())._2), (long)((Long)link.getValue())));
            }
            return result;
        }
    }

    static enum AdjustTraces implements Function<Collection<List<Span>>, List<List<Span>>>
    {
        INSTANCE;


        public List<List<Span>> apply(Collection<List<Span>> unmerged) {
            ArrayList<List> result = new ArrayList<List>(unmerged.size());
            for (List<Span> spans : unmerged) {
                result.add(CorrectForClockSkew.apply((List)MergeById.apply(spans)));
            }
            return TRACE_DESCENDING.immutableSortedCopy(result);
        }
    }
}

