package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import java.util.List;
import lombok.Generated;

/* loaded from: input_file:io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.class */
class ReduceStreamerActor extends AbstractActor {
    private String[] keys;
    private Metadata md;
    private ReduceStreamer groupBy;
    private OutputStreamObserver outputStream;

    public static Props props(String[] strArr, Metadata metadata, ReduceStreamer reduceStreamer, ActorRef actorRef) {
        return Props.create(ReduceStreamerActor.class, new Object[]{strArr, metadata, reduceStreamer, new OutputStreamObserverImpl(metadata, actorRef)});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(HandlerDatum.class, this::invokeHandler).match(String.class, this::sendEOF).build();
    }

    private void invokeHandler(HandlerDatum handlerDatum) {
        this.groupBy.processMessage(this.keys, handlerDatum, this.outputStream, this.md);
    }

    private void sendEOF(String str) {
        this.groupBy.handleEndOfStream(this.keys, this.outputStream, this.md);
        getSender().tell(buildEOFResponse(), getSelf());
    }

    private ActorResponse buildEOFResponse() {
        ReduceOuterClass.ReduceResponse.Builder newBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
        newBuilder.setWindow(ReduceOuterClass.Window.newBuilder().setStart(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getStartTime().getNano())).setEnd(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getEndTime().getNano())).setSlot("slot-0").m740build());
        newBuilder.setEOF(true);
        newBuilder.setResult(ReduceOuterClass.ReduceResponse.Result.newBuilder().addAllKeys(List.of((Object[]) this.keys)).m693build());
        return new ActorResponse(newBuilder.m644build());
    }

    @Generated
    public ReduceStreamerActor(String[] strArr, Metadata metadata, ReduceStreamer reduceStreamer, OutputStreamObserver outputStreamObserver) {
        this.keys = strArr;
        this.md = metadata;
        this.groupBy = reduceStreamer;
        this.outputStream = outputStreamObserver;
    }
}
