package zipkin2.storage.kafka.streams;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import zipkin2.DependencyLink;
import zipkin2.storage.kafka.streams.serdes.DependencyLinkSerde;

/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/streams/DependencyStorageTopology.class */
public final class DependencyStorageTopology implements Supplier<Topology> {
    public static final String DEPENDENCIES_STORE_NAME = "zipkin-dependencies";
    final String dependencyTopic;
    final Duration dependencyTtl;
    final Duration dependencyWindowSize;
    final boolean dependencyQueryEnabled;
    final DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();

    public DependencyStorageTopology(String str, Duration duration, Duration duration2, boolean z) {
        this.dependencyTopic = str;
        this.dependencyTtl = duration;
        this.dependencyWindowSize = duration2;
        this.dependencyQueryEnabled = z;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.function.Supplier
    public Topology get() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        if (this.dependencyQueryEnabled) {
            streamsBuilder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore(DEPENDENCIES_STORE_NAME, this.dependencyTtl, this.dependencyWindowSize, false), Serdes.String(), this.dependencyLinkSerde).withLoggingDisabled());
            streamsBuilder.stream(this.dependencyTopic, Consumed.with(Serdes.String(), this.dependencyLinkSerde)).process(() -> {
                return new Processor<String, DependencyLink>() { // from class: zipkin2.storage.kafka.streams.DependencyStorageTopology.1
                    ProcessorContext context;
                    WindowStore dependenciesStore;

                    @Override // org.apache.kafka.streams.processor.Processor
                    public void init(ProcessorContext processorContext) {
                        this.context = processorContext;
                        this.dependenciesStore = (WindowStore) processorContext.getStateStore(DependencyStorageTopology.DEPENDENCIES_STORE_NAME);
                    }

                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // org.apache.kafka.streams.processor.Processor
                    public void process(String str, DependencyLink dependencyLink) {
                        Instant ofEpochMilli = Instant.ofEpochMilli(this.context.timestamp());
                        WindowStoreIterator fetch = this.dependenciesStore.fetch((WindowStore) str, ofEpochMilli.minus((TemporalAmount) DependencyStorageTopology.this.dependencyWindowSize), ofEpochMilli);
                        try {
                            KeyValue keyValue = null;
                            if (fetch.hasNext()) {
                                keyValue = (KeyValue) fetch.next();
                            }
                            if (fetch.hasNext()) {
                                keyValue = (KeyValue) fetch.next();
                            }
                            if (keyValue != null) {
                                DependencyLink dependencyLink2 = (DependencyLink) keyValue.value;
                                this.dependenciesStore.put(str, dependencyLink2.toBuilder().callCount(dependencyLink2.callCount() + dependencyLink.callCount()).errorCount(dependencyLink2.errorCount() + dependencyLink.errorCount()).build(), ((Long) keyValue.key).longValue());
                            } else {
                                this.dependenciesStore.put(str, dependencyLink, ofEpochMilli.toEpochMilli());
                            }
                            if (fetch != null) {
                                fetch.close();
                            }
                        } catch (Throwable th) {
                            if (fetch != null) {
                                try {
                                    fetch.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }

                    @Override // org.apache.kafka.streams.processor.Processor
                    public void close() {
                    }
                };
            }, DEPENDENCIES_STORE_NAME);
        }
        return streamsBuilder.build();
    }
}
