package io.numaproj.numaflow.sessionreducer;

import akka.actor.ActorRef;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import io.numaproj.numaflow.sessionreducer.model.Message;
import io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Generated;

/* loaded from: input_file:io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.class */
class OutputStreamObserverImpl implements OutputStreamObserver {
    private final ActorRef outputActor;
    private Sessionreduce.KeyedWindow keyedWindow;

    @Override // io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver
    public void send(Message message) {
        this.outputActor.tell(buildResponse(message, this.keyedWindow), ActorRef.noSender());
    }

    private ActorResponse buildResponse(Message message, Sessionreduce.KeyedWindow keyedWindow) {
        Sessionreduce.SessionReduceResponse.Builder newBuilder = Sessionreduce.SessionReduceResponse.newBuilder();
        newBuilder.setKeyedWindow(keyedWindow);
        newBuilder.setEOF(false);
        newBuilder.setResult(Sessionreduce.SessionReduceResponse.Result.newBuilder().setValue(ByteString.copyFrom(message.getValue())).addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())).addAllTags(message.getTags() == null ? new ArrayList<>() : List.of((Object[]) message.getTags())).m1042build());
        return ActorResponse.builder().response(newBuilder.m993build()).build();
    }

    @Generated
    public OutputStreamObserverImpl(ActorRef actorRef, Sessionreduce.KeyedWindow keyedWindow) {
        this.outputActor = actorRef;
        this.keyedWindow = keyedWindow;
    }

    @Generated
    public void setKeyedWindow(Sessionreduce.KeyedWindow keyedWindow) {
        this.keyedWindow = keyedWindow;
    }
}
