package io.numaproj.numaflow.batchmapper;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/batchmapper/Service.class */
public class Service extends MapGrpc.MapImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    private final ExecutorService mapTaskExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    private final long SHUTDOWN_TIME = 30;
    private final BatchMapper batchMapper;
    private final CompletableFuture<Void> shutdownSignal;

    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOuterClass.MapResponse> streamObserver) {
        return this.batchMapper == null ? ServerCalls.asyncUnimplementedStreamingCall(MapGrpc.getMapFnMethod(), streamObserver) : new StreamObserver<MapOuterClass.MapRequest>() { // from class: io.numaproj.numaflow.batchmapper.Service.1
            private boolean startOfStream = true;
            private boolean handshakeDone = false;
            private DatumIteratorImpl datumStream;
            private CompletableFuture<BatchResponses> result;

            public void onNext(MapOuterClass.MapRequest mapRequest) {
                try {
                    if (!this.handshakeDone) {
                        if (!mapRequest.hasHandshake() || !mapRequest.getHandshake().getSot()) {
                            streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                            return;
                        } else {
                            streamObserver.onNext(MapOuterClass.MapResponse.newBuilder().setHandshake(mapRequest.getHandshake()).m198build());
                            this.handshakeDone = true;
                            return;
                        }
                    }
                    if (this.startOfStream) {
                        this.datumStream = new DatumIteratorImpl();
                        this.result = CompletableFuture.supplyAsync(() -> {
                            return Service.this.batchMapper.processMessage(this.datumStream);
                        }, Service.this.mapTaskExecutor);
                        this.startOfStream = false;
                    }
                    if (mapRequest.hasStatus() && mapRequest.getStatus().getEot()) {
                        this.datumStream.writeMessage(HandlerDatum.EOF_DATUM);
                        Service.this.buildAndStreamResponse(this.result.join(), streamObserver);
                        this.startOfStream = true;
                    } else {
                        this.datumStream.writeMessage(Service.this.constructHandlerDatum(mapRequest));
                    }
                } catch (Exception e) {
                    Service.log.error("Encountered an error in batch map onNext - {}", e.getMessage());
                    Service.this.shutdownSignal.completeExceptionally(e);
                    streamObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
                }
            }

            public void onError(Throwable th) {
                Service.log.error("Error Encountered in batchMap Stream - {}", th.getMessage());
                Service.this.shutdownSignal.completeExceptionally(th);
                streamObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    private void buildAndStreamResponse(BatchResponses batchResponses, StreamObserver<MapOuterClass.MapResponse> streamObserver) {
        batchResponses.getItems().forEach(batchResponse -> {
            ArrayList arrayList = new ArrayList();
            batchResponse.getItems().forEach(message -> {
                arrayList.add(MapOuterClass.MapResponse.Result.newBuilder().setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())).addAllKeys(message.getKeys() == null ? new ArrayList<>() : List.of((Object[]) message.getKeys())).addAllTags(message.getTags() == null ? new ArrayList<>() : List.of((Object[]) message.getTags())).m247build());
            });
            streamObserver.onNext(MapOuterClass.MapResponse.newBuilder().setId(batchResponse.getId()).addAllResults(arrayList).m198build());
        });
        streamObserver.onNext(MapOuterClass.MapResponse.newBuilder().setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).m341build()).m198build());
        streamObserver.onCompleted();
    }

    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<MapOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(MapOuterClass.ReadyResponse.newBuilder().setReady(true).m294build());
        streamObserver.onCompleted();
    }

    private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest mapRequest) {
        return new HandlerDatum((String[]) mapRequest.getRequest().mo117getKeysList().toArray(new String[0]), mapRequest.getRequest().getValue().toByteArray(), Instant.ofEpochSecond(mapRequest.getRequest().getWatermark().getSeconds(), mapRequest.getRequest().getWatermark().getNanos()), Instant.ofEpochSecond(mapRequest.getRequest().getEventTime().getSeconds(), mapRequest.getRequest().getEventTime().getNanos()), mapRequest.getId(), mapRequest.getRequest().getHeadersMap());
    }

    public void shutDown() {
        this.mapTaskExecutor.shutdown();
        try {
            if (this.mapTaskExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                log.info("BatchMap executor was terminated.");
            } else {
                log.error("BatchMap executor did not terminate in the specified time.");
                log.error("BatchMap executor was abruptly shut down. {} tasks will not be executed.", Integer.valueOf(this.mapTaskExecutor.shutdownNow().size()));
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
            e.printStackTrace();
        }
    }

    @Generated
    public Service(BatchMapper batchMapper, CompletableFuture<Void> completableFuture) {
        this.batchMapper = batchMapper;
        this.shutdownSignal = completableFuture;
    }
}
