package io.numaproj.numaflow.sessionreducer;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;

/* loaded from: input_file:io/numaproj/numaflow/sessionreducer/SessionReducerActor.class */
class SessionReducerActor extends AbstractActor {
    private final SessionReducer sessionReducer;
    private final OutputStreamObserver outputStream;
    private Sessionreduce.KeyedWindow keyedWindow;
    private boolean isClosed = false;

    public SessionReducerActor(Sessionreduce.KeyedWindow keyedWindow, SessionReducer sessionReducer, OutputStreamObserver outputStreamObserver) {
        this.keyedWindow = keyedWindow;
        this.sessionReducer = sessionReducer;
        this.outputStream = outputStreamObserver;
    }

    public static Props props(Sessionreduce.KeyedWindow keyedWindow, SessionReducer sessionReducer, ActorRef actorRef) {
        return Props.create(SessionReducerActor.class, new Object[]{keyedWindow, sessionReducer, new OutputStreamObserverImpl(actorRef, keyedWindow)});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Sessionreduce.KeyedWindow.class, this::updateKeyedWindow).match(HandlerDatum.class, this::invokeHandler).match(String.class, this::handleEOF).match(GetAccumulatorRequest.class, this::handleGetAccumulatorRequest).match(MergeAccumulatorRequest.class, this::handleMergeAccumulatorRequest).build();
    }

    private void updateKeyedWindow(Sessionreduce.KeyedWindow keyedWindow) {
        this.keyedWindow = keyedWindow;
        ((OutputStreamObserverImpl) this.outputStream).setKeyedWindow(keyedWindow);
    }

    private void invokeHandler(HandlerDatum handlerDatum) {
        this.sessionReducer.processMessage((String[]) this.keyedWindow.mo721getKeysList().toArray(new String[0]), handlerDatum, this.outputStream);
    }

    private void handleEOF(String str) {
        if (this.isClosed) {
            return;
        }
        this.sessionReducer.handleEndOfStream((String[]) this.keyedWindow.mo721getKeysList().toArray(new String[0]), this.outputStream);
        getSender().tell(buildEOFResponse(), getSelf());
        this.isClosed = true;
    }

    private void handleGetAccumulatorRequest(GetAccumulatorRequest getAccumulatorRequest) {
        getSender().tell(buildMergeResponse(this.sessionReducer.accumulator(), getAccumulatorRequest.getMergeTaskId()), getSelf());
        this.isClosed = true;
    }

    private void handleMergeAccumulatorRequest(MergeAccumulatorRequest mergeAccumulatorRequest) {
        this.sessionReducer.mergeAccumulator(mergeAccumulatorRequest.getAccumulator());
    }

    private ActorResponse buildEOFResponse() {
        Sessionreduce.SessionReduceResponse.Builder newBuilder = Sessionreduce.SessionReduceResponse.newBuilder();
        newBuilder.setKeyedWindow(this.keyedWindow);
        newBuilder.setEOF(true);
        return ActorResponse.builder().response(newBuilder.m993build()).build();
    }

    private ActorResponse buildMergeResponse(byte[] bArr, String str) {
        Sessionreduce.SessionReduceResponse.Builder newBuilder = Sessionreduce.SessionReduceResponse.newBuilder();
        newBuilder.setKeyedWindow(this.keyedWindow);
        return ActorResponse.builder().response(newBuilder.m993build()).accumulator(bArr).mergeTaskId(str).build();
    }
}
