/*
 * Decompiled with CFR 0.152.
 */
package zipkin.storage.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.Dependencies;
import zipkin.internal.DependencyLinker;
import zipkin.internal.MergeById;
import zipkin.internal.Nullable;
import zipkin.internal.Util;
import zipkin.storage.QueryRequest;
import zipkin.storage.cassandra.CassandraUtil;
import zipkin.storage.cassandra.TimestampCodec;
import zipkin.storage.guava.GuavaSpanStore;

public final class CassandraSpanStore
implements GuavaSpanStore {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
    static final ListenableFuture<List<String>> EMPTY_LIST = Futures.immediateFuture(Collections.emptyList());
    static final Ordering<List<Span>> TRACE_DESCENDING = Ordering.from((Comparator)new Comparator<List<Span>>(){

        @Override
        public int compare(List<Span> left, List<Span> right) {
            return right.get(0).compareTo(left.get(0));
        }
    });
    private final int maxTraceCols;
    private final int indexFetchMultiplier;
    private final Session session;
    private final TimestampCodec timestampCodec;
    private final Set<Integer> buckets;
    private final PreparedStatement selectTraces;
    private final PreparedStatement selectDependencies;
    private final PreparedStatement selectServiceNames;
    private final PreparedStatement selectSpanNames;
    private final PreparedStatement selectTraceIdsByServiceName;
    private final PreparedStatement selectTraceIdsByServiceNames;
    private final PreparedStatement selectTraceIdsBySpanName;
    private final PreparedStatement selectTraceIdsByAnnotation;
    private final Function<ResultSet, Map<Long, Long>> traceIdToTimestamp;

    CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols, int indexFetchMultiplier) {
        this.session = session;
        this.maxTraceCols = maxTraceCols;
        this.indexFetchMultiplier = indexFetchMultiplier;
        ProtocolVersion protocolVersion = session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
        this.timestampCodec = new TimestampCodec(protocolVersion);
        this.buckets = ContiguousSet.create((Range)Range.closedOpen((Comparable)Integer.valueOf(0), (Comparable)Integer.valueOf(bucketCount)), (DiscreteDomain)DiscreteDomain.integers());
        this.selectTraces = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"trace_id", "span"}).from("traces").where(QueryBuilder.in((String)"trace_id", (Object[])new Object[]{QueryBuilder.bindMarker((String)"trace_id")})).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectDependencies = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"dependencies"}).from("dependencies").where(QueryBuilder.in((String)"day", (Object[])new Object[]{QueryBuilder.bindMarker((String)"days")})));
        this.selectServiceNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"service_name"}).from("service_names"));
        this.selectSpanNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"span_name"}).from("span_names").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.eq((String)"bucket", (Object)QueryBuilder.bindMarker((String)"bucket"))).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectTraceIdsByServiceName = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_name_index").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        this.selectTraceIdsBySpanName = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_span_name_index").where(QueryBuilder.eq((String)"service_span_name", (Object)QueryBuilder.bindMarker((String)"service_span_name"))).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        this.selectTraceIdsByAnnotation = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("annotations_index").where(QueryBuilder.eq((String)"annotation", (Object)QueryBuilder.bindMarker((String)"annotation"))).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        if (protocolVersion.compareTo((Enum)ProtocolVersion.V4) < 0) {
            LOG.warn("Please update Cassandra to 2.2 or later, as some features may fail");
            this.selectTraceIdsByServiceNames = null;
        } else {
            this.selectTraceIdsByServiceNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("service_name_index").where(QueryBuilder.in((String)"service_name", (Object[])new Object[]{QueryBuilder.bindMarker((String)"service_name")})).and(QueryBuilder.in((String)"bucket", (Object[])new Object[]{QueryBuilder.bindMarker((String)"bucket")})).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).orderBy(new com.datastax.driver.core.querybuilder.Ordering[]{QueryBuilder.desc((String)"ts")}));
        }
        this.traceIdToTimestamp = new Function<ResultSet, Map<Long, Long>>(){

            public Map<Long, Long> apply(ResultSet input) {
                LinkedHashMap<Long, Long> traceIdsToTimestamps = new LinkedHashMap<Long, Long>();
                for (Row row : input) {
                    traceIdsToTimestamps.put(row.getLong("trace_id"), CassandraSpanStore.this.timestampCodec.deserialize(row, "ts"));
                }
                return traceIdsToTimestamps;
            }
        };
    }

    public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request) {
        ListenableFuture traceIds;
        ListenableFuture traceIdToTimestamp;
        final int traceIndexFetchSize = request.limit * this.indexFetchMultiplier;
        if (request.spanName != null) {
            traceIdToTimestamp = this.getTraceIdsBySpanName(request.serviceName, request.spanName, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
        } else if (request.serviceName != null) {
            traceIdToTimestamp = this.getTraceIdsByServiceNames(Collections.singletonList(request.serviceName), request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
        } else {
            Preconditions.checkArgument((this.selectTraceIdsByServiceNames != null ? 1 : 0) != 0, (Object)"getTraces without serviceName requires Cassandra 2.2 or later");
            traceIdToTimestamp = Futures.transform(this.getServiceNames(), (AsyncFunction)new AsyncFunction<List<String>, Map<Long, Long>>(){

                public ListenableFuture<Map<Long, Long>> apply(List<String> serviceNames) {
                    return CassandraSpanStore.this.getTraceIdsByServiceNames(serviceNames, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
                }
            });
        }
        List<String> annotationKeys = CassandraUtil.annotationKeys(request);
        if (annotationKeys.isEmpty()) {
            traceIds = Futures.transform((ListenableFuture)traceIdToTimestamp, CassandraUtil.keyset());
        } else {
            ArrayList<Object> futureKeySetsToIntersect = new ArrayList<Object>();
            futureKeySetsToIntersect.add(traceIdToTimestamp);
            for (String annotationKey : annotationKeys) {
                futureKeySetsToIntersect.add(this.getTraceIdsByAnnotation(annotationKey, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize));
            }
            traceIds = Futures.transform((ListenableFuture)Futures.allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets());
        }
        return Futures.transform((ListenableFuture)traceIds, (AsyncFunction)new AsyncFunction<Set<Long>, List<List<Span>>>(){

            public ListenableFuture<List<List<Span>>> apply(Set<Long> traceIds) {
                traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
                return Futures.transform(CassandraSpanStore.this.getSpansByTraceIds((Set<Long>)traceIds, CassandraSpanStore.this.maxTraceCols), (Function)AdjustTraces.INSTANCE);
            }

            public String toString() {
                return "getSpansByTraceIds";
            }
        });
    }

    static String spanName(String nullableSpanName) {
        return nullableSpanName != null ? nullableSpanName : "";
    }

    public ListenableFuture<List<Span>> getRawTrace(long traceId) {
        return Futures.transform(this.getSpansByTraceIds(Collections.singleton(traceId), this.maxTraceCols), (Function)new Function<Collection<List<Span>>, List<Span>>(){

            public List<Span> apply(Collection<List<Span>> encodedTraces) {
                if (encodedTraces.isEmpty()) {
                    return null;
                }
                return encodedTraces.iterator().next();
            }
        });
    }

    public ListenableFuture<List<Span>> getTrace(long traceId) {
        return Futures.transform(this.getRawTrace(traceId), (Function)new Function<List<Span>, List<Span>>(){

            public List<Span> apply(List<Span> input) {
                if (input == null || input.isEmpty()) {
                    return null;
                }
                return ImmutableList.copyOf((Collection)CorrectForClockSkew.apply((List)MergeById.apply(input)));
            }
        });
    }

    public ListenableFuture<List<String>> getServiceNames() {
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectServiceNames, "select-service-names");
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    HashSet<String> serviceNames = new HashSet<String>();
                    for (Row row : input) {
                        serviceNames.add(row.getString("service_name"));
                    }
                    return Ordering.natural().sortedCopy(serviceNames);
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    public ListenableFuture<List<String>> getSpanNames(String serviceName) {
        if (serviceName == null || serviceName.isEmpty()) {
            return EMPTY_LIST;
        }
        serviceName = ((String)Preconditions.checkNotNull((Object)serviceName, (Object)"serviceName")).toLowerCase();
        int bucket = 0;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectSpanNames, "select-span-names").setString("service_name", serviceName).setInt("bucket", bucket).setInt("limit_", 1000);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    HashSet<String> spanNames = new HashSet<String>();
                    for (Row row : input) {
                        spanNames.add(row.getString("span_name"));
                    }
                    return Ordering.natural().sortedCopy(spanNames);
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    public ListenableFuture<List<DependencyLink>> getDependencies(long endTs, @Nullable Long lookback) {
        List days = Util.getDays((long)endTs, (Long)lookback);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectDependencies, "select-dependencies").setList("days", days);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)ConvertDependenciesResponse.INSTANCE);
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Collection<List<Span>>> getSpansByTraceIds(Set<Long> traceIds, int limit) {
        Preconditions.checkNotNull(traceIds, (Object)"traceIds");
        if (traceIds.isEmpty()) {
            List result = Collections.emptyList();
            return Futures.immediateFuture(result);
        }
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraces, "select-traces").setSet("trace_id", traceIds).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, Collection<List<Span>>>(){

                public Collection<List<Span>> apply(ResultSet input) {
                    LinkedHashMap spans = new LinkedHashMap();
                    for (Row row : input) {
                        long traceId = row.getLong("trace_id");
                        if (!spans.containsKey(traceId)) {
                            spans.put(traceId, new ArrayList());
                        }
                        ((List)spans.get(traceId)).add(Codec.THRIFT.readSpan(row.getBytes("span")));
                    }
                    return spans.values();
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsByServiceNames(List<String> serviceNames, long endTs, long lookback, int limit) {
        if (serviceNames.isEmpty()) {
            return Futures.immediateFuture(Collections.emptyMap());
        }
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = serviceNames.size() == 1 ? CassandraUtil.bindWithName(this.selectTraceIdsByServiceName, "select-trace-ids-by-service-name").setString("service_name", serviceNames.get(0)).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit) : CassandraUtil.bindWithName(this.selectTraceIdsByServiceNames, "select-trace-ids-by-service-names").setList("service_name", serviceNames).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), this.traceIdToTimestamp);
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsBySpanName(String serviceName, String spanName, long endTs, long lookback, int limit) {
        Preconditions.checkArgument((serviceName != null ? 1 : 0) != 0, (Object)"serviceName required on spanName query");
        Preconditions.checkArgument((spanName != null ? 1 : 0) != 0, (Object)"spanName required on spanName query");
        String serviceSpanName = serviceName + "." + spanName;
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsBySpanName, "select-trace-ids-by-span-name").setString("service_span_name", serviceSpanName).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), this.traceIdToTimestamp);
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Map<Long, Long>> getTraceIdsByAnnotation(String annotationKey, long endTs, long lookback, int limit) {
        long startTs = endTs - lookback;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsByAnnotation, "select-trace-ids-by-annotation").setBytes("annotation", CassandraUtil.toByteBuffer(annotationKey)).setSet("bucket", this.buckets).setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs)).setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs)).setInt("limit_", limit);
            bound.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, Map<Long, Long>>(){

                public Map<Long, Long> apply(ResultSet input) {
                    LinkedHashMap<Long, Long> traceIdsToTimestamps = new LinkedHashMap<Long, Long>();
                    for (Row row : input) {
                        traceIdsToTimestamps.put(row.getLong("trace_id"), CassandraSpanStore.this.timestampCodec.deserialize(row, "ts"));
                    }
                    return traceIdsToTimestamps;
                }
            });
        }
        catch (RuntimeException | CharacterCodingException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    static enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyLink>>
    {
        INSTANCE;


        public List<DependencyLink> apply(ResultSet rs) {
            ImmutableList.Builder unmerged = ImmutableList.builder();
            for (Row row : rs) {
                ByteBuffer encodedDayOfDependencies = row.getBytes("dependencies");
                for (DependencyLink link : Dependencies.fromThrift((ByteBuffer)encodedDayOfDependencies).links) {
                    unmerged.add((Object)link);
                }
            }
            return DependencyLinker.merge((Iterable)unmerged.build());
        }
    }

    static enum AdjustTraces implements Function<Collection<List<Span>>, List<List<Span>>>
    {
        INSTANCE;


        public List<List<Span>> apply(Collection<List<Span>> unmerged) {
            ArrayList<List> result = new ArrayList<List>(unmerged.size());
            for (List<Span> spans : unmerged) {
                result.add(CorrectForClockSkew.apply((List)MergeById.apply(spans)));
            }
            return TRACE_DESCENDING.immutableSortedCopy(result);
        }
    }
}

