package io.numaproj.numaflow.sink;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import io.numaproj.numaflow.sink.handler.SinkHandler;
import io.numaproj.numaflow.sink.types.ResponseList;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sink/SinkActor.class */
/**
 * Akka actor that forwards every {@link HandlerDatum} it receives to the
 * configured {@link SinkHandler}, collects the handler's responses in a
 * {@link ResponseList} builder, and sends the built list to its parent actor
 * whenever a {@link String} message arrives.
 */
class SinkActor extends AbstractActor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SinkActor.class);

    /** Handler invoked once for each incoming datum. */
    private final SinkHandler sinkHandler;

    /** Accumulates one response per datum processed so far. */
    private final ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();

    /**
     * Creates a sink actor bound to the given handler.
     *
     * @param sinkHandler handler that processes each incoming datum
     */
    public SinkActor(SinkHandler sinkHandler) {
        this.sinkHandler = sinkHandler;
    }

    /**
     * Builds the {@link Props} used to spawn a {@code SinkActor}.
     *
     * @param sinkHandler handler passed through to the actor's constructor
     * @return props describing a {@code SinkActor} created with {@code sinkHandler}
     */
    public static Props props(SinkHandler sinkHandler) {
        return Props.create(SinkActor.class, sinkHandler);
    }

    /**
     * Declares the message routing for this actor: {@link HandlerDatum}
     * messages are processed by the handler, {@link String} messages trigger
     * publication of the collected responses.
     *
     * @return the receive behavior for this actor
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(HandlerDatum.class, this::invokeHandler)
                .match(String.class, this::getResult)
                .build();
    }

    /**
     * Runs the sink handler on a single datum and records its response.
     *
     * @param datum message to hand to the sink handler
     */
    private void invokeHandler(HandlerDatum datum) {
        responseListBuilder.addResponse(sinkHandler.processMessage(datum));
    }

    /**
     * Sends the responses collected so far to the parent actor, with no sender
     * attached.
     *
     * @param signal triggering message; its content is not inspected
     *               (presumably an end-of-input marker - confirm with the sender)
     */
    private void getResult(String signal) {
        ActorRef parent = getContext().getParent();
        ResponseList collected = responseListBuilder.build();
        parent.tell(collected, ActorRef.noSender());
    }
}
