package zipkin2.storage.kafka.streams;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
import zipkin2.storage.kafka.streams.serdes.NamesSerde;
import zipkin2.storage.kafka.streams.serdes.SpanIdsSerde;
import zipkin2.storage.kafka.streams.serdes.SpansSerde;

/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.3.jar:zipkin2/storage/kafka/streams/TraceStorageTopology.class */
/**
 * Builds the Kafka Streams {@link Topology} that materializes spans read from
 * {@link #spansTopic} into local state stores.
 *
 * <p>Depending on the flags passed to the constructor, the topology contains:
 * <ul>
 *   <li>nothing, when both trace-by-id queries and trace search are disabled;</li>
 *   <li>the {@link #TRACES_STORE_NAME} and {@link #SPAN_IDS_BY_TS_STORE_NAME} stores plus the
 *       processor that fills and expires them, when either flag is enabled;</li>
 *   <li>additionally the service-name, span-name, remote-service-name and autocomplete-tag
 *       stores plus the processor that fills them, when trace search is enabled.</li>
 * </ul>
 */
public class TraceStorageTopology implements Supplier<Topology> {
    public static final String TRACES_STORE_NAME = "zipkin-traces";
    public static final String SPAN_IDS_BY_TS_STORE_NAME = "zipkin-traces-by-timestamp";
    public static final String SERVICE_NAMES_STORE_NAME = "zipkin-service-names";
    public static final String SPAN_NAMES_STORE_NAME = "zipkin-span-names";
    public static final String REMOTE_SERVICE_NAMES_STORE_NAME = "zipkin-remote-service-names";
    public static final String AUTOCOMPLETE_TAGS_STORE_NAME = "zipkin-autocomplete-tags";
    static final Logger LOG = LoggerFactory.getLogger(TraceStorageTopology.class);
    final String spansTopic;
    final List<String> autoCompleteKeys;
    final Duration traceTtl;
    final Duration traceTtlCheckInterval;
    final long minTracesStored;
    final boolean traceSearchEnabled;
    final boolean traceByIdQueryEnabled;
    final SpansSerde spansSerde = new SpansSerde();
    final SpanIdsSerde spanIdsSerde = new SpanIdsSerde();
    final NamesSerde namesSerde = new NamesSerde();
    /** Incremented every time spans arrive for a record key that already has spans stored. */
    final Counter brokenTracesTotal = Metrics.counter("zipkin.storage.kafka.traces.broken");

    /**
     * @param spansTopic topic the spans are consumed from (String keys, span-list values)
     * @param autoCompleteKeys tag keys whose values are collected in the autocomplete store
     * @param traceTtl age after which stored entries are deleted; zero or negative disables
     *     deletion
     * @param traceTtlCheckInterval stream-time interval between two deletion passes
     * @param minTracesStored deletion is skipped while the approximate number of stored traces
     *     is not above this value
     * @param traceByIdQueryEnabled whether the trace stores are needed for lookups by id
     * @param traceSearchEnabled whether the trace stores and the name/tag stores are needed
     */
    public TraceStorageTopology(
            String spansTopic,
            List<String> autoCompleteKeys,
            Duration traceTtl,
            Duration traceTtlCheckInterval,
            long minTracesStored,
            boolean traceByIdQueryEnabled,
            boolean traceSearchEnabled) {
        this.spansTopic = spansTopic;
        this.autoCompleteKeys = autoCompleteKeys;
        this.traceTtl = traceTtl;
        this.traceTtlCheckInterval = traceTtlCheckInterval;
        this.minTracesStored = minTracesStored;
        this.traceByIdQueryEnabled = traceByIdQueryEnabled;
        this.traceSearchEnabled = traceSearchEnabled;
    }

