package io.numaproj.numaflow.sourcetransformer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/* loaded from: input_file:io/numaproj/numaflow/sourcetransformer/TransformerActor.class */
class TransformerActor extends AbstractActor {
    private final SourceTransformer transformer;

    public TransformerActor(SourceTransformer sourceTransformer) {
        this.transformer = sourceTransformer;
    }

    public static Props props(SourceTransformer sourceTransformer) {
        return Props.create(TransformerActor.class, new Object[]{sourceTransformer});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Sourcetransformer.SourceTransformRequest.class, this::processRequest).build();
    }

    private void processRequest(Sourcetransformer.SourceTransformRequest sourceTransformRequest) {
        Sourcetransformer.SourceTransformRequest.Request request = sourceTransformRequest.getRequest();
        try {
            getSender().tell(buildResponse(this.transformer.processMessage((String[]) request.mo2436getKeysList().toArray(new String[0]), new HandlerDatum(request.getValue().toByteArray(), Instant.ofEpochSecond(request.getWatermark().getSeconds(), request.getWatermark().getNanos()), Instant.ofEpochSecond(request.getEventTime().getSeconds(), request.getEventTime().getNanos()), request.getHeadersMap())), request.getId()), getSelf());
        } catch (Exception e) {
            getSender().tell(e, getSelf());
        }
        context().stop(getSelf());
    }

    private Sourcetransformer.SourceTransformResponse buildResponse(MessageList messageList, String str) {
        Sourcetransformer.SourceTransformResponse.Builder newBuilder = Sourcetransformer.SourceTransformResponse.newBuilder();
        messageList.getMessages().forEach(message -> {
            newBuilder.addResults(Sourcetransformer.SourceTransformResponse.Result.newBuilder().setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())).setEventTime(Timestamp.newBuilder().setSeconds(message.getEventTime().getEpochSecond()).setNanos(message.getEventTime().getNano())).addAllKeys(message.getKeys() == null ? new ArrayList<>() : List.of((Object[]) message.getKeys())).addAllTags(message.getTags() == null ? new ArrayList<>() : List.of((Object[]) message.getTags())).m2566build());
        });
        return newBuilder.setId(str).m2517build();
    }
}
