package io.numaproj.numaflow.sessionreducer;

import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

/* loaded from: input_file:io/numaproj/numaflow/sessionreducer/SupervisorActor.class */
/**
 * Actor that owns the per-session reducer actors.
 *
 * <p>It receives {@link Sessionreduce.SessionReduceRequest} messages, translates each window
 * operation (OPEN, APPEND, CLOSE, EXPAND, MERGE) into {@link ActorRequest}s for the matching
 * child actor, and keeps a map from a window's unique identifier to the child that handles it.
 * Responses coming back from the children ({@link ActorResponse}) are either forwarded to the
 * output actor (EOF responses) or routed into an in-flight merge (accumulator responses).
 * Failures and normal termination are both reported to the shutdown actor.
 */
class SupervisorActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SupervisorActor.class);
    private final SessionReducerFactory<? extends SessionReducer> sessionReducerFactory;
    private final ActorRef shutdownActor;
    private final ActorRef outputActor;
    /** Number of accumulator responses still expected for the merge currently in progress. */
    private int numberOfPendingAccumulators;
    /** Sender of the most recent MERGE request; notified once all accumulators have arrived. */
    private ActorRef mergeRequestSender;
    /** Maps a keyed window's unique identifier to the child actor handling that session. */
    private final Map<String, ActorRef> actorsMap = new HashMap<>();
    /** Set once a String (EOF) message has been received by this actor. */
    private boolean isInputStreamClosed = false;

    /**
     * Supervisor strategy applied to the child session actors: every {@link Exception} maps to
     * the stop directive, and the failure cause is forwarded to the shutdown actor.
     */
    private final class ReduceSupervisorStrategy extends SupervisorStrategy {
        private ReduceSupervisorStrategy() {
        }

        /** Returns a decider that answers every {@link Exception} with the stop directive. */
        @Override
        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, exc -> SupervisorStrategy.stop()).build();
        }

        /** Intentionally a no-op: nothing is done here when a child terminates. */
        @Override
        public void handleChildTerminated(
                ActorContext context,
                ActorRef child,
                Iterable<ActorRef> children) {
        }

        /**
         * Forwards the failure cause to the shutdown actor.
         *
         * @throws IllegalArgumentException if {@code restart} is true, since children are never
         *     expected to be restarted
         */
        @Override
        public void processFailure(
                ActorContext context,
                boolean restart,
                ActorRef child,
                Throwable cause,
                ChildRestartStats stats,
                Iterable<ChildRestartStats> children) {
            Preconditions.checkArgument(!restart, "on failures, we will never restart our actors, we escalate");
            SupervisorActor.log.debug("process failure of supervisor strategy executed - {}", SupervisorActor.this.getSelf());
            SupervisorActor.this.shutdownActor.tell(cause, context.parent());
        }
    }

    /**
     * @param sessionReducerFactory factory used to create one session reducer per child actor
     * @param shutdownActor actor that is told about failures and about normal termination
     * @param outputActor actor that receives EOF notifications and EOF responses
     */
    public SupervisorActor(
            SessionReducerFactory<? extends SessionReducer> sessionReducerFactory,
            ActorRef shutdownActor,
            ActorRef outputActor) {
        this.sessionReducerFactory = sessionReducerFactory;
        this.shutdownActor = shutdownActor;
        this.outputActor = outputActor;
    }

    /** Builds the {@link Props} used to create a {@link SupervisorActor}. */
    public static Props props(
            SessionReducerFactory<? extends SessionReducer> sessionReducerFactory,
            ActorRef shutdownActor,
            ActorRef outputActor) {
        return Props.create(SupervisorActor.class, new Object[]{sessionReducerFactory, shutdownActor, outputActor});
    }

    /**
     * Reports the failure that triggered the restart to the shutdown actor and then stops this
     * actor instead of letting it continue.
     */
    @Override
    public void preRestart(Throwable reason, Optional<Object> message) {
        log.debug("supervisor pre restart was executed");
        this.shutdownActor.tell(reason, ActorRef.noSender());
        Service.sessionReduceActorSystem.stop(getSelf());
    }

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new ReduceSupervisorStrategy();
    }

    /** Tells the shutdown actor {@code "SUCCESS"} when this actor stops. */
    @Override
    public void postStop() {
        log.debug("post stop of supervisor executed - {}", getSelf());
        this.shutdownActor.tell("SUCCESS", ActorRef.noSender());
    }

    /**
     * Message dispatch: any {@link String} is treated as EOF, reduce requests are routed by
     * window operation, and {@link ActorResponse}s come back from the child actors.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(String.class, this::handleEOF)
                .match(Sessionreduce.SessionReduceRequest.class, this::handleReduceRequest)
                .match(ActorResponse.class, this::handleActorResponse)
                .build();
    }

    /**
     * Marks the input stream as closed and propagates the EOF message: straight to the output
     * actor when no session actor is registered, otherwise to every registered session actor.
     */
    private void handleEOF(String eof) {
        this.isInputStreamClosed = true;
        if (this.actorsMap.isEmpty()) {
            this.outputActor.tell(eof, getSelf());
            return;
        }
        for (ActorRef sessionActor : this.actorsMap.values()) {
            sessionActor.tell(eof, getSelf());
        }
    }

    /**
     * Routes a reduce request to the handler for its window operation.
     *
     * @throws RuntimeException if the operation is unsupported or its windows are invalid
     */
    private void handleReduceRequest(Sessionreduce.SessionReduceRequest sessionReduceRequest) {
        Sessionreduce.SessionReduceRequest.WindowOperation operation = sessionReduceRequest.getOperation();
        switch (operation.getEvent()) {
            case OPEN:
                handleOpen(sessionReduceRequest, operation);
                return;
            case APPEND:
                handleAppend(sessionReduceRequest, operation);
                return;
            case CLOSE:
                handleClose(operation);
                return;
            case EXPAND:
                handleExpand(sessionReduceRequest, operation);
                return;
            case MERGE:
                handleMerge(operation);
                return;
            default:
                throw new RuntimeException("received an unsupported window operation: " + operation.getEvent());
        }
    }

    /** OPEN: creates the session actor for exactly one, not yet registered, window. */
    private void handleOpen(
            Sessionreduce.SessionReduceRequest request,
            Sessionreduce.SessionReduceRequest.WindowOperation operation) {
        if (operation.getKeyedWindowsCount() != 1) {
            throw new RuntimeException("open operation error: expected exactly one window");
        }
        Sessionreduce.KeyedWindow window = operation.getKeyedWindows(0);
        if (this.actorsMap.containsKey(UniqueIdGenerator.getUniqueIdentifier(window))) {
            throw new RuntimeException("received an OPEN request but the session reducer actor already exists");
        }
        invokeActor(ActorRequest.builder()
                .type(ActorRequestType.OPEN)
                .keyedWindow(window)
                .payload(payloadOrNull(request))
                .build());
    }

    /** APPEND: forwards the payload to the session actor of exactly one window. */
    private void handleAppend(
            Sessionreduce.SessionReduceRequest request,
            Sessionreduce.SessionReduceRequest.WindowOperation operation) {
        if (operation.getKeyedWindowsCount() != 1) {
            throw new RuntimeException("append operation error: expected exactly one window");
        }
        invokeActor(ActorRequest.builder()
                .type(ActorRequestType.APPEND)
                .keyedWindow(operation.getKeyedWindows(0))
                .payload(payloadOrNull(request))
                .build());
    }

    /** CLOSE: issues a CLOSE request for every window listed in the operation. */
    private void handleClose(Sessionreduce.SessionReduceRequest.WindowOperation operation) {
        for (Sessionreduce.KeyedWindow window : operation.getKeyedWindowsList()) {
            invokeActor(ActorRequest.builder()
                    .type(ActorRequestType.CLOSE)
                    .keyedWindow(window)
                    .build());
        }
    }

    /**
     * EXPAND: expects two windows (current, expanded). The existing session actor is told about
     * the expanded window, re-registered under the expanded window's identifier, and then
     * receives the request payload as an APPEND on the expanded window.
     */
    private void handleExpand(
            Sessionreduce.SessionReduceRequest request,
            Sessionreduce.SessionReduceRequest.WindowOperation operation) {
        if (operation.getKeyedWindowsCount() != 2) {
            throw new RuntimeException("expand operation error: expected exactly two windows");
        }
        Sessionreduce.KeyedWindow currentWindow = operation.getKeyedWindows(0);
        Sessionreduce.KeyedWindow expandedWindow = operation.getKeyedWindows(1);
        String currentId = UniqueIdGenerator.getUniqueIdentifier(currentWindow);
        String expandedId = UniqueIdGenerator.getUniqueIdentifier(expandedWindow);
        if (!this.actorsMap.containsKey(currentId)) {
            throw new RuntimeException("expand operation error: session not found for id: " + currentId);
        }
        invokeActor(ActorRequest.builder()
                .type(ActorRequestType.EXPAND)
                .keyedWindow(currentWindow)
                .newKeyedWindow(expandedWindow)
                .build());
        // Remove before put: if both identifiers are equal, put-then-remove would drop the
        // session actor from the map and the APPEND below would silently create a new one.
        ActorRef sessionActor = this.actorsMap.remove(currentId);
        this.actorsMap.put(expandedId, sessionActor);
        invokeActor(ActorRequest.builder()
                .type(ActorRequestType.APPEND)
                .keyedWindow(expandedWindow)
                .payload(payloadOrNull(request))
                .build());
    }

    /**
     * MERGE: computes the window spanning all listed windows (earliest start, latest end, keys
     * and slot taken from the first window), asks every listed session actor for its
     * accumulator, and opens a session actor for the merged window.
     *
     * @throws RuntimeException if no window is supplied or a listed session is not registered
     */
    private void handleMerge(Sessionreduce.SessionReduceRequest.WindowOperation operation) {
        if (operation.getKeyedWindowsCount() < 1) {
            throw new RuntimeException("merge operation error: expected at least one window");
        }
        this.mergeRequestSender = getSender();
        Sessionreduce.KeyedWindow firstWindow = operation.getKeyedWindows(0);
        Timestamp mergedStart = firstWindow.getStart();
        Timestamp mergedEnd = firstWindow.getEnd();
        for (Sessionreduce.KeyedWindow window : operation.getKeyedWindowsList()) {
            String windowId = UniqueIdGenerator.getUniqueIdentifier(window);
            if (!this.actorsMap.containsKey(windowId)) {
                throw new RuntimeException("merge operation error: session not found for id: " + windowId);
            }
            if (toInstant(window.getStart()).isBefore(toInstant(mergedStart))) {
                mergedStart = window.getStart();
            }
            if (toInstant(window.getEnd()).isAfter(toInstant(mergedEnd))) {
                mergedEnd = window.getEnd();
            }
        }
        // NOTE(review): mo721getKeysList / m754build look like decompiler-renamed accessors;
        // they are kept verbatim because their declaring class is not visible here.
        Sessionreduce.KeyedWindow mergedWindow = Sessionreduce.KeyedWindow.newBuilder()
                .setStart(mergedStart)
                .setEnd(mergedEnd)
                .addAllKeys(firstWindow.mo721getKeysList())
                .setSlot(firstWindow.getSlot())
                .m754build();
        String mergeTaskId = UniqueIdGenerator.getUniqueIdentifier(mergedWindow);
        this.numberOfPendingAccumulators = operation.getKeyedWindowsCount();
        // The accumulator requests go out before the merged session is opened, so they reach
        // the currently registered actors even if the merged window reuses one of their ids.
        for (Sessionreduce.KeyedWindow window : operation.getKeyedWindowsList()) {
            invokeActor(ActorRequest.builder()
                    .type(ActorRequestType.GET_ACCUMULATOR)
                    .keyedWindow(window)
                    .mergeTaskId(mergeTaskId)
                    .build());
        }
        invokeActor(ActorRequest.builder()
                .type(ActorRequestType.OPEN)
                .keyedWindow(mergedWindow)
                .build());
    }

    /**
     * Executes a single request against the session actor of the request's keyed window and,
     * when the request carries a payload, forwards that payload to the actor afterwards.
     */
    private void invokeActor(ActorRequest actorRequest) {
        String sessionId = UniqueIdGenerator.getUniqueIdentifier(actorRequest.getKeyedWindow());
        switch (actorRequest.getType()) {
            case OPEN:
                this.actorsMap.put(sessionId, createSessionActor(actorRequest.getKeyedWindow()));
                break;
            case APPEND:
                // An APPEND for an unknown session creates the session actor on demand.
                if (!this.actorsMap.containsKey(sessionId)) {
                    this.actorsMap.put(sessionId, createSessionActor(actorRequest.getKeyedWindow()));
                }
                break;
            case CLOSE:
                // A CLOSE for an unknown session is ignored.
                if (this.actorsMap.containsKey(sessionId)) {
                    this.actorsMap.get(sessionId).tell("EOF", getSelf());
                }
                break;
            case EXPAND:
                this.actorsMap.get(sessionId).tell(actorRequest.getNewKeyedWindow(), getSelf());
                break;
            case GET_ACCUMULATOR:
                this.actorsMap.get(sessionId).tell(new GetAccumulatorRequest(actorRequest.getMergeTaskId()), getSelf());
                break;
        }
        if (actorRequest.getPayload() != null) {
            this.actorsMap.get(sessionId).tell(constructHandlerDatum(actorRequest.getPayload()), getSelf());
        }
    }

    /** Creates a child session reducer actor for the given window. */
    private ActorRef createSessionActor(Sessionreduce.KeyedWindow keyedWindow) {
        return getContext().actorOf(SessionReducerActor.props(
                keyedWindow,
                this.sessionReducerFactory.createSessionReducer(),
                this.outputActor));
    }

    /**
     * Handles a response from a session actor.
     *
     * <p>An EOF response unregisters the session and is forwarded to the output actor; it is
     * flagged as the last one when the input stream is closed and no session remains. Any other
     * response is treated as an accumulator for an in-flight merge and is handed to the merged
     * session actor; the merge requester is notified once all accumulators have arrived.
     *
     * @throws RuntimeException if an accumulator arrives for a merge session that is not registered
     */
    private void handleActorResponse(ActorResponse actorResponse) {
        String responderId = UniqueIdGenerator.getUniqueIdentifier(actorResponse.getResponse().getKeyedWindow());
        if (actorResponse.isEOFResponse()) {
            this.actorsMap.remove(responderId);
            if (this.actorsMap.isEmpty() && this.isInputStreamClosed) {
                actorResponse.setLast(true);
            }
            this.outputActor.tell(actorResponse, getSelf());
            return;
        }
        String mergeTaskId = actorResponse.getMergeTaskId();
        if (!this.actorsMap.containsKey(mergeTaskId)) {
            throw new RuntimeException("received an accumulator but the corresponding parent merge session doesn't exist.");
        }
        this.numberOfPendingAccumulators--;
        // Keep the entry when the responder's id equals the merge task id: that key now refers
        // to the merged session actor.
        if (!responderId.equals(mergeTaskId)) {
            this.actorsMap.remove(responderId);
        }
        this.actorsMap.get(mergeTaskId).tell(new MergeAccumulatorRequest(actorResponse.getAccumulator()), getSelf());
        if (this.numberOfPendingAccumulators == 0) {
            this.mergeRequestSender.tell(new MergeDoneResponse(), getSelf());
        }
    }

    /** Converts a request payload into the {@link HandlerDatum} delivered to a session actor. */
    private HandlerDatum constructHandlerDatum(Sessionreduce.SessionReduceRequest.Payload payload) {
        return new HandlerDatum(
                payload.getValue().toByteArray(),
                toInstant(payload.getWatermark()),
                toInstant(payload.getEventTime()),
                payload.getHeadersMap());
    }

    /** Returns the request's payload, or {@code null} when the request carries none. */
    private static Sessionreduce.SessionReduceRequest.Payload payloadOrNull(
            Sessionreduce.SessionReduceRequest request) {
        return request.hasPayload() ? request.getPayload() : null;
    }

    /** Converts a protobuf {@link Timestamp} (seconds + nanos) into an {@link Instant}. */
    private static Instant toInstant(Timestamp timestamp) {
        return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
    }
}
