package io.numaproj.numaflow.sourcetransformer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sourcetransformer/Service.class */
/**
 * gRPC service implementation that adapts a user-supplied {@link SourceTransformer}
 * to the generated {@code SourceTransform} service base class.
 *
 * <p>Each incoming request is converted into a key array plus a {@link HandlerDatum},
 * handed to the transformer, and the returned {@link MessageList} is converted back
 * into a protobuf response.
 */
class Service extends SourceTransformGrpc.SourceTransformImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    private final SourceTransformer transformer;

    /**
     * Creates the service.
     *
     * @param sourceTransformer handler invoked for every request; when {@code null},
     *                          {@link #sourceTransformFn} answers as an unimplemented call
     */
    @Generated
    public Service(SourceTransformer sourceTransformer) {
        this.transformer = sourceTransformer;
    }

    /**
     * Applies the configured transformer to a single request and streams back one response.
     *
     * @param sourceTransformRequest request carrying keys, value, watermark, event time and headers
     * @param streamObserver         observer that receives the single response followed by completion
     */
    @Override // io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc.AsyncService
    public void sourceTransformFn(Sourcetransformer.SourceTransformRequest sourceTransformRequest, StreamObserver<Sourcetransformer.SourceTransformResponse> streamObserver) {
        if (this.transformer == null) {
            // Report "unimplemented" against THIS service's method descriptor, not the map service's.
            ServerCalls.asyncUnimplementedUnaryCall(
                    SourceTransformGrpc.getSourceTransformFnMethod(),
                    streamObserver);
            return;
        }

        String[] keys = sourceTransformRequest.getKeysList().toArray(new String[0]);
        HandlerDatum datum = new HandlerDatum(
                sourceTransformRequest.getValue().toByteArray(),
                toInstant(sourceTransformRequest.getWatermark()),
                toInstant(sourceTransformRequest.getEventTime()),
                sourceTransformRequest.getHeadersMap());

        MessageList transformed = this.transformer.processMessage(keys, datum);

        streamObserver.onNext(buildResponse(transformed));
        streamObserver.onCompleted();
    }

    /**
     * Readiness check; always replies with {@code ready = true} and completes the call.
     *
     * @param empty          unused request payload
     * @param streamObserver observer that receives the readiness response
     */
    @Override // io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<Sourcetransformer.ReadyResponse> streamObserver) {
        streamObserver.onNext(Sourcetransformer.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }

    /**
     * Converts the transformer's output into the protobuf response, one result per message.
     * A {@code null} value becomes an empty byte string; {@code null} keys or tags become
     * empty lists.
     *
     * @param messageList messages produced by the transformer
     * @return response containing one result entry per message, in iteration order
     */
    private Sourcetransformer.SourceTransformResponse buildResponse(MessageList messageList) {
        Sourcetransformer.SourceTransformResponse.Builder responseBuilder =
                Sourcetransformer.SourceTransformResponse.newBuilder();
        messageList.getMessages().forEach(message -> {
            responseBuilder.addResults(Sourcetransformer.SourceTransformResponse.Result.newBuilder()
                    .setValue(message.getValue() == null
                            ? ByteString.EMPTY
                            : ByteString.copyFrom(message.getValue()))
                    .setEventTime(Timestamp.newBuilder()
                            .setSeconds(message.getEventTime().getEpochSecond())
                            .setNanos(message.getEventTime().getNano()))
                    .addAllKeys(toList(message.getKeys()))
                    .addAllTags(toList(message.getTags()))
                    .build());
        });
        return responseBuilder.build();
    }

    /** Converts a protobuf timestamp (seconds + nanos) into an {@link Instant}. */
    private static Instant toInstant(Timestamp timestamp) {
        return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
    }

    /** Returns the array's elements as a list of strings, or an empty list when the array is {@code null}. */
    private static List<String> toList(String[] values) {
        return values == null ? List.of() : List.of(values);
    }
}
