/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.qe.kafka.streams;

import io.quarkus.kafka.client.serialization.JsonbSerde;
import io.quarkus.qe.kafka.model.LoginAggregation;
import io.quarkus.qe.kafka.model.LoginAttempt;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.ws.rs.core.Response;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.WindowStore;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class WindowedLoginDeniedStream {
    public static final String LOGIN_AGGREGATION_STORE = "login-aggregation-store";
    public static final String LOGIN_ATTEMPTS_TOPIC = "login-http-response-values";
    public static final String LOGIN_DENIED_AGGREGATED_TOPIC = "login-denied";
    public static final String LOGIN_ALERTS_TOPIC = "login-alerts";
    @ConfigProperty(name="login.denied.windows.sec")
    int windowsLoginSec;

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonbSerde loginAttemptSerde = new JsonbSerde(LoginAttempt.class);
        JsonbSerde loginAggregationSerde = new JsonbSerde(LoginAggregation.class);
        builder.stream(LOGIN_ATTEMPTS_TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)loginAttemptSerde)).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofSeconds(this.windowsLoginSec))).aggregate(LoginAggregation::new, (id, value, aggregation) -> aggregation.updateFrom((LoginAttempt)value), Materialized.as((String)LOGIN_AGGREGATION_STORE).withKeySerde(Serdes.String()).withValueSerde((Serde)loginAggregationSerde)).toStream().filter((k, v) -> v.code == Response.Status.UNAUTHORIZED.getStatusCode() || v.code == Response.Status.FORBIDDEN.getStatusCode()).to(LOGIN_DENIED_AGGREGATED_TOPIC);
        return builder.build();
    }

    @Incoming(value="login-denied")
    @Outgoing(value="login-alerts")
    @Broadcast
    public String fanOut(String jsonLoginAggregation) {
        return jsonLoginAggregation;
    }
}

