package io.numaproj.numaflow.function;

import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.handlers.ReduceHandler;
import io.numaproj.numaflow.function.handlers.ReducerFactory;
import io.numaproj.numaflow.function.interfaces.Metadata;
import io.numaproj.numaflow.function.v1.Udfunction;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

/* loaded from: input_file:io/numaproj/numaflow/function/ReduceSupervisorActor.class */
class ReduceSupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReduceSupervisorActor.class);
    private final ReducerFactory<? extends ReduceHandler> reducerFactory;
    private final Metadata md;
    private final ActorRef shutdownActor;
    private final StreamObserver<Udfunction.DatumResponseList> responseObserver;
    private final Map<String, ActorRef> actorsMap = new HashMap();

    /* loaded from: input_file:io/numaproj/numaflow/function/ReduceSupervisorActor$ReduceSupervisorStrategy.class */
    private final class ReduceSupervisorStrategy extends SupervisorStrategy {
        private ReduceSupervisorStrategy() {
        }

        @Override // akka.actor.SupervisorStrategy
        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, exc -> {
                return SupervisorStrategy.stop();
            }).build();
        }

        @Override // akka.actor.SupervisorStrategy
        public void handleChildTerminated(ActorContext actorContext, ActorRef actorRef, Iterable<ActorRef> iterable) {
        }

        @Override // akka.actor.SupervisorStrategy
        public void processFailure(ActorContext actorContext, boolean z, ActorRef actorRef, Throwable th, ChildRestartStats childRestartStats, Iterable<ChildRestartStats> iterable) {
            Preconditions.checkArgument(!z, "on failures, we will never restart our actors, we escalate");
            ReduceSupervisorActor.log.debug("process failure of supervisor strategy executed - {}", ReduceSupervisorActor.this.getSelf().toString());
            ReduceSupervisorActor.this.shutdownActor.tell(th, actorContext.parent());
        }
    }

    public ReduceSupervisorActor(ReducerFactory<? extends ReduceHandler> reducerFactory, Metadata metadata, ActorRef actorRef, StreamObserver<Udfunction.DatumResponseList> streamObserver) {
        this.reducerFactory = reducerFactory;
        this.md = metadata;
        this.shutdownActor = actorRef;
        this.responseObserver = streamObserver;
    }

    public static Props props(ReducerFactory<? extends ReduceHandler> reducerFactory, Metadata metadata, ActorRef actorRef, StreamObserver<Udfunction.DatumResponseList> streamObserver) {
        return Props.create((Class<?>) ReduceSupervisorActor.class, reducerFactory, metadata, actorRef, streamObserver);
    }

    @Override // akka.actor.AbstractActor
    public void preRestart(Throwable th, Optional<Object> optional) {
        log.debug("supervisor pre restart was executed");
        this.shutdownActor.tell(th, ActorRef.noSender());
        FunctionService.functionActorSystem.stop(getSelf());
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        return new ReduceSupervisorStrategy();
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf().toString());
        this.shutdownActor.tell("SUCCESS", ActorRef.noSender());
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Udfunction.DatumRequest.class, this::invokeActors).match(String.class, this::sendEOF).match(ActorResponse.class, this::responseListener).build();
    }

    private void invokeActors(Udfunction.DatumRequest datumRequest) {
        String[] strArr = (String[]) datumRequest.getKeysList().toArray(new String[0]);
        String join = String.join(FunctionConstants.DELIMITTER, strArr);
        if (!this.actorsMap.containsKey(join)) {
            this.actorsMap.put(join, getContext().actorOf(ReduceActor.props(strArr, this.md, this.reducerFactory.createReducer())));
        }
        this.actorsMap.get(join).tell(constructHandlerDatum(datumRequest), getSelf());
    }

    private void sendEOF(String str) {
        Iterator<Map.Entry<String, ActorRef>> it = this.actorsMap.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().tell(str, getSelf());
        }
    }

    private void responseListener(ActorResponse actorResponse) {
        this.responseObserver.onNext(actorResponse.getDatumList());
        this.actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys()));
        if (this.actorsMap.isEmpty()) {
            this.responseObserver.onCompleted();
            getContext().getSystem().stop(getSelf());
        }
    }

    private HandlerDatum constructHandlerDatum(Udfunction.DatumRequest datumRequest) {
        return new HandlerDatum(datumRequest.getValue().toByteArray(), Instant.ofEpochSecond(datumRequest.getWatermark().getWatermark().getSeconds(), datumRequest.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond(datumRequest.getEventTime().getEventTime().getSeconds(), datumRequest.getEventTime().getEventTime().getNanos()), new HandlerDatumMetadata(datumRequest.getMetadata().getId(), datumRequest.getMetadata().getNumDelivered()));
    }
}
