package io.numaproj.numaflow.sinker;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/sinker/Service.class */
public class Service extends SinkGrpc.SinkImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    private final ExecutorService sinkTaskExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    private final Sinker sinker;
    private final CompletableFuture<Void> shutdownSignal;

    @Override // io.numaproj.numaflow.sink.v1.SinkGrpc.AsyncService
    public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(final StreamObserver<SinkOuterClass.SinkResponse> streamObserver) {
        return new StreamObserver<SinkOuterClass.SinkRequest>() { // from class: io.numaproj.numaflow.sinker.Service.1
            private CompletableFuture<ResponseList> result;
            private DatumIteratorImpl datumStream;
            private boolean startOfStream = true;
            private boolean handshakeDone = false;

            public void onNext(SinkOuterClass.SinkRequest sinkRequest) {
                if (!this.handshakeDone) {
                    if (!sinkRequest.hasHandshake() || !sinkRequest.getHandshake().getSot()) {
                        streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                        return;
                    } else {
                        streamObserver.onNext(SinkOuterClass.SinkResponse.newBuilder().setHandshake(sinkRequest.getHandshake()).m786build());
                        this.handshakeDone = true;
                        return;
                    }
                }
                if (this.startOfStream) {
                    this.datumStream = new DatumIteratorImpl();
                    this.result = CompletableFuture.supplyAsync(() -> {
                        return Service.this.sinker.processMessages(this.datumStream);
                    }, Service.this.sinkTaskExecutor);
                    this.startOfStream = false;
                }
                try {
                    if (sinkRequest.hasStatus() && sinkRequest.getStatus().getEot()) {
                        this.datumStream.writeMessage(HandlerDatum.EOF_DATUM);
                        ResponseList join = this.result.join();
                        SinkOuterClass.SinkResponse.Builder newBuilder = SinkOuterClass.SinkResponse.newBuilder();
                        Iterator<Response> it = join.getResponses().iterator();
                        while (it.hasNext()) {
                            newBuilder.addResults(Service.this.buildResult(it.next()));
                        }
                        streamObserver.onNext(newBuilder.m786build());
                        streamObserver.onNext(SinkOuterClass.SinkResponse.newBuilder().setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true).m838build()).m786build());
                        this.startOfStream = true;
                    } else {
                        this.datumStream.writeMessage(Service.this.constructHandlerDatum(sinkRequest));
                    }
                } catch (Exception e) {
                    Service.log.error("Encountered error in sinkFn onNext - {}", e.getMessage());
                    Service.this.shutdownSignal.completeExceptionally(e);
                    streamObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException());
                }
            }

            public void onError(Throwable th) {
                Service.log.error("Encountered error in sinkFn - {}", th.getMessage());
                Service.this.shutdownSignal.completeExceptionally(th);
                streamObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
        return SinkOuterClass.SinkResponse.Result.newBuilder().setId(response.getId() == null ? "" : response.getId()).setErrMsg(response.getErr() == null ? "" : response.getErr()).setStatus(response.getFallback().booleanValue() ? SinkOuterClass.Status.FALLBACK : response.getSuccess().booleanValue() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE).m811build();
    }

    @Override // io.numaproj.numaflow.sink.v1.SinkGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<SinkOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(SinkOuterClass.ReadyResponse.newBuilder().setReady(true).m709build());
        streamObserver.onCompleted();
    }

    private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest sinkRequest) {
        return new HandlerDatum((String[]) sinkRequest.getRequest().mo747getKeysList().toArray(new String[0]), sinkRequest.getRequest().getValue().toByteArray(), Instant.ofEpochSecond(sinkRequest.getRequest().getWatermark().getSeconds(), sinkRequest.getRequest().getWatermark().getNanos()), Instant.ofEpochSecond(sinkRequest.getRequest().getEventTime().getSeconds(), sinkRequest.getRequest().getEventTime().getNanos()), sinkRequest.getRequest().getId(), sinkRequest.getRequest().getHeadersMap());
    }

    public void shutDown() {
        this.sinkTaskExecutor.shutdown();
        try {
            if (this.sinkTaskExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                log.info("Sink executor was terminated.");
            } else {
                log.error("Sink executor did not terminate in the specified time.");
                log.error("Sink executor was abruptly shut down. {} tasks will not be executed.", Integer.valueOf(this.sinkTaskExecutor.shutdownNow().size()));
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
            e.printStackTrace();
        }
    }

    @Generated
    public Service(Sinker sinker, CompletableFuture<Void> completableFuture) {
        this.sinker = sinker;
        this.shutdownSignal = completableFuture;
    }
}
