package zipkin2.storage.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import zipkin2.Call;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.codec.DependencyLinkBytesDecoder;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.ServiceAndSpanNames;
import zipkin2.storage.SpanStore;
import zipkin2.storage.Traces;
import zipkin2.storage.kafka.internal.KafkaStoreListCall;
import zipkin2.storage.kafka.internal.KafkaStoreScatterGatherListCall;
import zipkin2.storage.kafka.internal.KafkaStoreSingleKeyListCall;
import zipkin2.storage.kafka.streams.DependencyStorageTopology;
import zipkin2.storage.kafka.streams.TraceStorageTopology;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore.class */
public final class KafkaSpanStore implements SpanStore, Traces, ServiceAndSpanNames {
    static final ObjectMapper MAPPER = new ObjectMapper();
    final KafkaStorage storage;
    final BiFunction<String, Integer, String> httpBaseUrl;
    final boolean traceSearchEnabled;
    final boolean traceByIdQueryEnabled;
    final boolean dependencyQueryEnabled;

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetDependenciesCall.class */
    static final class GetDependenciesCall extends KafkaStoreScatterGatherListCall<DependencyLink> {
        static final long DEPENDENCIES_LIMIT = 1000;
        final KafkaStreams dependencyStoreStream;
        final BiFunction<String, Integer, String> httpBaseUrl;
        final long endTs;
        final long lookback;

