package io.numaproj.numaflow.function;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.function.handlers.ReduceHandler;
import io.numaproj.numaflow.function.interfaces.Metadata;
import io.numaproj.numaflow.function.types.MessageList;
import io.numaproj.numaflow.function.v1.Udfunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Akka actor that forwards incoming {@link HandlerDatum} messages to a {@link ReduceHandler}
 * and, on request, replies with the handler's output.
 *
 * <p>Message protocol, as wired in {@link #createReceive()}:
 * <ul>
 *   <li>{@link HandlerDatum} - passed to {@code ReduceHandler.addMessage} together with this
 *       actor's keys and metadata;</li>
 *   <li>any {@link String} - triggers {@code ReduceHandler.getOutput}; the result is converted
 *       to a {@code DatumResponseList}, wrapped in an {@link ActorResponse} and sent back to
 *       the sender. The content of the string is not inspected.</li>
 * </ul>
 */
class ReduceActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReduceActor.class);

    /** Keys handed to the handler on every call and echoed back in the response. */
    private final String[] keys;
    /** Metadata handed to the handler on every call. */
    private final Metadata md;
    /** User-supplied reduce handler this actor delegates to. */
    private final ReduceHandler groupBy;

    /**
     * Creates the actor with the state it passes to the handler on every invocation.
     *
     * @param keys          keys passed to the handler and attached to the response
     * @param metadata      metadata passed to the handler
     * @param reduceHandler handler that accumulates messages and produces the output
     */
    @Generated
    public ReduceActor(String[] keys, Metadata metadata, ReduceHandler reduceHandler) {
        this.keys = keys;
        this.md = metadata;
        this.groupBy = reduceHandler;
    }

    /**
     * Builds the {@link Props} used to create a {@code ReduceActor} with the given arguments.
     *
     * @param keys          keys forwarded to the actor's constructor
     * @param metadata      metadata forwarded to the actor's constructor
     * @param reduceHandler handler forwarded to the actor's constructor
     * @return props describing a {@code ReduceActor} constructed with these arguments
     */
    public static Props props(String[] keys, Metadata metadata, ReduceHandler reduceHandler) {
        return Props.create(ReduceActor.class, keys, metadata, reduceHandler);
    }

    /**
     * Routes {@link HandlerDatum} messages to {@link #invokeHandler} and {@link String}
     * messages to {@link #getResult}.
     */
    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(HandlerDatum.class, this::invokeHandler)
                .match(String.class, this::getResult)
                .build();
    }

    /** Hands one datum to the reduce handler along with this actor's keys and metadata. */
    private void invokeHandler(HandlerDatum handlerDatum) {
        this.groupBy.addMessage(this.keys, handlerDatum, this.md);
    }

    /**
     * Asks the handler for its output and sends the converted response to the sender.
     *
     * @param str the triggering message; its value is ignored
     */
    private void getResult(String str) {
        MessageList output = this.groupBy.getOutput(this.keys, this.md);
        getSender().tell(buildDatumListResponse(output), getSelf());
    }

    /**
     * Converts the handler's output into a {@code DatumResponseList} and pairs it with this
     * actor's keys.
     *
     * @param messageList handler output; each message contributes one response element
     * @return the response carrying this actor's keys and the converted messages
     */
    private ActorResponse buildDatumListResponse(MessageList messageList) {
        Udfunction.DatumResponseList.Builder responseList = Udfunction.DatumResponseList.newBuilder();
        messageList.getMessages().forEach(message ->
                responseList.addElements(Udfunction.DatumResponse.newBuilder()
                        .setValue(ByteString.copyFrom(message.getValue()))
                        // Keys and tags are handled the same way: a null array becomes an empty list.
                        .addAllKeys(listOrEmpty(message.getKeys()))
                        .addAllTags(listOrEmpty(message.getTags()))
                        .build()));
        return new ActorResponse(this.keys, responseList.build());
    }

    /**
     * Returns a list view of {@code items}, or an empty list when {@code items} is null.
     * Keeps the element type of the array so the result can be passed to typed builders.
     */
    private static <T> List<T> listOrEmpty(T[] items) {
        return items == null ? List.of() : Arrays.asList(items);
    }
}
