package zipkin2.storage.kafka.streams;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.Stores;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.internal.DependencyLinker;
import zipkin2.internal.Trace;
import zipkin2.storage.kafka.streams.serdes.DependencyLinkSerde;
import zipkin2.storage.kafka.streams.serdes.SpansSerde;

/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.2.jar:zipkin2/storage/kafka/streams/SpanAggregationTopology.class */
public final class SpanAggregationTopology implements Supplier<Topology> {
    static final String TRACE_AGGREGATION_STORE = "trace-aggregation";
    final String spansTopic;
    final String traceTopic;
    final String dependencyTopic;
    final Duration traceTimeout;
    final boolean aggregationEnabled;
    final SpansSerde spansSerde = new SpansSerde();
    final DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();

    public SpanAggregationTopology(String str, String str2, String str3, Duration duration, boolean z) {
        this.spansTopic = str;
        this.traceTopic = str2;
        this.dependencyTopic = str3;
        this.traceTimeout = duration;
        this.aggregationEnabled = z;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.function.Supplier
    public Topology get() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        if (this.aggregationEnabled) {
            KStream selectKey = streamsBuilder.stream(this.spansTopic, Consumed.with(Serdes.String(), this.spansSerde)).groupByKey().windowedBy(SessionWindows.with(this.traceTimeout).grace(Duration.ZERO)).aggregate(ArrayList::new, aggregateSpans(), joinAggregates(), Materialized.as(Stores.persistentSessionStore(TRACE_AGGREGATION_STORE, Duration.ofDays(1L))).withKeySerde(Serdes.String()).withValueSerde(this.spansSerde).withLoggingDisabled().withCachingEnabled()).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream().selectKey((windowed, list) -> {
                return (String) windowed.key();
            });
            selectKey.to(this.traceTopic, Produced.with(Serdes.String(), this.spansSerde));
            selectKey.flatMapValues(spansToDependencyLinks()).selectKey((str, dependencyLink) -> {
                return DependencyLinkSerde.linkKey(dependencyLink);
            }).to(this.dependencyTopic, Produced.with(Serdes.String(), this.dependencyLinkSerde));
        }
        return streamsBuilder.build();
    }

    Merger<String, List<Span>> joinAggregates() {
        return (str, list, list2) -> {
            list.addAll(list2);
            return Trace.merge(list);
        };
    }

    Aggregator<String, List<Span>, List<Span>> aggregateSpans() {
        return (str, list, list2) -> {
            list2.addAll(list);
            return Trace.merge(list2);
        };
    }

    ValueMapper<List<Span>, List<DependencyLink>> spansToDependencyLinks() {
        return list -> {
            return list == null ? new ArrayList() : new DependencyLinker().putTrace(list).link();
        };
    }
}
