package io.numaproj.numaflow.reducer;

import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

/* loaded from: input_file:io/numaproj/numaflow/reducer/ReduceSupervisorActor.class */
class ReduceSupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReduceSupervisorActor.class);
    private final ReducerFactory<? extends Reducer> reducerFactory;
    private final Metadata md;
    private final ActorRef shutdownActor;
    private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
    private final Map<String, ActorRef> actorsMap = new HashMap();

    /* loaded from: input_file:io/numaproj/numaflow/reducer/ReduceSupervisorActor$ReduceSupervisorStrategy.class */
    private final class ReduceSupervisorStrategy extends SupervisorStrategy {
        private ReduceSupervisorStrategy() {
        }

        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, exc -> {
                return SupervisorStrategy.stop();
            }).build();
        }

        public void handleChildTerminated(ActorContext actorContext, ActorRef actorRef, Iterable<ActorRef> iterable) {
        }

        public void processFailure(ActorContext actorContext, boolean z, ActorRef actorRef, Throwable th, ChildRestartStats childRestartStats, Iterable<ChildRestartStats> iterable) {
            Preconditions.checkArgument(!z, "on failures, we will never restart our actors, we escalate");
            ReduceSupervisorActor.log.debug("process failure of supervisor strategy executed - {}", ReduceSupervisorActor.this.getSelf().toString());
            ReduceSupervisorActor.this.shutdownActor.tell(th, actorContext.parent());
        }
    }

    public ReduceSupervisorActor(ReducerFactory<? extends Reducer> reducerFactory, Metadata metadata, ActorRef actorRef, StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        this.reducerFactory = reducerFactory;
        this.md = metadata;
        this.shutdownActor = actorRef;
        this.responseObserver = streamObserver;
    }

    public static Props props(ReducerFactory<? extends Reducer> reducerFactory, Metadata metadata, ActorRef actorRef, StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        return Props.create(ReduceSupervisorActor.class, new Object[]{reducerFactory, metadata, actorRef, streamObserver});
    }

    public void preRestart(Throwable th, Optional<Object> optional) {
        log.debug("supervisor pre restart was executed");
        this.shutdownActor.tell(th, ActorRef.noSender());
        Service.reduceActorSystem.stop(getSelf());
    }

    public SupervisorStrategy supervisorStrategy() {
        return new ReduceSupervisorStrategy();
    }

    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf().toString());
        this.shutdownActor.tell("SUCCESS", ActorRef.noSender());
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ActorRequest.class, this::invokeActors).match(String.class, this::sendEOF).match(ActorResponse.class, this::responseListener).build();
    }

    private void invokeActors(ActorRequest actorRequest) {
        String[] keySet = actorRequest.getKeySet();
        String uniqueIdentifier = actorRequest.getUniqueIdentifier();
        if (!this.actorsMap.containsKey(uniqueIdentifier)) {
            this.actorsMap.put(uniqueIdentifier, getContext().actorOf(ReduceActor.props(keySet, this.md, this.reducerFactory.createReducer())));
        }
        this.actorsMap.get(uniqueIdentifier).tell(constructHandlerDatum(actorRequest.getRequest().getPayload()), getSelf());
    }

    private void sendEOF(String str) {
        Iterator<Map.Entry<String, ActorRef>> it = this.actorsMap.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().tell(str, getSelf());
        }
    }

    private void responseListener(ActorResponse actorResponse) {
        if (!actorResponse.getResponse().getEOF()) {
            this.responseObserver.onNext(actorResponse.getResponse());
            return;
        }
        this.actorsMap.remove(actorResponse.getUniqueIdentifier());
        if (this.actorsMap.isEmpty()) {
            this.responseObserver.onNext(actorResponse.getResponse());
            this.responseObserver.onCompleted();
            getContext().getSystem().stop(getSelf());
        }
    }

    private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
        return new HandlerDatum(payload.getValue().toByteArray(), Instant.ofEpochSecond(payload.getWatermark().getSeconds(), payload.getWatermark().getNanos()), Instant.ofEpochSecond(payload.getEventTime().getSeconds(), payload.getEventTime().getNanos()), payload.getHeadersMap());
    }
}
