package io.numaproj.numaflow.sink;

import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.handler.SinkHandler;
import io.numaproj.numaflow.sink.types.ResponseList;
import io.numaproj.numaflow.sink.v1.Udsink;
import java.time.Instant;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

/* loaded from: input_file:io/numaproj/numaflow/sink/SinkSupervisorActor.class */
class SinkSupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SinkSupervisorActor.class);
    private final ActorRef shutdownActor;
    private final ActorRef sinkActor;
    private final StreamObserver<Udsink.ResponseList> responseObserver;

    /* loaded from: input_file:io/numaproj/numaflow/sink/SinkSupervisorActor$SinkSupervisorStrategy.class */
    private final class SinkSupervisorStrategy extends SupervisorStrategy {
        private SinkSupervisorStrategy() {
        }

        @Override // akka.actor.SupervisorStrategy
        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, exc -> {
                return SupervisorStrategy.stop();
            }).build();
        }

        @Override // akka.actor.SupervisorStrategy
        public void handleChildTerminated(ActorContext actorContext, ActorRef actorRef, Iterable<ActorRef> iterable) {
        }

        @Override // akka.actor.SupervisorStrategy
        public void processFailure(ActorContext actorContext, boolean z, ActorRef actorRef, Throwable th, ChildRestartStats childRestartStats, Iterable<ChildRestartStats> iterable) {
            Preconditions.checkArgument(!z, "on failures, we will never restart our actors, we escalate");
            SinkSupervisorActor.log.debug("process failure of supervisor strategy executed - {}", SinkSupervisorActor.this.getSelf().toString());
            SinkSupervisorActor.this.shutdownActor.tell(th, actorContext.parent());
            SinkSupervisorActor.this.getContext().getSystem().stop(SinkSupervisorActor.this.getSelf());
        }
    }

    public SinkSupervisorActor(SinkHandler sinkHandler, ActorRef actorRef, StreamObserver<Udsink.ResponseList> streamObserver) {
        this.shutdownActor = actorRef;
        this.responseObserver = streamObserver;
        this.sinkActor = getContext().actorOf(SinkActor.props(sinkHandler));
    }

    public static Props props(SinkHandler sinkHandler, ActorRef actorRef, StreamObserver<Udsink.ResponseList> streamObserver) {
        return Props.create((Class<?>) SinkSupervisorActor.class, sinkHandler, actorRef, streamObserver);
    }

    @Override // akka.actor.AbstractActor
    public void preRestart(Throwable th, Optional<Object> optional) {
        log.debug("supervisor pre restart was executed");
        this.shutdownActor.tell(th, ActorRef.noSender());
        SinkService.sinkActorSystem.stop(getSelf());
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        return new SinkSupervisorStrategy();
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf().toString());
        this.shutdownActor.tell("SUCCESS", ActorRef.noSender());
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Udsink.DatumRequest.class, this::invokeActor).match(ResponseList.class, this::processResponse).match(String.class, this::sendEOF).build();
    }

    private void invokeActor(Udsink.DatumRequest datumRequest) {
        this.sinkActor.tell(constructHandlerDatum(datumRequest), getSelf());
    }

    private void processResponse(ResponseList responseList) {
        this.responseObserver.onNext(buildResponseList(responseList));
        this.responseObserver.onCompleted();
        getContext().getSystem().stop(getSelf());
    }

    private void sendEOF(String str) {
        this.sinkActor.tell(str, getSelf());
    }

    private HandlerDatum constructHandlerDatum(Udsink.DatumRequest datumRequest) {
        return new HandlerDatum((String[]) datumRequest.getKeysList().toArray(new String[0]), datumRequest.getValue().toByteArray(), Instant.ofEpochSecond(datumRequest.getWatermark().getWatermark().getSeconds(), datumRequest.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond(datumRequest.getEventTime().getEventTime().getSeconds(), datumRequest.getEventTime().getEventTime().getNanos()), datumRequest.getId());
    }

    public Udsink.ResponseList buildResponseList(ResponseList responseList) {
        Udsink.ResponseList.Builder newBuilder = Udsink.ResponseList.newBuilder();
        responseList.getResponses().forEach(response -> {
            newBuilder.addResponses(Udsink.Response.newBuilder().setId(response.getId() == null ? "" : response.getId()).setErrMsg(response.getErr() == null ? "" : response.getErr()).setSuccess(response.getSuccess().booleanValue()).build());
        });
        return newBuilder.build();
    }
}
