package zipkin2.storage.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.annotation.Default;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.ProducesJson;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.codec.DependencyLinkBytesEncoder;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.internal.DependencyLinker;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.kafka.streams.DependencyStorageTopology;
import zipkin2.storage.kafka.streams.TraceStorageTopology;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.1.jar:zipkin2/storage/kafka/KafkaStorageHttpService.class */
public final class KafkaStorageHttpService {
    static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHttpService.class);
    static final ObjectMapper MAPPER = new ObjectMapper();
    final KafkaStorage storage;
    final long minTracesStored;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStorageHttpService(KafkaStorage kafkaStorage) {
        this.storage = kafkaStorage;
        this.minTracesStored = kafkaStorage.minTracesStored;
    }

    @Get("/dependencies")
    public AggregatedHttpResponse getDependencies(@Param("endTs") long j, @Param("lookback") long j2) {
        try {
            if (!this.storage.dependencyQueryEnabled) {
                return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
            }
            ReadOnlyWindowStore readOnlyWindowStore = (ReadOnlyWindowStore) this.storage.getDependencyStorageStream().store(DependencyStorageTopology.DEPENDENCIES_STORE_NAME, QueryableStoreTypes.windowStore());
            ArrayList arrayList = new ArrayList();
            Instant ofEpochMilli = Instant.ofEpochMilli(j - j2);
            Instant ofEpochMilli2 = Instant.ofEpochMilli(j);
            KeyValueIterator fetchAll = readOnlyWindowStore.fetchAll(ofEpochMilli, ofEpochMilli2);
            try {
                fetchAll.forEachRemaining(keyValue -> {
                    arrayList.add((DependencyLink) keyValue.value);
                });
                if (fetchAll != null) {
                    fetchAll.close();
                }
                List merge = DependencyLinker.merge(arrayList);
                LOG.debug("Dependencies found from={}-to={}: {}", new Object[]{ofEpochMilli, ofEpochMilli2, Integer.valueOf(merge.size())});
                return AggregatedHttpResponse.of(HttpStatus.OK, MediaType.JSON, DependencyLinkBytesEncoder.JSON_V1.encodeList(merge));
            } finally {
            }
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
        }
    }

    @ProducesJson
    @Get("/serviceNames")
    public JsonNode getServiceNames() {
        try {
            if (!this.storage.traceSearchEnabled) {
                return MAPPER.createArrayNode();
            }
            ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.SERVICE_NAMES_STORE_NAME, QueryableStoreTypes.keyValueStore());
            ArrayNode createArrayNode = MAPPER.createArrayNode();
            KeyValueIterator all = readOnlyKeyValueStore.all();
            try {
                all.forEachRemaining(keyValue -> {
                    createArrayNode.add((String) keyValue.value);
                });
                if (all != null) {
                    all.close();
                }
                return createArrayNode;
            } finally {
            }
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            throw e;
        }
    }

    @ProducesJson
    @Get("/serviceNames/:service_name/spanNames")
    public JsonNode getSpanNames(@Param("service_name") String str) {
        try {
            if (!this.storage.traceSearchEnabled) {
                return MAPPER.createArrayNode();
            }
            Set set = (Set) ((ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.SPAN_NAMES_STORE_NAME, QueryableStoreTypes.keyValueStore())).get(str);
            ArrayNode createArrayNode = MAPPER.createArrayNode();
            if (set != null) {
                Objects.requireNonNull(createArrayNode);
                set.forEach(createArrayNode::add);
            }
            return createArrayNode;
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            throw e;
        }
    }

    @ProducesJson
    @Get("/serviceNames/:service_name/remoteServiceNames")
    public JsonNode getRemoteServiceNames(@Param("service_name") String str) {
        try {
            if (!this.storage.traceSearchEnabled) {
                return MAPPER.createArrayNode();
            }
            Set set = (Set) ((ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.REMOTE_SERVICE_NAMES_STORE_NAME, QueryableStoreTypes.keyValueStore())).get(str);
            ArrayNode createArrayNode = MAPPER.createArrayNode();
            if (set != null) {
                Objects.requireNonNull(createArrayNode);
                set.forEach(createArrayNode::add);
            }
            return createArrayNode;
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            throw e;
        }
    }

    @Get("/traces")
    public AggregatedHttpResponse getTraces(@Param("serviceName") Optional<String> optional, @Param("remoteServiceName") Optional<String> optional2, @Param("spanName") Optional<String> optional3, @Param("annotationQuery") Optional<String> optional4, @Param("minDuration") Optional<Long> optional5, @Param("maxDuration") Optional<Long> optional6, @Param("endTs") Optional<Long> optional7, @Default("86400000") @Param("lookback") Long l, @Default("10") @Param("limit") int i) {
        KeyValueIterator<Long, Set<String>> range;
        try {
            if (!this.storage.traceSearchEnabled) {
                return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
            }
            QueryRequest build = QueryRequest.newBuilder().serviceName(optional.orElse(null)).remoteServiceName(optional2.orElse(null)).spanName(optional3.orElse(null)).parseAnnotationQuery(optional4.orElse(null)).minDuration(optional5.orElse(null)).maxDuration(optional6.orElse(null)).endTs(optional7.orElse(Long.valueOf(System.currentTimeMillis())).longValue()).lookback(l.longValue()).limit(i).build();
            ReadOnlyKeyValueStore<String, List<Span>> readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.TRACES_STORE_NAME, QueryableStoreTypes.keyValueStore());
            ReadOnlyKeyValueStore readOnlyKeyValueStore2 = (ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.SPAN_IDS_BY_TS_STORE_NAME, QueryableStoreTypes.keyValueStore());
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            long micros = TimeUnit.MILLISECONDS.toMicros(build.endTs() - build.lookback());
            long micros2 = TimeUnit.MILLISECONDS.toMicros(build.endTs());
            long micros3 = TimeUnit.SECONDS.toMicros(30L);
            long j = micros2 - micros3;
            if (j <= micros || readOnlyKeyValueStore.approximateNumEntries() <= this.minTracesStored) {
                range = readOnlyKeyValueStore2.range(Long.valueOf(micros), Long.valueOf(micros2));
                try {
                    addResults(build, readOnlyKeyValueStore, arrayList, arrayList2, range);
                    if (range != null) {
                        range.close();
                    }
                } finally {
                }
            } else {
                while (j > micros && arrayList.size() < build.limit()) {
                    range = readOnlyKeyValueStore2.range(Long.valueOf(j), Long.valueOf(micros2));
                    try {
                        addResults(build, readOnlyKeyValueStore, arrayList, arrayList2, range);
                        if (range != null) {
                            range.close();
                        }
                        micros2 = j;
                        j -= micros3;
                    } finally {
                    }
                }
            }
            arrayList.sort(Comparator.comparingLong(list -> {
                return ((Span) list.get(0)).timestampAsLong();
            }).reversed());
            LOG.debug("Traces found from query {}: {}", build, Integer.valueOf(arrayList.size()));
            return AggregatedHttpResponse.of(HttpStatus.OK, MediaType.JSON, writeTraces((List) arrayList.stream().limit(build.limit()).collect(Collectors.toList())));
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
        }
    }

    void addResults(QueryRequest queryRequest, ReadOnlyKeyValueStore<String, List<Span>> readOnlyKeyValueStore, List<List<Span>> list, List<String> list2, KeyValueIterator<Long, Set<String>> keyValueIterator) {
        keyValueIterator.forEachRemaining(keyValue -> {
            List list3;
            for (String str : (Set) keyValue.value) {
                if (!list2.contains(str) && (list3 = (List) readOnlyKeyValueStore.get(str)) != null && !list3.isEmpty() && queryRequest.test(list3)) {
                    list2.add(str);
                    list.add(list3);
                }
            }
        });
    }

    @Get("/traces/:trace_id")
    public AggregatedHttpResponse getTrace(@Param("trace_id") String str) {
        try {
            if (!this.storage.traceByIdQueryEnabled) {
                return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
            }
            return AggregatedHttpResponse.of(HttpStatus.OK, MediaType.JSON, SpanBytesEncoder.JSON_V2.encodeList((List) ((ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.TRACES_STORE_NAME, QueryableStoreTypes.keyValueStore())).get(str)));
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
        }
    }

    @Get("/traceMany")
    public AggregatedHttpResponse getTraces(@Param("traceIds") String str) {
        try {
            if (!this.storage.traceByIdQueryEnabled) {
                return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
            }
            ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.TRACES_STORE_NAME, QueryableStoreTypes.keyValueStore());
            ArrayList arrayList = new ArrayList();
            for (String str2 : str.split(",", JsonConverterConfig.SCHEMAS_CACHE_SIZE_DEFAULT)) {
                arrayList.add((List) readOnlyKeyValueStore.get(str2));
            }
            return AggregatedHttpResponse.of(HttpStatus.OK, MediaType.JSON, writeTraces(arrayList));
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
        }
    }

    @ProducesJson
    @Get("/autocompleteTags")
    public JsonNode getAutocompleteTags() {
        try {
            if (!this.storage.traceSearchEnabled) {
                return MAPPER.createArrayNode();
            }
            ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.AUTOCOMPLETE_TAGS_STORE_NAME, QueryableStoreTypes.keyValueStore());
            ArrayNode createArrayNode = MAPPER.createArrayNode();
            KeyValueIterator all = readOnlyKeyValueStore.all();
            try {
                all.forEachRemaining(keyValue -> {
                    createArrayNode.add((String) keyValue.key);
                });
                if (all != null) {
                    all.close();
                }
                return createArrayNode;
            } finally {
            }
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            throw e;
        }
    }

    @ProducesJson
    @Get("/autocompleteTags/:key")
    public JsonNode getAutocompleteValues(@Param("key") String str) {
        try {
            if (!this.storage.traceSearchEnabled) {
                return MAPPER.createArrayNode();
            }
            Set set = (Set) ((ReadOnlyKeyValueStore) this.storage.getTraceStorageStream().store(TraceStorageTopology.AUTOCOMPLETE_TAGS_STORE_NAME, QueryableStoreTypes.keyValueStore())).get(str);
            ArrayNode createArrayNode = MAPPER.createArrayNode();
            if (set != null) {
                Objects.requireNonNull(createArrayNode);
                set.forEach(createArrayNode::add);
            }
            return createArrayNode;
        } catch (InvalidStateStoreException e) {
            LOG.debug("State store is not ready", e);
            throw e;
        }
    }

    @ProducesJson
    @Get("/instances/:store_name")
    public KafkaStreamsMetadata getInstancesByStore(@Param("store_name") String str) {
        Collection<StreamsMetadata> allMetadataForStore = this.storage.getTraceStorageStream().allMetadataForStore(str);
        allMetadataForStore.addAll(this.storage.getDependencyStorageStream().allMetadataForStore(str));
        return KafkaStreamsMetadata.create(allMetadataForStore);
    }

    @ProducesJson
    @Get("/instances")
    public KafkaStreamsMetadata getInstances() {
        Collection<StreamsMetadata> allMetadata = this.storage.getTraceStorageStream().allMetadata();
        allMetadata.addAll(this.storage.getDependencyStorageStream().allMetadata());
        return KafkaStreamsMetadata.create(allMetadata);
    }

    static byte[] writeTraces(List<List<Span>> list) {
        int size = list.size();
        int i = size > 1 ? 2 + (size - 1) : 2;
        for (List<Span> list2 : list) {
            int size2 = list2.size();
            i += 2;
            if (size2 > 1) {
                i += size2 - 1;
            }
            Iterator<Span> it = list2.iterator();
            while (it.hasNext()) {
                i += SpanBytesEncoder.JSON_V2.sizeInBytes(it.next());
            }
        }
        byte[] bArr = new byte[i];
        int i2 = 0 + 1;
        bArr[0] = 91;
        for (int i3 = 0; i3 < size; i3++) {
            i2 += SpanBytesEncoder.JSON_V2.encodeList(list.get(i3), bArr, i2);
            if (i3 + 1 < size) {
                i2++;
                bArr[i2] = 44;
            }
        }
        bArr[i2] = 93;
        return bArr;
    }
}
