package io.numaproj.numaflow.mapper;

import akka.actor.AbstractActor;
import akka.actor.AllDeadLetters;
import akka.actor.AllForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/mapper/MapSupervisorActor.class */
/**
 * Supervisor actor for a single map stream.
 *
 * <p>For every incoming {@link MapOuterClass.MapRequest} it spawns a child {@code MapperActor},
 * forwards the request to it, and relays the child's {@link MapOuterClass.MapResponse} to the
 * gRPC {@code responseObserver}. Failures (an {@link Exception} message, a child failure seen by
 * the supervisor strategy, a dead letter, or a restart of this actor) close the stream with
 * {@code Status.INTERNAL} and complete {@code shutdownSignal} exceptionally.
 *
 * <p>The gRPC {@link StreamObserver} contract allows exactly one terminal call
 * ({@code onError} or {@code onCompleted}) and nothing after it, so every interaction with the
 * observer goes through the {@code streamTerminated} guard.
 */
class MapSupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MapSupervisorActor.class);
    private final Mapper mapper;
    private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
    private final CompletableFuture<Void> shutdownSignal;
    /** First failure reported by a mapper via an {@link Exception} message; {@code null} until then. */
    private Exception userException = null;
    /** Number of requests handed to child mappers that have not yet answered or failed. */
    private int activeMapperCount = 0;
    /**
     * Set once {@code onError} or {@code onCompleted} has been invoked on the observer.
     * Plain field: it is only read and written from this actor's own callbacks.
     */
    private boolean streamTerminated = false;

    /**
     * @param mapper            user mapper handed to each child {@code MapperActor}
     * @param streamObserver    gRPC observer that receives responses and the terminal event
     * @param completableFuture completed exceptionally when the actor decides to shut down
     */
    public MapSupervisorActor(Mapper mapper, StreamObserver<MapOuterClass.MapResponse> streamObserver, CompletableFuture<Void> completableFuture) {
        this.mapper = mapper;
        this.responseObserver = streamObserver;
        this.shutdownSignal = completableFuture;
    }

    /**
     * Builds the {@link Props} used to create this actor.
     *
     * @param mapper            user mapper handed to each child {@code MapperActor}
     * @param streamObserver    gRPC observer that receives responses and the terminal event
     * @param completableFuture completed exceptionally when the actor decides to shut down
     * @return props that instantiate a {@code MapSupervisorActor} with the given arguments
     */
    public static Props props(Mapper mapper, StreamObserver<MapOuterClass.MapResponse> streamObserver, CompletableFuture<Void> completableFuture) {
        return Props.create(MapSupervisorActor.class, mapper, streamObserver, completableFuture);
    }

    /**
     * Invoked when this supervisor itself is about to be restarted: instead of continuing,
     * it signals shutdown, fails the stream (if still open) and stops itself.
     *
     * @param th       the failure that triggered the restart
     * @param optional the message being processed when the failure happened, if any
     */
    public void preRestart(Throwable th, Optional<Object> optional) {
        getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", th.getMessage());
        this.shutdownSignal.completeExceptionally(th);
        failStream(th);
        Service.mapperActorSystem.stop(getSelf());
    }

    /** Logs that the supervisor has stopped. */
    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf().toString());
    }

    /**
     * Message routing: requests go to a fresh child mapper, responses are relayed to the client,
     * exceptions and dead letters fail the stream, and any {@link String} message completes it.
     */
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(MapOuterClass.MapRequest.class, this::processRequest)
                .match(MapOuterClass.MapResponse.class, this::sendResponse)
                .match(Exception.class, this::handleFailure)
                .match(AllDeadLetters.class, this::handleDeadLetters)
                .match(String.class, str -> completeStream())
                .build();
    }

    /**
     * Handles a failure reported by a mapper. Only the first failure is remembered and sent to
     * the client; later ones are just logged and counted.
     */
    private void handleFailure(Exception exc) {
        log.error("Encountered error in mapFn", exc);
        if (this.userException == null) {
            this.userException = exc;
            failStream(exc);
        }
        this.activeMapperCount--;
        // The failing mapper may have been the last one in flight; do not wait for another
        // request (which may never arrive on a failed stream) to trigger the shutdown.
        shutdownIfDrained();
    }

    /** Relays a mapper's response to the client unless the stream has already been terminated. */
    private void sendResponse(MapOuterClass.MapResponse mapResponse) {
        if (this.streamTerminated) {
            // onNext after a terminal event violates the StreamObserver contract.
            log.warn("response stream already terminated, dropping mapper response");
        } else {
            this.responseObserver.onNext(mapResponse);
        }
        this.activeMapperCount--;
        shutdownIfDrained();
    }

    /**
     * Dispatches a request to a new child mapper, or, once a mapper has failed, refuses further
     * work and shuts down as soon as no mapper is in flight.
     */
    private void processRequest(MapOuterClass.MapRequest mapRequest) {
        if (this.userException == null) {
            getContext().actorOf(MapperActor.props(this.mapper)).tell(mapRequest, getSelf());
            this.activeMapperCount++;
            return;
        }
        log.info("a previous mapper actor failed, not processing any more requests");
        shutdownIfDrained();
    }

    /** Fails the stream, stops this actor and signals shutdown when a dead letter is observed. */
    private void handleDeadLetters(AllDeadLetters allDeadLetters) {
        log.debug("got a dead letter, stopping the execution");
        closeStreamWithError(Status.INTERNAL.withDescription("dead letters"));
        getContext().getSystem().stop(getSelf());
        this.shutdownSignal.completeExceptionally(new Throwable("dead letters"));
    }

    /**
     * Any exception thrown by a child stops all children, signals shutdown and fails the stream
     * (if it is still open).
     */
    public SupervisorStrategy supervisorStrategy() {
        return new AllForOneStrategy(DeciderBuilder.match(Exception.class, exc -> {
            this.shutdownSignal.completeExceptionally(exc);
            failStream(exc);
            return SupervisorStrategy.stop();
        }).build());
    }

    /**
     * After a user failure, stops this actor and completes {@code shutdownSignal} with that
     * failure once no mapper is in flight. Does nothing otherwise.
     */
    private void shutdownIfDrained() {
        if (this.userException == null || this.activeMapperCount != 0) {
            return;
        }
        log.info("there is no more active mapper AKKA actors - stopping the system");
        getContext().getSystem().stop(getSelf());
        log.info("AKKA system stopped");
        this.shutdownSignal.completeExceptionally(this.userException);
    }

    /** Closes the stream with {@code INTERNAL}, using the cause's message as description. */
    private void failStream(Throwable cause) {
        closeStreamWithError(Status.INTERNAL.withDescription(cause.getMessage()).withCause(cause));
    }

    /** Sends {@code onError} at most once and never after {@code onCompleted}. */
    private void closeStreamWithError(Status status) {
        if (this.streamTerminated) {
            return;
        }
        this.streamTerminated = true;
        this.responseObserver.onError(status.asException());
    }

    /** Sends {@code onCompleted} at most once and never after {@code onError}. */
    private void completeStream() {
        if (this.streamTerminated) {
            return;
        }
        this.streamTerminated = true;
        this.responseObserver.onCompleted();
    }
}
