package io.numaproj.numaflow.mapstreamer;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/mapstreamer/Service.class */
public class Service extends MapGrpc.MapImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    private final MapStreamer mapStreamer;
    private final CompletableFuture<Void> shutdownSignal;

    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOuterClass.MapResponse> streamObserver) {
        return this.mapStreamer == null ? ServerCalls.asyncUnimplementedStreamingCall(MapGrpc.getMapFnMethod(), streamObserver) : new StreamObserver<MapOuterClass.MapRequest>() { // from class: io.numaproj.numaflow.mapstreamer.Service.1
            private boolean handshakeDone = false;

            public void onNext(MapOuterClass.MapRequest mapRequest) {
                if (!this.handshakeDone) {
                    if (!mapRequest.hasHandshake() || !mapRequest.getHandshake().getSot()) {
                        streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                        return;
                    } else {
                        streamObserver.onNext(MapOuterClass.MapResponse.newBuilder().setHandshake(mapRequest.getHandshake()).m112build());
                        this.handshakeDone = true;
                        return;
                    }
                }
                try {
                    Service.this.mapStreamer.processMessage((String[]) mapRequest.getRequest().mo73getKeysList().toArray(new String[0]), Service.this.constructHandlerDatum(mapRequest), new OutputObserverImpl(streamObserver));
                    streamObserver.onNext(MapOuterClass.MapResponse.newBuilder().setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).m189build()).m112build());
                } catch (Exception e) {
                    Service.log.error("Encountered error in mapFn onNext - {}", e.getMessage());
                    Service.this.shutdownSignal.completeExceptionally(e);
                    streamObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
                }
            }

            public void onError(Throwable th) {
                Service.log.error("Encountered error in mapStream Stream - {}", th.getMessage());
                Service.this.shutdownSignal.completeExceptionally(th);
                streamObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<MapOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(MapOuterClass.ReadyResponse.newBuilder().setReady(true).m164build());
        streamObserver.onCompleted();
    }

    private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest mapRequest) {
        return new HandlerDatum(mapRequest.getRequest().getValue().toByteArray(), Instant.ofEpochSecond(mapRequest.getRequest().getWatermark().getSeconds(), mapRequest.getRequest().getWatermark().getNanos()), Instant.ofEpochSecond(mapRequest.getRequest().getEventTime().getSeconds(), mapRequest.getRequest().getEventTime().getNanos()), mapRequest.getRequest().getHeadersMap());
    }

    @Generated
    public Service(MapStreamer mapStreamer, CompletableFuture<Void> completableFuture) {
        this.mapStreamer = mapStreamer;
        this.shutdownSignal = completableFuture;
    }
}
