package io.numaproj.numaflow.sourcetransformer;

import akka.actor.AbstractActor;
import akka.actor.AllDeadLetters;
import akka.actor.AllForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.class */
class TransformSupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransformSupervisorActor.class);
    private final SourceTransformer transformer;
    private final StreamObserver<Sourcetransformer.SourceTransformResponse> responseObserver;
    private final CompletableFuture<Void> shutdownSignal;
    private Exception userException = null;
    private int activeTransformersCount = 0;

    public TransformSupervisorActor(SourceTransformer sourceTransformer, StreamObserver<Sourcetransformer.SourceTransformResponse> streamObserver, CompletableFuture<Void> completableFuture) {
        this.transformer = sourceTransformer;
        this.responseObserver = streamObserver;
        this.shutdownSignal = completableFuture;
    }

    public static Props props(SourceTransformer sourceTransformer, StreamObserver<Sourcetransformer.SourceTransformResponse> streamObserver, CompletableFuture<Void> completableFuture) {
        return Props.create(TransformSupervisorActor.class, new Object[]{sourceTransformer, streamObserver, completableFuture});
    }

    public void preRestart(Throwable th, Optional<Object> optional) {
        getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", th.getMessage());
        this.responseObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
        Service.transformerActorSystem.stop(getSelf());
        this.shutdownSignal.completeExceptionally(th);
    }

    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf().toString());
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Sourcetransformer.SourceTransformRequest.class, this::processRequest).match(Sourcetransformer.SourceTransformResponse.class, this::sendResponse).match(Exception.class, this::handleFailure).match(AllDeadLetters.class, this::handleDeadLetters).match(String.class, str -> {
            this.responseObserver.onCompleted();
        }).build();
    }

    private void handleFailure(Exception exc) {
        log.error("Encountered error in sourceTransformFn", exc);
        if (this.userException == null) {
            this.userException = exc;
            this.responseObserver.onError(Status.INTERNAL.withDescription(exc.getMessage()).withCause(exc).asException());
        }
        this.activeTransformersCount--;
    }

    private void sendResponse(Sourcetransformer.SourceTransformResponse sourceTransformResponse) {
        this.responseObserver.onNext(sourceTransformResponse);
        this.activeTransformersCount--;
    }

    private void processRequest(Sourcetransformer.SourceTransformRequest sourceTransformRequest) {
        if (this.userException == null) {
            getContext().actorOf(TransformerActor.props(this.transformer)).tell(sourceTransformRequest, getSelf());
            this.activeTransformersCount++;
            return;
        }
        log.info("a previous transformer actor failed, not processing any more requests");
        if (this.activeTransformersCount == 0) {
            log.info("there is no more active transformer AKKA actors - stopping the system");
            getContext().getSystem().stop(getSelf());
            log.info("AKKA system stopped");
            this.shutdownSignal.completeExceptionally(this.userException);
        }
    }

    private void handleDeadLetters(AllDeadLetters allDeadLetters) {
        log.debug("got a dead letter, stopping the execution");
        this.responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());
        getContext().getSystem().stop(getSelf());
        this.shutdownSignal.completeExceptionally(new Throwable("dead letters"));
    }

    public SupervisorStrategy supervisorStrategy() {
        return new AllForOneStrategy(DeciderBuilder.match(Exception.class, exc -> {
            this.responseObserver.onError(Status.INTERNAL.withDescription(exc.getMessage()).withCause(exc).asException());
            return SupervisorStrategy.stop();
        }).build());
    }
}
