/*
 * Decompiled with CFR 0.152.
 */
package zipkin.storage.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.BindMarker;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.utils.Bytes;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.storage.cassandra.CassandraUtil;
import zipkin.storage.cassandra.Schema;
import zipkin.storage.cassandra.TimestampCodec;
import zipkin.storage.guava.GuavaSpanConsumer;

/**
 * Stores spans, and the index rows used to look them up, with asynchronous Cassandra inserts.
 *
 * <p>Every {@code store*} method binds one prepared insert and returns the driver's future. If
 * binding or submitting the statement fails, the failure is logged together with a rendering of
 * the statement and handed back as a failed future instead of being thrown, so that
 * {@link #accept(List)} can keep collecting the outcome of the remaining writes.
 */
final class CassandraSpanConsumer
implements GuavaSpanConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class);
    /**
     * Length, in milliseconds, of the interval during which an already written service or span
     * name is not written again by the same thread. Read from the system property
     * {@code zipkin.store.cassandra.internal.writtenNamesTtl}, defaulting to one hour.
     */
    private static final long WRITTEN_NAMES_TTL = Long.getLong("zipkin.store.cassandra.internal.writtenNamesTtl", 3600000L);
    /** Discards the per-statement results so that {@link #accept(List)} can return a {@code Void} future. */
    private static final Function<Object, Void> TO_VOID = Functions.constant(null);
    /** Picks the bucket for index rows that are spread over {@link #bucketCount} partitions. */
    private static final Random RAND = new Random();
    private final Session session;
    private final TimestampCodec timestampCodec;
    private final int bucketCount;
    @Deprecated
    private final int spanTtl;
    @Deprecated
    private final int indexTtl;
    private final PreparedStatement insertSpan;
    private final PreparedStatement insertServiceName;
    private final PreparedStatement insertSpanName;
    private final PreparedStatement insertTraceIdByServiceName;
    private final PreparedStatement insertTraceIdBySpanName;
    private final PreparedStatement insertTraceIdByAnnotation;
    private final PreparedStatement insertTraceIdBySpanDuration;
    private final Schema.Metadata metadata;
    /**
     * Per-thread set of the service names and service/span name pairs this thread has already
     * written. The set is replaced with an empty one whenever the current time falls into a new
     * {@link #WRITTEN_NAMES_TTL} interval, so names are written again at most once per interval.
     */
    private final ThreadLocal<Set<String>> writtenNames = new ThreadLocal<Set<String>>() {
        // The interval must be remembered per thread. A single field shared by all threads would
        // be advanced by the first thread to notice a new interval, leaving every other thread
        // with a matching interval and a set that is never cleared.
        private final ThreadLocal<Long> cacheInterval = new ThreadLocal<Long>();

        @Override
        protected Set<String> initialValue() {
            return new HashSet<String>();
        }

        @Override
        public Set<String> get() {
            long currentInterval = toCacheInterval(System.currentTimeMillis());
            Long seenInterval = cacheInterval.get();
            if (seenInterval == null) {
                // First use on this thread: the set created by initialValue() is already empty.
                cacheInterval.set(currentInterval);
            } else if (seenInterval.longValue() != currentInterval) {
                cacheInterval.set(currentInterval);
                set(new HashSet<String>());
            }
            return super.get();
        }

        private long toCacheInterval(long ms) {
            return ms / WRITTEN_NAMES_TTL;
        }
    };

    /**
     * Reads the schema metadata and prepares one insert statement per table.
     *
     * @param session     session used to prepare and execute all statements
     * @param bucketCount exclusive upper bound of the random bucket chosen for the service-name
     *                    and annotation indexes
     * @param spanTtl     TTL bound to span inserts when the schema has no default TTL
     * @param indexTtl    TTL bound to name and index inserts when the schema has no default TTL
     */
    CassandraSpanConsumer(Session session, int bucketCount, int spanTtl, int indexTtl) {
        this.session = session;
        this.timestampCodec = new TimestampCodec(session);
        this.bucketCount = bucketCount;
        this.spanTtl = spanTtl;
        this.indexTtl = indexTtl;
        // Must be assigned before the statements are prepared: maybeUseTtl reads it.
        this.metadata = Schema.readMetadata(session);
        this.insertSpan = session.prepare(maybeUseTtl(QueryBuilder.insertInto("traces")
                .value("trace_id", QueryBuilder.bindMarker("trace_id"))
                .value("ts", QueryBuilder.bindMarker("ts"))
                .value("span_name", QueryBuilder.bindMarker("span_name"))
                .value("span", QueryBuilder.bindMarker("span"))));
        this.insertServiceName = session.prepare(maybeUseTtl(QueryBuilder.insertInto("service_names")
                .value("service_name", QueryBuilder.bindMarker("service_name"))));
        this.insertSpanName = session.prepare(maybeUseTtl(QueryBuilder.insertInto("span_names")
                .value("service_name", QueryBuilder.bindMarker("service_name"))
                .value("bucket", QueryBuilder.bindMarker("bucket"))
                .value("span_name", QueryBuilder.bindMarker("span_name"))));
        this.insertTraceIdByServiceName = session.prepare(maybeUseTtl(QueryBuilder.insertInto("service_name_index")
                .value("service_name", QueryBuilder.bindMarker("service_name"))
                .value("bucket", QueryBuilder.bindMarker("bucket"))
                .value("ts", QueryBuilder.bindMarker("ts"))
                .value("trace_id", QueryBuilder.bindMarker("trace_id"))));
        this.insertTraceIdBySpanName = session.prepare(maybeUseTtl(QueryBuilder.insertInto("service_span_name_index")
                .value("service_span_name", QueryBuilder.bindMarker("service_span_name"))
                .value("ts", QueryBuilder.bindMarker("ts"))
                .value("trace_id", QueryBuilder.bindMarker("trace_id"))));
        this.insertTraceIdByAnnotation = session.prepare(maybeUseTtl(QueryBuilder.insertInto("annotations_index")
                .value("annotation", QueryBuilder.bindMarker("annotation"))
                .value("bucket", QueryBuilder.bindMarker("bucket"))
                .value("ts", QueryBuilder.bindMarker("ts"))
                .value("trace_id", QueryBuilder.bindMarker("trace_id"))));
        this.insertTraceIdBySpanDuration = session.prepare(maybeUseTtl(QueryBuilder.insertInto("span_duration_index")
                .value("service_name", QueryBuilder.bindMarker("service_name"))
                .value("span_name", QueryBuilder.bindMarker("span_name"))
                .value("bucket", QueryBuilder.bindMarker("bucket"))
                .value("duration", QueryBuilder.bindMarker("duration"))
                .value("ts", QueryBuilder.bindMarker("ts"))
                .value("trace_id", QueryBuilder.bindMarker("trace_id"))));
    }

    /**
     * Adds a {@code USING TTL :ttl_} clause to the insert unless the schema metadata reports a
     * default TTL, in which case the insert is returned unchanged.
     */
    private RegularStatement maybeUseTtl(Insert value) {
        if (this.metadata.hasDefaultTtl) {
            return value;
        }
        return value.using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_")));
    }

    /**
     * Binds the {@code ttl_} marker. The marker only exists in statements prepared without a
     * default TTL (see {@link #maybeUseTtl(Insert)}), so nothing is bound otherwise.
     */
    private void maybeBindTtl(BoundStatement bound, int ttl) {
        if (!this.metadata.hasDefaultTtl) {
            bound.setInt("ttl_", ttl);
        }
    }

    /**
     * Stores each span and its index rows.
     *
     * <p>For every span (after {@link ApplyTimestampAndDuration#apply}) this writes the span
     * itself, then for each of its service names: the service name, the span name (when not
     * empty) and, only when the span has a timestamp, the service-name and span-name indexes plus,
     * when it also has a duration, the duration index (once under the span name and, for named
     * spans, once more under the empty name). Annotation index rows are written only for spans
     * with a timestamp.
     *
     * @param spans spans to store
     * @return a future combining all submitted writes via {@link Futures#allAsList}
     */
    public ListenableFuture<Void> accept(List<Span> spans) {
        List<ListenableFuture<?>> futures = new LinkedList<ListenableFuture<?>>();
        for (Span rawSpan : spans) {
            Span span = ApplyTimestampAndDuration.apply(rawSpan);
            boolean hasName = !span.name.isEmpty();
            boolean hasTimestamp = span.timestamp != null;

            // A missing timestamp is stored as zero; the key combines the span id with hashes of
            // its annotations and binary annotations.
            long spanTimestamp = hasTimestamp ? span.timestamp : 0L;
            String spanKey = String.format("%d_%d_%d", span.id, span.annotations.hashCode(), span.binaryAnnotations.hashCode());
            futures.add(storeSpan(span.traceId, spanTimestamp, spanKey, ByteBuffer.wrap(Codec.THRIFT.writeSpan(span))));

            for (String serviceName : span.serviceNames()) {
                futures.add(storeServiceName(serviceName));
                if (hasName) {
                    futures.add(storeSpanName(serviceName, span.name));
                }
                if (hasTimestamp) {
                    futures.add(storeTraceIdByServiceName(serviceName, span.timestamp, span.traceId));
                    if (hasName) {
                        futures.add(storeTraceIdBySpanName(serviceName, span.name, span.timestamp, span.traceId));
                    }
                    if (span.duration != null) {
                        futures.add(storeTraceIdByDuration(serviceName, span.name, span.timestamp, span.duration, span.traceId));
                        if (hasName) {
                            // Second row under the empty span name, in addition to the named one.
                            futures.add(storeTraceIdByDuration(serviceName, "", span.timestamp, span.duration, span.traceId));
                        }
                    }
                }
            }

            if (hasTimestamp) {
                for (String annotation : CassandraUtil.annotationKeys(span)) {
                    futures.add(storeTraceIdByAnnotation(annotation, span.timestamp, span.traceId));
                }
            }
        }
        return Futures.transform(Futures.allAsList(futures), TO_VOID);
    }

    /**
     * Inserts one row into {@code traces}.
     *
     * @param traceId   trace the span belongs to
     * @param timestamp span timestamp, or zero when the span had none
     * @param spanName  value of the {@code span_name} column; must be non-null and non-empty
     * @param span      serialized span bytes
     * @return the driver's future, or a failed future if binding or submitting failed
     */
    ListenableFuture<?> storeSpan(long traceId, long timestamp, String spanName, ByteBuffer span) {
        Preconditions.checkNotNull(spanName);
        Preconditions.checkArgument(!spanName.isEmpty());
        try {
            if (0L == timestamp && this.metadata.compactionClass.contains("DateTieredCompactionStrategy")) {
                LOG.warn("Span {} in trace {} had no timestamp. If this happens a lot consider switching back to SizeTieredCompactionStrategy for {}.traces", new Object[]{spanName, traceId, this.session.getLoggedKeyspace()});
            }
            BoundStatement bound = CassandraUtil.bindWithName(this.insertSpan, "insert-span")
                    .setLong("trace_id", traceId)
                    .setBytesUnsafe("ts", this.timestampCodec.serialize(timestamp))
                    .setString("span_name", spanName)
                    .setBytes("span", span);
            maybeBindTtl(bound, this.spanTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertSpan(traceId, timestamp, spanName, span));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + debugInsertSpan(traceId, timestamp, spanName, span), ex);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the span insert with its bind markers replaced by the given values, for logging. */
    private String debugInsertSpan(long traceId, long timestamp, String spanName, ByteBuffer span) {
        // ":span_name" has to be substituted before ":span", which is a prefix of it.
        return this.insertSpan.getQueryString()
                .replace(":trace_id", String.valueOf(traceId))
                .replace(":ts", String.valueOf(timestamp))
                .replace(":span_name", spanName)
                .replace(":span", Bytes.toHexString(span))
                .replace(":ttl_", String.valueOf(this.spanTtl));
    }

    /**
     * Inserts the service name into {@code service_names}, unless this thread already wrote it
     * during the current {@link #WRITTEN_NAMES_TTL} interval.
     *
     * @param serviceName must be non-null and non-empty
     * @return the driver's future; a completed future when the write was skipped; or a failed
     *         future if binding or submitting failed
     */
    ListenableFuture<?> storeServiceName(String serviceName) {
        Preconditions.checkNotNull(serviceName);
        Preconditions.checkArgument(!serviceName.isEmpty());
        if (!this.writtenNames.get().add(serviceName)) {
            return Futures.immediateFuture(null);
        }
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertServiceName, "insert-service-name")
                    .setString("service_name", serviceName);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertServiceName(serviceName));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + debugInsertServiceName(serviceName), ex);
            // Forget the name so that a later call attempts the write again.
            this.writtenNames.get().remove(serviceName);
            // Reported as a failed future, like every other store method, so that one failing
            // write does not abort accept() and discard the futures it already collected.
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the service-name insert with its bind markers replaced, for logging. */
    private String debugInsertServiceName(String serviceName) {
        return this.insertServiceName.getQueryString()
                .replace(":service_name", serviceName)
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }

    /**
     * Inserts the span name into {@code span_names} (always bucket 0), unless this thread already
     * wrote the same service/span name pair during the current {@link #WRITTEN_NAMES_TTL} interval.
     *
     * @param serviceName must be non-null and non-empty
     * @param spanName    must be non-null and non-empty
     * @return the driver's future; a completed future when the write was skipped; or a failed
     *         future if binding or submitting failed
     */
    ListenableFuture<?> storeSpanName(String serviceName, String spanName) {
        Preconditions.checkNotNull(serviceName);
        Preconditions.checkArgument(!serviceName.isEmpty());
        Preconditions.checkNotNull(spanName);
        Preconditions.checkArgument(!spanName.isEmpty());
        int bucket = 0;
        String cacheKey = serviceName + "\u2013\u2013" + spanName;
        if (!this.writtenNames.get().add(cacheKey)) {
            return Futures.immediateFuture(null);
        }
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertSpanName, "insert-span-name")
                    .setString("service_name", serviceName)
                    .setInt("bucket", bucket)
                    .setString("span_name", spanName);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertSpanName(bucket, serviceName, spanName));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + debugInsertSpanName(bucket, serviceName, spanName), ex);
            // Forget the pair so that a later call attempts the write again.
            this.writtenNames.get().remove(cacheKey);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the span-name insert with its bind markers replaced, for logging. */
    private String debugInsertSpanName(int bucket, String serviceName, String spanName) {
        return this.insertSpanName.getQueryString()
                .replace(":bucket", String.valueOf(bucket))
                .replace(":service_name", serviceName)
                .replace(":span_name", spanName)
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }

    /**
     * Inserts a row into {@code service_name_index}, in a bucket chosen at random from
     * {@code [0, bucketCount)}.
     *
     * @param serviceName must be non-null and non-empty
     * @param timestamp   span timestamp
     * @param traceId     trace to index
     * @return the driver's future, or a failed future if binding or submitting failed
     */
    ListenableFuture<?> storeTraceIdByServiceName(String serviceName, long timestamp, long traceId) {
        Preconditions.checkNotNull(serviceName);
        Preconditions.checkArgument(!serviceName.isEmpty());
        int bucket = RAND.nextInt(this.bucketCount);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertTraceIdByServiceName, "insert-trace-id-by-service-name")
                    .setInt("bucket", bucket)
                    .setString("service_name", serviceName)
                    .setBytesUnsafe("ts", this.timestampCodec.serialize(timestamp))
                    .setLong("trace_id", traceId);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdByServiceName(bucket, serviceName, timestamp, traceId));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + debugInsertTraceIdByServiceName(bucket, serviceName, timestamp, traceId), ex);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the service-name index insert with its bind markers replaced, for logging. */
    private String debugInsertTraceIdByServiceName(int bucket, String serviceName, long timestamp, long traceId) {
        return this.insertTraceIdByServiceName.getQueryString()
                .replace(":bucket", String.valueOf(bucket))
                .replace(":service_name", serviceName)
                .replace(":ts", CassandraUtil.iso8601(timestamp))
                .replace(":trace_id", String.valueOf(traceId))
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }

    /**
     * Inserts a row into {@code service_span_name_index}, keyed by
     * {@code serviceName + "." + spanName}.
     *
     * @param serviceName must be non-null and non-empty
     * @param spanName    must be non-null and non-empty
     * @param timestamp   span timestamp
     * @param traceId     trace to index
     * @return the driver's future, or a failed future if binding or submitting failed
     */
    ListenableFuture<?> storeTraceIdBySpanName(String serviceName, String spanName, long timestamp, long traceId) {
        Preconditions.checkNotNull(serviceName);
        Preconditions.checkArgument(!serviceName.isEmpty());
        Preconditions.checkNotNull(spanName);
        Preconditions.checkArgument(!spanName.isEmpty());
        String serviceSpanName = serviceName + "." + spanName;
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertTraceIdBySpanName, "insert-trace-id-by-span-name")
                    .setString("service_span_name", serviceSpanName)
                    .setBytesUnsafe("ts", this.timestampCodec.serialize(timestamp))
                    .setLong("trace_id", traceId);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdBySpanName(serviceSpanName, timestamp, traceId));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            LOG.error("failed " + debugInsertTraceIdBySpanName(serviceSpanName, timestamp, traceId), ex);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the span-name index insert with its bind markers replaced, for logging. */
    private String debugInsertTraceIdBySpanName(String serviceSpanName, long timestamp, long traceId) {
        return this.insertTraceIdBySpanName.getQueryString()
                .replace(":service_span_name", serviceSpanName)
                .replace(":ts", String.valueOf(timestamp))
                .replace(":trace_id", String.valueOf(traceId))
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }

    /**
     * Inserts a row into {@code annotations_index}, in a bucket chosen at random from
     * {@code [0, bucketCount)}.
     *
     * @param annotationKey annotation key, converted with {@link CassandraUtil#toByteBuffer}
     * @param timestamp     span timestamp
     * @param traceId       trace to index
     * @return the driver's future, or a failed future if encoding, binding or submitting failed
     */
    ListenableFuture<?> storeTraceIdByAnnotation(String annotationKey, long timestamp, long traceId) {
        int bucket = RAND.nextInt(this.bucketCount);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertTraceIdByAnnotation, "insert-trace-id-by-annotation")
                    .setInt("bucket", bucket)
                    .setBytes("annotation", CassandraUtil.toByteBuffer(annotationKey))
                    .setBytesUnsafe("ts", this.timestampCodec.serialize(timestamp))
                    .setLong("trace_id", traceId);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdByAnnotation(bucket, annotationKey, timestamp, traceId));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException | CharacterCodingException ex) {
            LOG.error("failed " + debugInsertTraceIdByAnnotation(bucket, annotationKey, timestamp, traceId), ex);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the annotation index insert with its bind markers replaced, for logging. */
    private String debugInsertTraceIdByAnnotation(int bucket, String annotationKey, long timestamp, long traceId) {
        return this.insertTraceIdByAnnotation.getQueryString()
                .replace(":bucket", String.valueOf(bucket))
                .replace(":annotation", annotationKey)
                .replace(":ts", CassandraUtil.iso8601(timestamp))
                .replace(":trace_id", String.valueOf(traceId))
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }

    /**
     * Inserts a row into {@code span_duration_index}, in the bucket that
     * {@link CassandraUtil#durationIndexBucket} derives from the timestamp.
     *
     * @param serviceName service name column value
     * @param spanName    span name column value; {@link #accept(List)} also passes the empty string
     * @param timestamp   span timestamp
     * @param duration    span duration
     * @param traceId     trace to index
     * @return the driver's future, or a failed future if binding or submitting failed
     */
    ListenableFuture<?> storeTraceIdByDuration(String serviceName, String spanName, long timestamp, long duration, long traceId) {
        int bucket = CassandraUtil.durationIndexBucket(timestamp);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.insertTraceIdBySpanDuration, "insert-trace-id-by-span-duration")
                    .setInt("bucket", bucket)
                    .setString("service_name", serviceName)
                    .setString("span_name", spanName)
                    .setBytesUnsafe("ts", this.timestampCodec.serialize(timestamp))
                    .setLong("duration", duration)
                    .setLong("trace_id", traceId);
            maybeBindTtl(bound, this.indexTtl);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdBySpanDuration(bucket, serviceName, spanName, timestamp, duration, traceId));
            }
            return this.session.executeAsync(bound);
        }
        catch (RuntimeException ex) {
            // Pass the exception along so its stack trace is logged, as in the other store methods.
            LOG.error("failed " + debugInsertTraceIdBySpanDuration(bucket, serviceName, spanName, timestamp, duration, traceId), ex);
            return Futures.immediateFailedFuture(ex);
        }
    }

    /** Renders the duration index insert with its bind markers replaced, for logging. */
    private String debugInsertTraceIdBySpanDuration(int bucket, String serviceName, String spanName, long timestamp, long duration, long traceId) {
        return this.insertTraceIdBySpanDuration.getQueryString()
                .replace(":bucket", String.valueOf(bucket))
                .replace(":service_name", serviceName)
                .replace(":span_name", spanName)
                .replace(":ts", CassandraUtil.iso8601(timestamp))
                .replace(":duration", String.valueOf(duration))
                .replace(":trace_id", String.valueOf(traceId))
                .replace(":ttl_", String.valueOf(this.indexTtl));
    }
}

