package io.numaproj.numaflow.sessionreducer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sessionreducer/OutputActor.class */
/**
 * Actor that relays session-reduce responses onto a gRPC response stream.
 *
 * <p>Two kinds of messages are handled:
 * <ul>
 *   <li>{@link ActorResponse}: its response is pushed to the observer; when the message is
 *       flagged as the last one, the stream is completed as well.</li>
 *   <li>{@link String}: any string message completes the stream without emitting a response
 *       (the content of the string is not inspected).</li>
 * </ul>
 *
 * <p>Completing the stream also asks the actor system to stop the sender of the message that
 * triggered the completion.
 */
class OutputActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(OutputActor.class);

    /** Stream on which responses are emitted and which is completed on shutdown. */
    StreamObserver<Sessionreduce.SessionReduceResponse> responseObserver;

    /**
     * Creates the actor around the given response stream.
     *
     * @param streamObserver observer that receives every forwarded response
     */
    @Generated
    public OutputActor(StreamObserver<Sessionreduce.SessionReduceResponse> streamObserver) {
        this.responseObserver = streamObserver;
    }

    /**
     * Builds the {@link Props} used to spawn an {@code OutputActor}.
     *
     * @param streamObserver observer handed to the actor's constructor
     * @return props describing an {@code OutputActor} bound to {@code streamObserver}
     */
    public static Props props(StreamObserver<Sessionreduce.SessionReduceResponse> streamObserver) {
        return Props.create(OutputActor.class, streamObserver);
    }

    /**
     * Declares the message handlers of this actor.
     *
     * @return receive behavior dispatching {@link ActorResponse} and {@link String} messages
     */
    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ActorResponse.class, this::handleResponse)
                .match(String.class, this::handleEOF)
                .build();
    }

    /**
     * Emits the wrapped response and, if it is marked as last, completes the stream.
     *
     * @param message response envelope received from another actor
     */
    private void handleResponse(ActorResponse message) {
        responseObserver.onNext(message.getResponse());
        if (!message.isLast()) {
            return;
        }
        closeSystem();
    }

    /**
     * Completes the stream on receipt of a string message.
     *
     * @param signal the received string; its value is ignored
     */
    private void handleEOF(String signal) {
        closeSystem();
    }

    /** Completes the response stream, then asks the actor system to stop the current sender. */
    private void closeSystem() {
        responseObserver.onCompleted();
        getContext().getSystem().stop(getSender());
    }
}
