/*
 * Decompiled with CFR 0.152.
 */
package zipkin.storage.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.Dependencies;
import zipkin.internal.DependencyLinker;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.MergeById;
import zipkin.internal.Nullable;
import zipkin.internal.Util;
import zipkin.storage.QueryRequest;
import zipkin.storage.cassandra.CassandraSpanStore$$Lambda$1;
import zipkin.storage.cassandra.CassandraSpanStore$2$1$$Lambda$1;
import zipkin.storage.cassandra.CassandraUtil;
import zipkin.storage.cassandra.TimestampCodec;
import zipkin.storage.guava.GuavaSpanStore;

public final class CassandraSpanStore
implements GuavaSpanStore {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
    /** Shared, already-completed future holding an empty list; returned by getSpanNames for a null or empty service name. */
    static final ListenableFuture<List<String>> EMPTY_LIST = Futures.immediateFuture(Collections.emptyList());
    /** Value bound to "limit_" when spans are fetched by trace id from getTraces and getRawTrace. */
    private final int maxTraceCols;
    /** Multiplied by the request limit in getTraces to size each index lookup. */
    private final int indexFetchMultiplier;
    /** When true, getRawTrace drops spans whose non-zero traceIdHigh differs from the requested one; also passed to GroupByTraceId. */
    private final boolean strictTraceId;
    /** Session used both to prepare statements in the constructor and to execute every query. */
    private final Session session;
    /** Serializes the start_ts/end_ts bind values and deserializes the "ts" column of index rows. */
    private final TimestampCodec timestampCodec;
    /** The integers in [0, bucketCount); bound to the "bucket" IN clause of the index queries. */
    private final Set<Integer> buckets;
    // Statements prepared once in the constructor, one per query shape.
    private final PreparedStatement selectTraces;
    private final PreparedStatement selectDependencies;
    private final PreparedStatement selectServiceNames;
    private final PreparedStatement selectSpanNames;
    private final PreparedStatement selectTraceIdsByServiceName;
    /** Null when the negotiated protocol version is below V4 (see constructor). */
    private final PreparedStatement selectTraceIdsByServiceNames;
    private final PreparedStatement selectTraceIdsBySpanName;
    private final PreparedStatement selectTraceIdsByAnnotation;
    /** Converts index rows into a trace id to timestamp map; created by a synthetic lambda factory in the constructor. */
    private final Function<ResultSet, Map<Long, Long>> traceIdToTimestamp;

    /**
     * Creates a span store on top of {@code session} and prepares every query it will issue.
     *
     * @param session driver session used to prepare and later execute all statements
     * @param bucketCount exclusive upper bound of the bucket numbers; buckets are the integers in [0, bucketCount)
     * @param maxTraceCols value later bound to "limit_" when spans are fetched by trace id
     * @param indexFetchMultiplier factor applied to the request limit when querying index tables
     * @param strictTraceId whether spans and traces are filtered on the high 64 bits of the trace id
     */
    CassandraSpanStore(Session session, int bucketCount, int maxTraceCols, int indexFetchMultiplier, boolean strictTraceId) {
        this.session = session;
        this.maxTraceCols = maxTraceCols;
        this.indexFetchMultiplier = indexFetchMultiplier;
        this.strictTraceId = strictTraceId;

        ProtocolVersion negotiated = session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
        this.timestampCodec = new TimestampCodec(negotiated);
        this.buckets = ContiguousSet.create(Range.closedOpen(0, bucketCount), DiscreteDomain.integers());

        // Each query is built and prepared in turn; the order of prepare calls is significant only
        // in that it mirrors the field declaration order.
        RegularStatement tracesQuery = QueryBuilder.select("trace_id", "span")
                .from("traces")
                .where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id")))
                .limit(QueryBuilder.bindMarker("limit_"));
        this.selectTraces = session.prepare(tracesQuery);

        RegularStatement dependenciesQuery = QueryBuilder.select("dependencies")
                .from("dependencies")
                .where(QueryBuilder.in("day", QueryBuilder.bindMarker("days")));
        this.selectDependencies = session.prepare(dependenciesQuery);

        RegularStatement serviceNamesQuery = QueryBuilder.select("service_name")
                .from("service_names");
        this.selectServiceNames = session.prepare(serviceNamesQuery);

        RegularStatement spanNamesQuery = QueryBuilder.select("span_name")
                .from("span_names")
                .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
                .and(QueryBuilder.eq("bucket", QueryBuilder.bindMarker("bucket")))
                .limit(QueryBuilder.bindMarker("limit_"));
        this.selectSpanNames = session.prepare(spanNamesQuery);

        RegularStatement idsByServiceNameQuery = QueryBuilder.select("ts", "trace_id")
                .from("service_name_index")
                .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
                .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                .limit(QueryBuilder.bindMarker("limit_"))
                .orderBy(QueryBuilder.desc("ts"));
        this.selectTraceIdsByServiceName = session.prepare(idsByServiceNameQuery);

        RegularStatement idsBySpanNameQuery = QueryBuilder.select("ts", "trace_id")
                .from("service_span_name_index")
                .where(QueryBuilder.eq("service_span_name", QueryBuilder.bindMarker("service_span_name")))
                .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                .limit(QueryBuilder.bindMarker("limit_"))
                .orderBy(QueryBuilder.desc("ts"));
        this.selectTraceIdsBySpanName = session.prepare(idsBySpanNameQuery);

        RegularStatement idsByAnnotationQuery = QueryBuilder.select("ts", "trace_id")
                .from("annotations_index")
                .where(QueryBuilder.eq("annotation", QueryBuilder.bindMarker("annotation")))
                .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                .limit(QueryBuilder.bindMarker("limit_"))
                .orderBy(QueryBuilder.desc("ts"));
        this.selectTraceIdsByAnnotation = session.prepare(idsByAnnotationQuery);

        // The multi-service lookup (IN on service_name) is only prepared from protocol V4 upwards;
        // below that the statement stays null and a warning is logged.
        if (negotiated.compareTo(ProtocolVersion.V4) >= 0) {
            RegularStatement idsByServiceNamesQuery = QueryBuilder.select("ts", "trace_id")
                    .from("service_name_index")
                    .where(QueryBuilder.in("service_name", QueryBuilder.bindMarker("service_name")))
                    .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                    .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                    .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                    .limit(QueryBuilder.bindMarker("limit_"))
                    .orderBy(QueryBuilder.desc("ts"));
            this.selectTraceIdsByServiceNames = session.prepare(idsByServiceNamesQuery);
        } else {
            LOG.warn("Please update Cassandra to 2.2 or later, as some features may fail");
            this.selectTraceIdsByServiceNames = null;
        }

        this.traceIdToTimestamp = CassandraSpanStore$$Lambda$1.lambdaFactory$(this);
    }

    /**
     * Looks up traces matching {@code request}.
     *
     * <p>Works in three asynchronous steps: (1) find candidate trace ids in the index tables,
     * (2) keep at most {@code request.limit} of them, (3) fetch their spans, group them by trace id
     * and filter the resulting traces.
     *
     * <p>Requests carrying a {@code minDuration} fail the argument check, as do requests without a
     * service name when the multi-service statement was not prepared.
     *
     * @param request the query; endTs and lookback are multiplied by 1000 before being handed to the
     *     index lookups (presumably milliseconds to microseconds; confirm against the index schema)
     * @return a future of the matching traces, each trace being a list of spans
     */
    public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request) {
        ListenableFuture traceIds;
        ListenableFuture traceIdToTimestamp;
        Preconditions.checkArgument((request.minDuration == null ? 1 : 0) != 0, (Object)"getTraces with duration is unsupported");
        // Over-fetch from the indexes: the limit is scaled by indexFetchMultiplier.
        final int traceIndexFetchSize = request.limit * this.indexFetchMultiplier;
        if (request.spanName != null) {
            traceIdToTimestamp = this.getTraceIdsBySpanName(request.serviceName, request.spanName, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
        } else if (request.serviceName != null) {
            traceIdToTimestamp = this.getTraceIdsByServiceNames(Collections.singletonList(request.serviceName), request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
        } else {
            // No service name given: first list all service names, then query the index for all of them.
            Preconditions.checkArgument((this.selectTraceIdsByServiceNames != null ? 1 : 0) != 0, (Object)"getTraces without serviceName requires Cassandra 2.2 or later");
            traceIdToTimestamp = Futures.transform(this.getServiceNames(), (AsyncFunction)new AsyncFunction<List<String>, Map<Long, Long>>(){

                public ListenableFuture<Map<Long, Long>> apply(@Nullable List<String> serviceNames) {
                    return CassandraSpanStore.this.getTraceIdsByServiceNames(serviceNames, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize);
                }
            });
        }
        List<String> annotationKeys = CassandraUtil.annotationKeys(request);
        if (annotationKeys.isEmpty()) {
            // No annotation constraints: the candidate ids are simply the keys of the first lookup.
            traceIds = Futures.transform((ListenableFuture)traceIdToTimestamp, CassandraUtil.keyset());
        } else {
            // One lookup per annotation key; the results are intersected.
            ArrayList<Object> futureKeySetsToIntersect = new ArrayList<Object>();
            // NOTE(review): the first lookup only takes part in the intersection for span-name queries;
            // presumably the annotation keys already encode the service name (confirm in CassandraUtil.annotationKeys).
            if (request.spanName != null) {
                futureKeySetsToIntersect.add(traceIdToTimestamp);
            }
            for (String annotationKey : annotationKeys) {
                futureKeySetsToIntersect.add(this.getTraceIdsByAnnotation(annotationKey, request.endTs * 1000L, request.lookback * 1000L, traceIndexFetchSize));
            }
            traceIds = Futures.transform((ListenableFuture)Futures.allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets());
        }
        return Futures.transform((ListenableFuture)traceIds, (AsyncFunction)new AsyncFunction<Set<Long>, List<List<Span>>>(){

            public ListenableFuture<List<List<Span>>> apply(@Nullable Set<Long> traceIds) {
                // Keep only the first request.limit ids, in iteration order.
                traceIds = ImmutableSet.copyOf((Iterator)Iterators.limit(traceIds.iterator(), (int)request.limit));
                return Futures.transform(CassandraSpanStore.this.getSpansByTraceIds((Set<Long>)traceIds, CassandraSpanStore.this.maxTraceCols), (Function)new Function<List<Span>, List<List<Span>>>(){

                    public List<List<Span>> apply(@Nullable List<Span> input) {
                        return FluentIterable.from((Iterable)GroupByTraceId.apply(input, (boolean)CassandraSpanStore.this.strictTraceId, (boolean)true)).filter(CassandraSpanStore$2$1$$Lambda$1.lambdaFactory$(request)).toList();
                    }

                    // Keeps a trace when its first span has no high trace id bits, or when the request matches it.
                    // Presumably the body of the predicate created by CassandraSpanStore$2$1$$Lambda$1 above (decompiler artefact).
                    static /* synthetic */ boolean lambda$apply$0(QueryRequest request, List trace) {
                        return ((Span)trace.get((int)0)).traceIdHigh == 0L || request.test(trace);
                    }
                });
            }

            public String toString() {
                return "getSpansByTraceIds";
            }
        });
    }

    /**
     * Fetches the stored spans of a trace identified only by its lower 64 bits.
     *
     * <p>Equivalent to calling the two-argument overload with a high part of zero.
     *
     * @param traceId lower 64 bits of the trace id
     * @return a future of the trace's spans, as produced by the two-argument overload
     */
    public ListenableFuture<List<Span>> getRawTrace(long traceId) {
        final long noHighBits = 0L;
        return getRawTrace(noHighBits, traceId);
    }

    /**
     * Fetches the stored spans of one trace.
     *
     * <p>Spans are selected by {@code traceIdLow} only. When strict trace ids are enabled, spans
     * whose high part is non-zero and different from {@code traceIdHigh} are removed afterwards.
     *
     * @param traceIdHigh upper 64 bits of the trace id; only consulted when strictTraceId is set
     * @param traceIdLow lower 64 bits of the trace id, used for the storage lookup
     * @return a future of the remaining spans, or of {@code null} when none remain
     */
    public ListenableFuture<List<Span>> getRawTrace(final long traceIdHigh, long traceIdLow) {
        ListenableFuture<List<Span>> stored = getSpansByTraceIds(Collections.singleton(traceIdLow), maxTraceCols);
        return Futures.transform(stored, new Function<List<Span>, List<Span>>() {

            public List<Span> apply(@Nullable List<Span> input) {
                if (CassandraSpanStore.this.strictTraceId) {
                    // Remove in place through the iterator; the list is mutated, not copied.
                    for (Iterator<Span> it = input.iterator(); it.hasNext(); ) {
                        long high = it.next().traceIdHigh;
                        boolean foreign = high != 0L && high != traceIdHigh;
                        if (foreign) {
                            it.remove();
                        }
                    }
                }
                if (input.isEmpty()) {
                    return null;
                }
                return input;
            }
        });
    }

    /**
     * Fetches an adjusted trace identified only by its lower 64 bits.
     *
     * <p>Equivalent to calling the two-argument overload with a high part of zero.
     *
     * @param traceId lower 64 bits of the trace id
     * @return a future of the adjusted trace, as produced by the two-argument overload
     */
    public ListenableFuture<List<Span>> getTrace(long traceId) {
        final long noHighBits = 0L;
        return getTrace(noHighBits, traceId);
    }

    /**
     * Fetches one trace and post-processes it with {@link AdjustTrace}.
     *
     * @param traceIdHigh upper 64 bits of the trace id
     * @param traceIdLow lower 64 bits of the trace id
     * @return a future of the raw trace after {@link AdjustTrace#INSTANCE} has been applied to it
     */
    public ListenableFuture<List<Span>> getTrace(long traceIdHigh, long traceIdLow) {
        ListenableFuture<List<Span>> raw = getRawTrace(traceIdHigh, traceIdLow);
        return Futures.transform(raw, AdjustTrace.INSTANCE);
    }

    /**
     * Lists all service names found in the service_names table.
     *
     * @return a future of the distinct service names in natural order; a failed future if binding
     *     or submitting the query throws a {@link RuntimeException}
     */
    public ListenableFuture<List<String>> getServiceNames() {
        try {
            BoundStatement query = CassandraUtil.bindWithName(selectServiceNames, "select-service-names");
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, new Function<ResultSet, List<String>>() {

                public List<String> apply(@Nullable ResultSet input) {
                    // De-duplicate first, then sort.
                    Set<String> distinct = new HashSet<String>();
                    Iterator<Row> rows = input.iterator();
                    while (rows.hasNext()) {
                        distinct.add(rows.next().getString("service_name"));
                    }
                    return Ordering.natural().sortedCopy(distinct);
                }
            });
        } catch (RuntimeException ex) {
            // Synchronous failures are reported through the returned future instead of being thrown.
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Lists the span names recorded for a service.
     *
     * @param serviceName service to look up; it is lower-cased before querying
     * @return a future of the distinct span names in natural order; the shared empty list when
     *     {@code serviceName} is null or empty; a failed future if binding or submitting the query
     *     throws a {@link RuntimeException}
     */
    public ListenableFuture<List<String>> getSpanNames(String serviceName) {
        boolean unspecified = serviceName == null || serviceName.isEmpty();
        if (unspecified) {
            return EMPTY_LIST;
        }
        String lowerCased = Preconditions.checkNotNull(serviceName, "serviceName").toLowerCase();
        try {
            // Only bucket 0 is queried, and at most 1000 rows are requested.
            BoundStatement query = CassandraUtil.bindWithName(selectSpanNames, "select-span-names")
                    .setString("service_name", lowerCased)
                    .setInt("bucket", 0)
                    .setInt("limit_", 1000);
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, new Function<ResultSet, List<String>>() {

                public List<String> apply(@Nullable ResultSet input) {
                    // De-duplicate first, then sort.
                    Set<String> distinct = new HashSet<String>();
                    Iterator<Row> rows = input.iterator();
                    while (rows.hasNext()) {
                        distinct.add(rows.next().getString("span_name"));
                    }
                    return Ordering.natural().sortedCopy(distinct);
                }
            });
        } catch (RuntimeException ex) {
            // Synchronous failures are reported through the returned future instead of being thrown.
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Loads the dependency links stored for the days covered by the given window.
     *
     * @param endTs end of the window, passed to {@code Util.getDays}
     * @param lookback length of the window, passed to {@code Util.getDays}; may be null
     * @return a future of the links after conversion by {@link ConvertDependenciesResponse}; a failed
     *     future if binding or submitting the query throws a {@link RuntimeException}
     */
    public ListenableFuture<List<DependencyLink>> getDependencies(long endTs, @Nullable Long lookback) {
        // Computed outside the try block: an exception from getDays propagates to the caller directly.
        List<?> days = Util.getDays(endTs, lookback);
        try {
            BoundStatement query = CassandraUtil.bindWithName(selectDependencies, "select-dependencies")
                    .setList("days", days);
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, ConvertDependenciesResponse.INSTANCE);
        } catch (RuntimeException ex) {
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Reads and decodes the spans stored under the given trace ids.
     *
     * @param traceIds trace ids to fetch; must not be null
     * @param limit value bound to the query's "limit_" marker
     * @return a future of the decoded spans in result-set order; an immediate empty list when
     *     {@code traceIds} is empty; a failed future if binding or submitting the query throws a
     *     {@link RuntimeException}
     */
    ListenableFuture<List<Span>> getSpansByTraceIds(Set<Long> traceIds, int limit) {
        Preconditions.checkNotNull(traceIds, "traceIds");
        if (traceIds.isEmpty()) {
            List<Span> none = Collections.emptyList();
            return Futures.immediateFuture(none);
        }
        try {
            BoundStatement query = CassandraUtil.bindWithName(selectTraces, "select-traces")
                    .setSet("trace_id", traceIds)
                    .setInt("limit_", limit);
            // Ask for everything in a single page.
            query.setFetchSize(Integer.MAX_VALUE);
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, new Function<ResultSet, List<Span>>() {

                public List<Span> apply(@Nullable ResultSet input) {
                    // Pre-size with the number of rows that are already available locally.
                    List<Span> decoded = new ArrayList<Span>(input.getAvailableWithoutFetching());
                    Iterator<Row> rows = input.iterator();
                    while (rows.hasNext()) {
                        decoded.add(Codec.THRIFT.readSpan(rows.next().getBytes("span")));
                    }
                    return decoded;
                }
            });
        } catch (RuntimeException ex) {
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Looks up trace ids in the service name index for one or more services.
     *
     * <p>A single service name uses the equality statement; several names use the IN statement,
     * which is null when it could not be prepared (see the constructor).
     *
     * @param serviceNames services to search; an empty list short-circuits to an empty map
     * @param endTs upper bound of the "ts" range
     * @param lookback subtracted from {@code endTs} to obtain the lower bound, floored at zero
     * @param limit value bound to the query's "limit_" marker
     * @return a future mapping trace id to timestamp; a failed future if binding or submitting the
     *     query throws a {@link RuntimeException}
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsByServiceNames(List<String> serviceNames, long endTs, long lookback, int limit) {
        if (serviceNames.isEmpty()) {
            Map<Long, Long> none = Collections.emptyMap();
            return Futures.immediateFuture(none);
        }
        long startTs = Math.max(endTs - lookback, 0L);
        try {
            // Only the service_name binding differs between the two statements.
            BoundStatement query;
            if (serviceNames.size() == 1) {
                query = CassandraUtil.bindWithName(selectTraceIdsByServiceName, "select-trace-ids-by-service-name")
                        .setString("service_name", serviceNames.get(0));
            } else {
                query = CassandraUtil.bindWithName(selectTraceIdsByServiceNames, "select-trace-ids-by-service-names")
                        .setList("service_name", serviceNames);
            }
            query = query.setSet("bucket", buckets)
                    .setBytesUnsafe("start_ts", timestampCodec.serialize(startTs))
                    .setBytesUnsafe("end_ts", timestampCodec.serialize(endTs))
                    .setInt("limit_", limit);
            // Ask for everything in a single page.
            query.setFetchSize(Integer.MAX_VALUE);
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, traceIdToTimestamp);
        } catch (RuntimeException ex) {
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Looks up trace ids in the service/span name index.
     *
     * <p>The index key is {@code serviceName + "." + spanName}.
     *
     * @param serviceName service part of the key; must not be null
     * @param spanName span part of the key; must not be null
     * @param endTs upper bound of the "ts" range
     * @param lookback subtracted from {@code endTs} to obtain the lower bound, floored at zero
     * @param limit value bound to the query's "limit_" marker
     * @return a future mapping trace id to timestamp; a failed future if binding or submitting the
     *     query throws a {@link RuntimeException}
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsBySpanName(String serviceName, String spanName, long endTs, long lookback, int limit) {
        Preconditions.checkArgument(serviceName != null, "serviceName required on spanName query");
        Preconditions.checkArgument(spanName != null, "spanName required on spanName query");
        final String indexKey = serviceName + "." + spanName;
        final long startTs = Math.max(endTs - lookback, 0L);
        try {
            BoundStatement query = CassandraUtil.bindWithName(selectTraceIdsBySpanName, "select-trace-ids-by-span-name")
                    .setString("service_span_name", indexKey)
                    .setBytesUnsafe("start_ts", timestampCodec.serialize(startTs))
                    .setBytesUnsafe("end_ts", timestampCodec.serialize(endTs))
                    .setInt("limit_", limit);
            ListenableFuture<ResultSet> pending = session.executeAsync(query);
            return Futures.transform(pending, traceIdToTimestamp);
        } catch (RuntimeException ex) {
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Looks up trace ids in the annotations index for one annotation key.
     *
     * <p>Rows are converted with the shared {@code traceIdToTimestamp} function, the same one the
     * service-name and span-name lookups use, instead of a private copy of that loop.
     *
     * @param annotationKey key to search for; encoded with {@code CassandraUtil.toByteBuffer}
     * @param endTs upper bound of the "ts" range
     * @param lookback subtracted from {@code endTs} to obtain the lower bound, floored at zero
     * @param limit value bound to the query's "limit_" marker
     * @return a future mapping trace id to timestamp in result-set order; a failed future if
     *     encoding the key, binding or submitting the query throws
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsByAnnotation(String annotationKey, long endTs, long lookback, int limit) {
        long startTs = Math.max(endTs - lookback, 0L);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsByAnnotation, "select-trace-ids-by-annotation")
                    .setBytes("annotation", CassandraUtil.toByteBuffer(annotationKey))
                    .setSet("bucket", this.buckets)
                    .setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs))
                    .setBytesUnsafe("end_ts", this.timestampCodec.serialize(endTs))
                    .setInt("limit_", limit);
            // Ask for everything in a single page.
            // NOTE(review): presumably needed because the query combines ORDER BY with IN on bucket; confirm.
            bound.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform(this.session.executeAsync(bound), this.traceIdToTimestamp);
        }
        catch (RuntimeException | CharacterCodingException ex) {
            // Synchronous failures are reported through the returned future instead of being thrown.
            return Futures.immediateFailedFuture(ex);
        }
    }

    /**
     * Converts index rows into a map from the "trace_id" column to the decoded "ts" column,
     * preserving result-set order (later rows overwrite earlier values for the same trace id).
     *
     * <p>Synthetic lambda body produced by the compiler; the name and signature must stay as they
     * are. Presumably invoked through CassandraSpanStore$$Lambda$1 (confirm).
     */
    static /* synthetic */ Map lambda$new$0(CassandraSpanStore this_, ResultSet input) {
        Map<Long, Long> timestampsByTraceId = new LinkedHashMap<Long, Long>();
        Iterator<Row> rows = input.iterator();
        while (rows.hasNext()) {
            Row current = rows.next();
            timestampsByTraceId.put(current.getLong("trace_id"), this_.timestampCodec.deserialize(current, "ts"));
        }
        return timestampsByTraceId;
    }

    /**
     * Singleton function that decodes every row's "dependencies" column from thrift, gathers all
     * contained links and hands them to {@code DependencyLinker.merge}.
     */
    enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyLink>>
    {
        INSTANCE;

        public List<DependencyLink> apply(@Nullable ResultSet rs) {
            ImmutableList.Builder<DependencyLink> collected = ImmutableList.builder();
            Iterator<Row> rows = rs.iterator();
            while (rows.hasNext()) {
                // Each row holds one thrift-encoded Dependencies value.
                ByteBuffer encoded = rows.next().getBytes("dependencies");
                Dependencies decoded = Dependencies.fromThrift(encoded);
                for (DependencyLink link : decoded.links) {
                    collected.add(link);
                }
            }
            return DependencyLinker.merge(collected.build());
        }
    }

    /**
     * Singleton function that runs a trace through {@code MergeById} and then
     * {@code CorrectForClockSkew}, returning {@code null} instead of an empty result.
     */
    enum AdjustTrace implements Function<Collection<Span>, List<Span>>
    {
        INSTANCE;

        public List<Span> apply(@Nullable Collection<Span> input) {
            List<Span> adjusted = CorrectForClockSkew.apply(MergeById.apply(input));
            if (adjusted.isEmpty()) {
                return null;
            }
            return adjusted;
        }
    }
}

