/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.pnc.logprocessor.eventduration;

import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.jboss.pnc.logprocessor.eventduration.MergeTransformer;
import org.jboss.pnc.logprocessor.eventduration.domain.LogEvent;

public class LogProcessorTopology {
    public static final String LOG_STORE = "log-store";
    private final String inputTopic;
    private final String outputTopic;
    private final Optional<String> durationsTopic;

    public LogProcessorTopology(String inputTopic, String outputTopic, Optional<String> durationsTopic) {
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.durationsTopic = durationsTopic;
    }

    Topology buildTopology(Properties properties) {
        StreamsBuilder builder = new StreamsBuilder();
        HashMap<String, String> configuration = new HashMap<String, String>();
        configuration.put("cleanup.policy", "compact");
        configuration.put("min.compaction.lag.ms", "0");
        configuration.put("segment.bytes", String.valueOf(0x4000000));
        Serde<LogEvent> logSerde = Serdes.serdeFrom(new LogEvent.JsonSerializer(), new LogEvent.JsonDeserializer());
        KStream<String, LogEvent> input = builder.stream(this.inputTopic, Consumed.with(Serdes.String(), logSerde));
        StoreBuilder<KeyValueStore<String, LogEvent>> storageStoreBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LOG_STORE), Serdes.String(), logSerde).withCachingEnabled().withLoggingEnabled(configuration);
        builder.addStateStore(storageStoreBuilder);
        KStream output = input.transform(MergeTransformer::new, LOG_STORE);
        output.to(this.outputTopic, Produced.with(Serdes.String(), logSerde));
        if (this.durationsTopic.isPresent()) {
            output.filter((key, logEvent) -> this.isEndLogEvent((LogEvent)logEvent)).to(this.durationsTopic.get(), Produced.with(Serdes.String(), logSerde));
        }
        return builder.build(properties);
    }

    private boolean isEndLogEvent(LogEvent logEvent) {
        return logEvent.getEventType().isPresent() && logEvent.getEventType().get().equals((Object)LogEvent.EventType.END);
    }
}

