package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/reducestreamer/OutputActor.class */
class OutputActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(OutputActor.class);
    StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;

    public static Props props(StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        return Props.create(OutputActor.class, new Object[]{streamObserver});
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(ActorResponse.class, this::handleResponse).build();
    }

    private void handleResponse(ActorResponse actorResponse) {
        if (!actorResponse.getResponse().getEOF()) {
            this.responseObserver.onNext(actorResponse.getResponse());
            return;
        }
        this.responseObserver.onNext(actorResponse.getResponse());
        this.responseObserver.onCompleted();
        getContext().getSystem().stop(getSender());
    }

    @Generated
    public OutputActor(StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        this.responseObserver = streamObserver;
    }
}
