package io.numaproj.numaflow.batchmapper;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.batchmap.v1.BatchMapGrpc;
import io.numaproj.numaflow.batchmap.v1.Batchmap;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/batchmapper/Service.class */
/**
 * gRPC service that connects the BatchMap streaming RPC to a user-supplied {@link BatchMapper}.
 *
 * <p>For every {@code batchMapFn} call a new {@code DatumIteratorImpl} is created and
 * {@code batchMapper.processMessage(...)} is submitted to an internal fixed-size thread pool.
 * Incoming requests are written into that iterator; when the inbound stream completes, an EOF
 * datum is written, the handler result is awaited, and the responses are streamed back.
 */
public class Service extends BatchMapGrpc.BatchMapImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);

    /** Seconds {@link #shutDown()} waits for submitted handler tasks before forcing termination. */
    private static final long SHUTDOWN_TIME = 30;

    /** Pool that runs the handler; sized at twice the number of available processors. */
    private final ExecutorService batchMapTaskExecutor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    /** User handler; when {@code null} the RPC is answered as unimplemented. */
    private final BatchMapper batchMapper;

    /**
     * Creates the service around the given handler.
     *
     * @param batchMapper handler invoked once per {@code batchMapFn} stream; may be {@code null}
     */
    @Generated
    public Service(BatchMapper batchMapper) {
        this.batchMapper = batchMapper;
    }

    /**
     * Handles one BatchMap stream.
     *
     * @param streamObserver outbound observer used to send responses or an error to the caller
     * @return the inbound observer that receives the requests of this stream
     */
    @Override // io.numaproj.numaflow.batchmap.v1.BatchMapGrpc.AsyncService
    public StreamObserver<Batchmap.BatchMapRequest> batchMapFn(
            final StreamObserver<Batchmap.BatchMapResponse> streamObserver) {
        if (this.batchMapper == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(
                    BatchMapGrpc.getBatchMapFnMethod(), streamObserver);
        }

        final DatumIteratorImpl datumStream = new DatumIteratorImpl();
        // The handler runs on the pool while this observer keeps feeding datumStream.
        final Future<BatchResponses> pendingResult =
                this.batchMapTaskExecutor.submit(() -> this.batchMapper.processMessage(datumStream));

        return new StreamObserver<Batchmap.BatchMapRequest>() { // from class: io.numaproj.numaflow.batchmapper.Service.1
            @Override
            public void onNext(Batchmap.BatchMapRequest batchMapRequest) {
                try {
                    datumStream.writeMessage(Service.this.constructHandlerDatum(batchMapRequest));
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    onError(e);
                }
            }

            @Override
            public void onError(Throwable th) {
                Service.log.error("Error Encountered in batchMap Stream", th);
                // Release the pool thread if the handler is still running; this is a no-op once
                // the task has already completed (e.g. when called from onCompleted()).
                pendingResult.cancel(true);
                streamObserver.onError(
                        Status.UNKNOWN.withDescription(th.getMessage()).withCause(th).asException());
            }

            @Override
            public void onCompleted() {
                try {
                    datumStream.writeMessage(HandlerDatum.EOF_DATUM);
                    BatchResponses batchResponses = pendingResult.get();
                    int resultCount = batchResponses.getItems().size();
                    int iteratorCount = datumStream.getCount();
                    Service.log.debug(
                            "Finished the call Result size is :{} and iterator count is :{}",
                            resultCount,
                            iteratorCount);
                    // The iterator count includes the EOF datum written above, hence the "- 1".
                    int expectedCount = iteratorCount - 1;
                    if (resultCount != expectedCount) {
                        throw new RuntimeException(
                                "Number of results did not match expected " + expectedCount
                                        + " but got " + resultCount);
                    }
                    Service.this.buildAndStreamResponse(batchResponses, streamObserver);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // The broad catch would otherwise swallow the interrupt status.
                        Thread.currentThread().interrupt();
                    }
                    Service.log.error("Error Encountered in batchMap Stream onCompleted", e);
                    onError(e);
                }
            }
        };
    }

    /**
     * Converts every handler response into a {@code BatchMapResponse}, sends each one, and then
     * completes the outbound stream.
     *
     * @param batchResponses handler output; one entry per response id
     * @param streamObserver outbound observer that receives the responses
     */
    private void buildAndStreamResponse(
            BatchResponses batchResponses,
            StreamObserver<Batchmap.BatchMapResponse> streamObserver) {
        batchResponses.getItems().forEach(batchResponse -> {
            List<Batchmap.BatchMapResponse.Result> results = new ArrayList<>();
            batchResponse.getItems().forEach(message -> {
                // Null value/keys/tags are mapped to empty equivalents.
                results.add(Batchmap.BatchMapResponse.Result.newBuilder()
                        .setValue(message.getValue() == null
                                ? ByteString.EMPTY
                                : ByteString.copyFrom(message.getValue()))
                        .addAllKeys(message.getKeys() == null
                                ? new ArrayList<>()
                                : List.of(message.getKeys()))
                        .addAllTags(message.getTags() == null
                                ? new ArrayList<>()
                                : List.of(message.getTags()))
                        .m145build());
            });
            streamObserver.onNext(Batchmap.BatchMapResponse.newBuilder()
                    .setId(batchResponse.getId())
                    .addAllResults(results)
                    .m96build());
        });
        streamObserver.onCompleted();
    }

    /**
     * Readiness probe: always answers with {@code ready = true}.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives the single readiness response
     */
    @Override // io.numaproj.numaflow.batchmap.v1.BatchMapGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<Batchmap.ReadyResponse> streamObserver) {
        streamObserver.onNext(Batchmap.ReadyResponse.newBuilder().setReady(true).m192build());
        streamObserver.onCompleted();
    }

    /**
     * Builds the handler-facing datum from a wire request.
     *
     * @param batchMapRequest incoming request
     * @return datum carrying keys, value, watermark, event time, id and headers of the request
     */
    private HandlerDatum constructHandlerDatum(Batchmap.BatchMapRequest batchMapRequest) {
        String[] keys = (String[]) batchMapRequest.mo15getKeysList().toArray(new String[0]);
        Instant watermark = Instant.ofEpochSecond(
                batchMapRequest.getWatermark().getSeconds(),
                batchMapRequest.getWatermark().getNanos());
        Instant eventTime = Instant.ofEpochSecond(
                batchMapRequest.getEventTime().getSeconds(),
                batchMapRequest.getEventTime().getNanos());
        return new HandlerDatum(
                keys,
                batchMapRequest.getValue().toByteArray(),
                watermark,
                eventTime,
                batchMapRequest.getId(),
                batchMapRequest.getHeadersMap());
    }

    /**
     * Shuts the handler pool down, waiting up to {@link #SHUTDOWN_TIME} seconds for submitted
     * tasks to finish before cancelling whatever is left.
     */
    public void shutDown() {
        this.batchMapTaskExecutor.shutdown();
        try {
            if (this.batchMapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
                log.info("BatchMap executor was terminated.");
                return;
            }
            log.error("BatchMap executor did not terminate in the specified time.");
            List<Runnable> neverStarted = this.batchMapTaskExecutor.shutdownNow();
            log.error(
                    "BatchMap executor was abruptly shut down. {} tasks will not be executed.",
                    neverStarted.size());
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report through the logger, not stderr.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for BatchMap executor to terminate.", e);
        }
    }
}