    /**
     * Assembles the topology according to the enabled features.
     *
     * @return a new topology; empty when neither trace search nor trace-by-id is enabled
     */
    @Override
    public Topology get() {
        StreamsBuilder builder = new StreamsBuilder();
        if (!traceSearchEnabled && !traceByIdQueryEnabled) {
            return builder.build();
        }

        // Both trace stores are persistent and have changelog logging disabled.
        builder
            .addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(TRACES_STORE_NAME),
                    Serdes.String(),
                    spansSerde).withLoggingDisabled())
            .addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME),
                    Serdes.Long(),
                    spanIdsSerde).withLoggingDisabled());

        KStream<String, List<Span>> spansStream =
            builder.stream(spansTopic, Consumed.with(Serdes.String(), spansSerde));
        spansStream.process(TraceStoreProcessor::new, TRACES_STORE_NAME, SPAN_IDS_BY_TS_STORE_NAME);

        if (traceSearchEnabled) {
            builder
                .addStateStore(
                    Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(SERVICE_NAMES_STORE_NAME),
                        Serdes.String(),
                        Serdes.String()))
                .addStateStore(
                    Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(SPAN_NAMES_STORE_NAME),
                        Serdes.String(),
                        namesSerde))
                .addStateStore(
                    Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(REMOTE_SERVICE_NAMES_STORE_NAME),
                        Serdes.String(),
                        namesSerde))
                .addStateStore(
                    Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME),
                        Serdes.String(),
                        namesSerde));
            spansStream.process(
                NamesIndexProcessor::new,
                SERVICE_NAMES_STORE_NAME,
                SPAN_NAMES_STORE_NAME,
                REMOTE_SERVICE_NAMES_STORE_NAME,
                AUTOCOMPLETE_TAGS_STORE_NAME);
        }
        return builder.build();
    }

    /**
     * Appends incoming spans to the traces store, indexes the record key by timestamp, and
     * periodically deletes entries older than {@link #traceTtl}.
     */
    private final class TraceStoreProcessor implements Processor<String, List<Span>> {
        private ProcessorContext context;
        private KeyValueStore<String, List<Span>> tracesStore;
        private KeyValueStore<Long, Set<String>> spanIdsByTsStore;

        @Override
        @SuppressWarnings("unchecked") // store types match the serdes registered in get()
        public void init(ProcessorContext context) {
            this.context = context;
            this.tracesStore =
                (KeyValueStore<String, List<Span>>) context.getStateStore(TRACES_STORE_NAME);
            this.spanIdsByTsStore =
                (KeyValueStore<Long, Set<String>>) context.getStateStore(SPAN_IDS_BY_TS_STORE_NAME);
            context.schedule(traceTtlCheckInterval, PunctuationType.STREAM_TIME, this::expireTraces);
        }

        /**
         * Deletes every timestamp-index entry older than the TTL together with the traces it
         * references.
         *
         * @param streamTimeMillis timestamp handed over by the punctuation, in milliseconds
         */
        private void expireTraces(long streamTimeMillis) {
            long ttlMillis = traceTtl.toMillis();
            if (ttlMillis <= 0 || tracesStore.approximateNumEntries() <= minTracesStored) {
                return;
            }
            long cutoffMillis = streamTimeMillis - ttlMillis;
            // Index keys are in microseconds, hence the conversion of the cut-off.
            long cutoffMicros = TimeUnit.MILLISECONDS.toMicros(cutoffMillis);
            try (KeyValueIterator<Long, Set<String>> expired =
                     spanIdsByTsStore.range(0L, cutoffMicros)) {
                expired.forEachRemaining(entry -> {
                    spanIdsByTsStore.delete(entry.key);
                    for (String traceKey : entry.value) {
                        tracesStore.delete(traceKey);
                    }
                });
                LOG.debug(
                    "Traces deletion emitted at {}, approx. number of traces stored {}",
                    Instant.ofEpochMilli(cutoffMillis).atZone(ZoneId.systemDefault()),
                    tracesStore.approximateNumEntries());
            }
        }

        @Override
        public void process(String traceKey, List<Span> spans) {
            // Null keys/values (e.g. tombstones) carry nothing to store and would otherwise
            // fail on the store lookup or on the list access below.
            if (traceKey == null || spans == null || spans.isEmpty()) {
                return;
            }

            List<Span> stored = tracesStore.get(traceKey);
            List<Span> merged = new ArrayList<>();
            if (stored != null) {
                // Spans for this key were already stored: the trace arrived in pieces.
                brokenTracesTotal.increment();
                merged.addAll(stored);
            }
            merged.addAll(spans);
            tracesStore.put(traceKey, merged);

            long timestampMicros = indexTimestampMicros(spans);
            Set<String> traceKeys = spanIdsByTsStore.get(timestampMicros);
            if (traceKeys == null) {
                traceKeys = new LinkedHashSet<>();
            }
            traceKeys.add(traceKey);
            spanIdsByTsStore.put(timestampMicros, traceKeys);
        }

        /**
         * Picks the timestamp under which the record key is indexed: the timestamp of the first
         * span that has one, otherwise the record timestamp converted to microseconds. The
         * fallback avoids a NullPointerException for spans reported without a timestamp.
         */
        private long indexTimestampMicros(List<Span> spans) {
            for (Span span : spans) {
                Long timestamp = span.timestamp();
                if (timestamp != null) {
                    return timestamp;
                }
            }
            return TimeUnit.MILLISECONDS.toMicros(context.timestamp());
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }

    /**
     * Collects service names, span names and remote service names per local service, and the
     * values seen for each configured autocomplete tag key.
     */
    private final class NamesIndexProcessor implements Processor<String, List<Span>> {
        private KeyValueStore<String, String> serviceNameStore;
        private KeyValueStore<String, Set<String>> spanNamesStore;
        private KeyValueStore<String, Set<String>> remoteServiceNamesStore;
        private KeyValueStore<String, Set<String>> autocompleteTagsStore;

        @Override
        @SuppressWarnings("unchecked") // store types match the serdes registered in get()
        public void init(ProcessorContext context) {
            this.serviceNameStore =
                (KeyValueStore<String, String>) context.getStateStore(SERVICE_NAMES_STORE_NAME);
            this.spanNamesStore =
                (KeyValueStore<String, Set<String>>) context.getStateStore(SPAN_NAMES_STORE_NAME);
            this.remoteServiceNamesStore =
                (KeyValueStore<String, Set<String>>)
                    context.getStateStore(REMOTE_SERVICE_NAMES_STORE_NAME);
            this.autocompleteTagsStore =
                (KeyValueStore<String, Set<String>>)
                    context.getStateStore(AUTOCOMPLETE_TAGS_STORE_NAME);
        }

        @Override
        public void process(String traceKey, List<Span> spans) {
            if (spans == null) {
                return;
            }
            for (Span span : spans) {
                String localService = span.localServiceName();
                if (localService != null) {
                    serviceNameStore.putIfAbsent(localService, localService);
                    if (span.name() != null) {
                        addIfMissing(spanNamesStore, localService, span.name());
                    }
                    if (span.remoteServiceName() != null) {
                        addIfMissing(remoteServiceNamesStore, localService, span.remoteServiceName());
                    }
                }
                if (!span.tags().isEmpty()) {
                    for (String tagKey : autoCompleteKeys) {
                        String tagValue = span.tags().get(tagKey);
                        if (tagValue != null) {
                            // Values are grouped under the tag key they belong to.
                            addIfMissing(autocompleteTagsStore, tagKey, tagValue);
                        }
                    }
                }
            }
        }

        /** Adds {@code value} to the set stored under {@code key}, writing only on change. */
        private void addIfMissing(
                KeyValueStore<String, Set<String>> store, String key, String value) {
            Set<String> values = store.get(key);
            if (values == null) {
                values = new LinkedHashSet<>();
            }
            if (values.add(value)) {
                store.put(key, values);
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }
}
