package io.numaproj.numaflow.reducer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/reducer/ReduceActor.class */
class ReduceActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReduceActor.class);
    private String[] keys;
    private Metadata md;
    private Reducer groupBy;

    public static Props props(String[] strArr, Metadata metadata, Reducer reducer) {
        return Props.create(ReduceActor.class, new Object[]{strArr, metadata, reducer});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(HandlerDatum.class, this::invokeHandler).match(String.class, this::getResult).build();
    }

    private void invokeHandler(HandlerDatum handlerDatum) {
        this.groupBy.addMessage(this.keys, handlerDatum, this.md);
    }

    private void getResult(String str) {
        this.groupBy.getOutput(this.keys, this.md).getMessages().forEach(message -> {
            getSender().tell(buildActorResponse(message), getSelf());
        });
        getSender().tell(buildEOFActorResponse(), getSelf());
    }

    private ActorResponse buildActorResponse(Message message) {
        ReduceOuterClass.ReduceResponse.Builder newBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
        newBuilder.setWindow(ReduceOuterClass.Window.newBuilder().setStart(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getStartTime().getNano())).setEnd(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getEndTime().getNano())).setSlot("slot-0").m689build());
        newBuilder.setEOF(false);
        newBuilder.setResult(ReduceOuterClass.ReduceResponse.Result.newBuilder().setValue(ByteString.copyFrom(message.getValue())).addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())).addAllTags(message.getTags() == null ? new ArrayList<>() : List.of((Object[]) message.getTags())).m642build());
        return new ActorResponse(newBuilder.m593build());
    }

    private ActorResponse buildEOFActorResponse() {
        ReduceOuterClass.ReduceResponse.Builder newBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
        newBuilder.setWindow(ReduceOuterClass.Window.newBuilder().setStart(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getStartTime().getNano())).setEnd(Timestamp.newBuilder().setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()).setNanos(this.md.getIntervalWindow().getEndTime().getNano())).setSlot("slot-0").m689build());
        newBuilder.setEOF(true);
        newBuilder.setResult(ReduceOuterClass.ReduceResponse.Result.newBuilder().addAllKeys(List.of((Object[]) this.keys)).m642build());
        return new ActorResponse(newBuilder.m593build());
    }

    @Generated
    public ReduceActor(String[] strArr, Metadata metadata, Reducer reducer) {
        this.keys = strArr;
        this.md = metadata;
        this.groupBy = reducer;
    }
}
