package org.jboss.pnc.logprocessor.eventduration;

import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.jboss.pnc.logprocessor.eventduration.domain.LogEvent;

/* loaded from: input_file:org/jboss/pnc/logprocessor/eventduration/LogProcessorTopology.class */
public class LogProcessorTopology {
    public static final String LOG_STORE = "log-store";
    private final String inputTopic;
    private final String outputTopic;
    private final Optional<String> durationsTopic;

    public LogProcessorTopology(String str, String str2, Optional<String> optional) {
        this.inputTopic = str;
        this.outputTopic = str2;
        this.durationsTopic = optional;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Topology buildTopology(Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        HashMap hashMap = new HashMap();
        hashMap.put("cleanup.policy", "compact");
        hashMap.put("min.compaction.lag.ms", "0");
        hashMap.put("segment.bytes", String.valueOf(67108864));
        Serde serdeFrom = Serdes.serdeFrom(new LogEvent.JsonSerializer(), new LogEvent.JsonDeserializer());
        KStream stream = streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.String(), serdeFrom));
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LOG_STORE), Serdes.String(), serdeFrom).withCachingEnabled().withLoggingEnabled(hashMap));
        KStream transform = stream.transform(MergeTransformer::new, new String[]{LOG_STORE});
        transform.to(this.outputTopic, Produced.with(Serdes.String(), serdeFrom));
        if (this.durationsTopic.isPresent()) {
            transform.filter((str, logEvent) -> {
                return isEndLogEvent(logEvent);
            }).to(this.durationsTopic.get(), Produced.with(Serdes.String(), serdeFrom));
        }
        return streamsBuilder.build(properties);
    }

    private boolean isEndLogEvent(LogEvent logEvent) {
        return logEvent.getEventType().isPresent() && logEvent.getEventType().get().equals(LogEvent.EventType.END);
    }
}