        GetDependenciesCall(KafkaStreams kafkaStreams, BiFunction<String, Integer, String> biFunction, long j, long j2) {
            super(kafkaStreams, DependencyStorageTopology.DEPENDENCIES_STORE_NAME, biFunction, "/dependencies?endTs=" + j + "&lookback=" + j2, DEPENDENCIES_LIMIT);
            this.dependencyStoreStream = kafkaStreams;
            this.httpBaseUrl = biFunction;
            this.endTs = j;
            this.lookback = j2;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public DependencyLink parseItem(JsonNode jsonNode) throws JsonProcessingException {
            return (DependencyLink) DependencyLinkBytesDecoder.JSON_V1.decodeOne(KafkaSpanStore.MAPPER.writeValueAsBytes(jsonNode));
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<DependencyLink>> m206clone() {
            return new GetDependenciesCall(this.dependencyStoreStream, this.httpBaseUrl, this.endTs, this.lookback);
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetRemoteServiceNamesCall.class */
    static final class GetRemoteServiceNamesCall extends KafkaStoreSingleKeyListCall<String> {
        final KafkaStreams traceStoreStream;
        final String serviceName;
        final BiFunction<String, Integer, String> httpBaseUrl;

        GetRemoteServiceNamesCall(KafkaStreams kafkaStreams, String str, BiFunction<String, Integer, String> biFunction) {
            super(kafkaStreams, TraceStorageTopology.REMOTE_SERVICE_NAMES_STORE_NAME, biFunction, "/serviceNames/" + str + "/remoteServiceNames", str);
            this.traceStoreStream = kafkaStreams;
            this.serviceName = str;
            this.httpBaseUrl = biFunction;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public String parseItem(JsonNode jsonNode) {
            return jsonNode.textValue();
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<String>> m207clone() {
            return new GetRemoteServiceNamesCall(this.traceStoreStream, this.serviceName, this.httpBaseUrl);
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetServiceNamesCall.class */
    static final class GetServiceNamesCall extends KafkaStoreScatterGatherListCall<String> {
        static final long SERVICE_NAMES_LIMIT = 1000;
        final KafkaStreams traceStoreStream;
        final BiFunction<String, Integer, String> httpBaseUrl;

        GetServiceNamesCall(KafkaStreams kafkaStreams, BiFunction<String, Integer, String> biFunction) {
            super(kafkaStreams, TraceStorageTopology.SERVICE_NAMES_STORE_NAME, biFunction, "/serviceNames", SERVICE_NAMES_LIMIT);
            this.traceStoreStream = kafkaStreams;
            this.httpBaseUrl = biFunction;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public String parseItem(JsonNode jsonNode) {
            return jsonNode.textValue();
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<String>> m208clone() {
            return new GetServiceNamesCall(this.traceStoreStream, this.httpBaseUrl);
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetSpanNamesCall.class */
    static final class GetSpanNamesCall extends KafkaStoreSingleKeyListCall<String> {
        final KafkaStreams traceStoreStream;
        final String serviceName;
        final BiFunction<String, Integer, String> httpBaseUrl;

        GetSpanNamesCall(KafkaStreams kafkaStreams, String str, BiFunction<String, Integer, String> biFunction) {
            super(kafkaStreams, TraceStorageTopology.SPAN_NAMES_STORE_NAME, biFunction, "/serviceNames/" + str + "/spanNames", str);
            this.traceStoreStream = kafkaStreams;
            this.serviceName = str;
            this.httpBaseUrl = biFunction;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public String parseItem(JsonNode jsonNode) {
            return jsonNode.textValue();
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<String>> m209clone() {
            return new GetSpanNamesCall(this.traceStoreStream, this.serviceName, this.httpBaseUrl);
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetTraceCall.class */
    static final class GetTraceCall extends KafkaStoreSingleKeyListCall<Span> {
        final KafkaStreams traceStoreStream;
        final BiFunction<String, Integer, String> httpBaseUrl;
        final String traceId;

        GetTraceCall(KafkaStreams kafkaStreams, BiFunction<String, Integer, String> biFunction, String str) {
            super(kafkaStreams, TraceStorageTopology.TRACES_STORE_NAME, biFunction, String.format("/traces/%s", str), str);
            this.traceStoreStream = kafkaStreams;
            this.httpBaseUrl = biFunction;
            this.traceId = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public Span parseItem(JsonNode jsonNode) throws JsonProcessingException {
            return (Span) SpanBytesDecoder.JSON_V2.decodeOne(KafkaSpanStore.MAPPER.writeValueAsBytes(jsonNode));
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<Span>> m210clone() {
            return new GetTraceCall(this.traceStoreStream, this.httpBaseUrl, this.traceId);
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetTraceManyCall.class */
    static final class GetTraceManyCall extends KafkaStoreListCall<List<Span>> {
        static final StringSerializer STRING_SERIALIZER = new StringSerializer();
        final KafkaStreams traceStoreStream;
        final BiFunction<String, Integer, String> httpBaseUrl;
        final String traceIds;

        GetTraceManyCall(KafkaStreams kafkaStreams, BiFunction<String, Integer, String> biFunction, String str) {
            super(kafkaStreams, TraceStorageTopology.TRACES_STORE_NAME, biFunction, "/traceMany?traceIds=" + str);
            this.traceStoreStream = kafkaStreams;
            this.httpBaseUrl = biFunction;
            this.traceIds = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public List<Span> parseItem(JsonNode jsonNode) throws JsonProcessingException {
            return SpanBytesDecoder.JSON_V2.decodeList(KafkaSpanStore.MAPPER.writeValueAsBytes(jsonNode));
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<List<Span>>> m212clone() {
            return new GetTraceManyCall(this.traceStoreStream, this.httpBaseUrl, this.traceIds);
        }

        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        protected CompletableFuture<List<List<Span>>> listFuture() {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (String str : this.traceIds.split(",", JsonConverterConfig.SCHEMAS_CACHE_SIZE_DEFAULT)) {
                StreamsMetadata metadataForKey = this.traceStoreStream.metadataForKey(TraceStorageTopology.TRACES_STORE_NAME, str, (Serializer<String>) STRING_SERIALIZER);
                List list = (List) linkedHashMap.get(metadataForKey.hostInfo());
                if (list == null) {
                    list = new ArrayList();
                }
                list.add(str);
                linkedHashMap.put(metadataForKey.hostInfo(), list);
            }
            List list2 = (List) linkedHashMap.entrySet().stream().map(entry -> {
                return httpClient((HostInfo) entry.getKey()).get("/traceMany?traceIds=" + String.join(",", (Iterable<? extends CharSequence>) entry.getValue()));
            }).map((v0) -> {
                return v0.aggregate();
            }).collect(Collectors.toList());
            return CompletableFuture.allOf((CompletableFuture[]) list2.toArray(new CompletableFuture[0])).thenApply(r5 -> {
                return (List) list2.stream().map(completableFuture -> {
                    return (AggregatedHttpResponse) completableFuture.getNow(AggregatedHttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR));
                }).map(this::content).map(this::parseList).flatMap((v0) -> {
                    return v0.stream();
                }).distinct().collect(Collectors.toList());
            });
        }
    }

    /* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaSpanStore$GetTracesCall.class */
    static final class GetTracesCall extends KafkaStoreScatterGatherListCall<List<Span>> {
        final KafkaStreams traceStoreStream;
        final BiFunction<String, Integer, String> httpBaseUrl;
        final QueryRequest request;

        GetTracesCall(KafkaStreams kafkaStreams, BiFunction<String, Integer, String> biFunction, QueryRequest queryRequest) {
            super(kafkaStreams, TraceStorageTopology.TRACES_STORE_NAME, biFunction, "/traces?" + (queryRequest.serviceName() == null ? "" : "serviceName=" + queryRequest.serviceName() + "&") + (queryRequest.remoteServiceName() == null ? "" : "remoteServiceName=" + queryRequest.remoteServiceName() + "&") + (queryRequest.spanName() == null ? "" : "spanName=" + queryRequest.spanName() + "&") + (queryRequest.annotationQueryString() == null ? "" : "annotationQuery=" + queryRequest.annotationQueryString() + "&") + (queryRequest.minDuration() == null ? "" : "minDuration=" + queryRequest.minDuration() + "&") + (queryRequest.maxDuration() == null ? "" : "maxDuration=" + queryRequest.maxDuration() + "&") + "endTs=" + queryRequest.endTs() + "&lookback=" + queryRequest.lookback() + "&limit=" + queryRequest.limit(), queryRequest.limit());
            this.traceStoreStream = kafkaStreams;
            this.httpBaseUrl = biFunction;
            this.request = queryRequest;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // zipkin2.storage.kafka.internal.KafkaStoreListCall
        public List<Span> parseItem(JsonNode jsonNode) throws JsonProcessingException {
            return SpanBytesDecoder.JSON_V2.decodeList(KafkaSpanStore.MAPPER.writeValueAsBytes(jsonNode));
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Call<List<List<Span>>> m213clone() {
            return new GetTracesCall(this.traceStoreStream, this.httpBaseUrl, this.request);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSpanStore(KafkaStorage kafkaStorage) {
        this.storage = kafkaStorage;
        this.httpBaseUrl = kafkaStorage.httpBaseUrl;
        this.traceByIdQueryEnabled = kafkaStorage.traceByIdQueryEnabled;
        this.traceSearchEnabled = kafkaStorage.traceSearchEnabled;
        this.dependencyQueryEnabled = kafkaStorage.dependencyQueryEnabled;
    }

    public Call<List<List<Span>>> getTraces(QueryRequest queryRequest) {
        return this.traceSearchEnabled ? new GetTracesCall(this.storage.getTraceStorageStream(), this.httpBaseUrl, queryRequest) : Call.emptyList();
    }

    public Call<List<Span>> getTrace(String str) {
        return this.traceByIdQueryEnabled ? new GetTraceCall(this.storage.getTraceStorageStream(), this.httpBaseUrl, Span.normalizeTraceId(str)) : Call.emptyList();
    }

    public Call<List<List<Span>>> getTraces(Iterable<String> iterable) {
        if (!this.traceByIdQueryEnabled) {
            return Call.emptyList();
        }
        StringJoiner stringJoiner = new StringJoiner(",");
        Iterator<String> it = iterable.iterator();
        while (it.hasNext()) {
            stringJoiner.add(Span.normalizeTraceId(it.next()));
        }
        return stringJoiner.length() == 0 ? Call.emptyList() : new GetTraceManyCall(this.storage.getTraceStorageStream(), this.httpBaseUrl, stringJoiner.toString());
    }

    @Deprecated
    public Call<List<String>> getServiceNames() {
        return this.traceSearchEnabled ? new GetServiceNamesCall(this.storage.getTraceStorageStream(), this.httpBaseUrl) : Call.emptyList();
    }

    @Deprecated
    public Call<List<String>> getSpanNames(String str) {
        return this.traceSearchEnabled ? new GetSpanNamesCall(this.storage.getTraceStorageStream(), str, this.httpBaseUrl) : Call.emptyList();
    }

    public Call<List<String>> getRemoteServiceNames(String str) {
        return this.traceSearchEnabled ? new GetRemoteServiceNamesCall(this.storage.getTraceStorageStream(), str, this.httpBaseUrl) : Call.emptyList();
    }

    public Call<List<DependencyLink>> getDependencies(long j, long j2) {
        return this.dependencyQueryEnabled ? new GetDependenciesCall(this.storage.getDependencyStorageStream(), this.httpBaseUrl, j, j2) : Call.emptyList();
    }
}
