package io.numaproj.numaflow.sinker;

import com.google.protobuf.Empty;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/sinker/Service.class */
/**
 * gRPC service implementation that bridges the streaming {@code sinkFn} call to a user-supplied
 * {@link Sinker}.
 *
 * <p>Each {@code sinkFn} call gets its own {@link DatumIteratorImpl}; incoming requests are
 * converted to {@link HandlerDatum} and written to it while {@link Sinker#processMessages} runs
 * on a task submitted to {@code sinkTaskExecutor}. When the client half-closes the stream, an
 * EOF datum is written and the task's result is sent back as a single response.
 */
public class Service extends SinkGrpc.SinkImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);

    /** Seconds {@link #shutDown()} waits for in-flight sink tasks before forcing termination. */
    private static final long SHUTDOWN_TIME = 30;

    /** Pool that runs one {@code processMessages} task per {@code sinkFn} stream. */
    private final ExecutorService sinkTaskExecutor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    private final Sinker sinker;

    /**
     * @param sinker the user-defined sink handler; when {@code null}, {@code sinkFn} answers as
     *     an unimplemented call
     */
    public Service(Sinker sinker) {
        this.sinker = sinker;
    }

    /**
     * Handles one streaming sink call.
     *
     * @param streamObserver observer used to send the response (or error) back to the caller
     * @return observer that receives the caller's request stream
     */
    @Override
    public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(
            final StreamObserver<SinkOuterClass.SinkResponse> streamObserver) {
        if (this.sinker == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(
                    SinkGrpc.getSinkFnMethod(),
                    streamObserver);
        }

        final DatumIteratorImpl datumIterator = new DatumIteratorImpl();
        final Future<ResponseList> sinkTask =
                this.sinkTaskExecutor.submit(() -> this.sinker.processMessages(datumIterator));

        return new StreamObserver<SinkOuterClass.SinkRequest>() {
            @Override
            public void onNext(SinkOuterClass.SinkRequest sinkRequest) {
                try {
                    datumIterator.writeMessage(constructHandlerDatum(sinkRequest));
                } catch (InterruptedException e) {
                    // Restore the flag (Thread.interrupted() would clear it) so the caller
                    // of this callback can still observe the interruption.
                    Thread.currentThread().interrupt();
                    onError(e);
                }
            }

            @Override
            public void onError(Throwable th) {
                log.error("Encountered error in sinkFn - {}", th.getMessage(), th);
                // No EOF will be written after a failure, so the sink task would presumably
                // keep waiting on the iterator and hold a pool thread. Cancelling is a no-op
                // if the task has already finished.
                // NOTE(review): relies on processMessages responding to interruption - confirm.
                sinkTask.cancel(true);
                streamObserver.onError(th);
            }

            @Override
            public void onCompleted() {
                SinkOuterClass.SinkResponse response;
                try {
                    datumIterator.writeMessage(HandlerDatum.EOF_DATUM);
                    response = buildResponseList(sinkTask.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    onError(e);
                    // onError is terminal for the response stream; nothing more may be sent.
                    return;
                } catch (ExecutionException e) {
                    onError(e);
                    return;
                }
                streamObserver.onNext(response);
                streamObserver.onCompleted();
            }
        };
    }

    /**
     * Readiness probe; always reports ready.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives the single ready response
     */
    @Override
    public void isReady(Empty empty, StreamObserver<SinkOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(SinkOuterClass.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }

    /**
     * Converts a wire request into the datum handed to the sinker: keys, value bytes,
     * watermark, event time and id, in that constructor order.
     */
    private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest sinkRequest) {
        return new HandlerDatum(
                sinkRequest.getKeysList().toArray(new String[0]),
                sinkRequest.getValue().toByteArray(),
                Instant.ofEpochSecond(
                        sinkRequest.getWatermark().getSeconds(),
                        sinkRequest.getWatermark().getNanos()),
                Instant.ofEpochSecond(
                        sinkRequest.getEventTime().getSeconds(),
                        sinkRequest.getEventTime().getNanos()),
                sinkRequest.getId());
    }

    /**
     * Builds the gRPC response from the sinker's results.
     *
     * <p>A {@code null} id or error message is sent as an empty string.
     *
     * @param responseList per-message results returned by the sinker
     * @return response containing one result per entry of {@code responseList}
     */
    public SinkOuterClass.SinkResponse buildResponseList(ResponseList responseList) {
        SinkOuterClass.SinkResponse.Builder responseBuilder =
                SinkOuterClass.SinkResponse.newBuilder();
        responseList.getResponses().forEach(response -> {
            responseBuilder.addResults(SinkOuterClass.SinkResponse.Result.newBuilder()
                    .setId(response.getId() == null ? "" : response.getId())
                    .setErrMsg(response.getErr() == null ? "" : response.getErr())
                    .setSuccess(response.getSuccess().booleanValue())
                    .build());
        });
        return responseBuilder.build();
    }

    /**
     * Stops accepting new sink tasks and waits up to {@code SHUTDOWN_TIME} seconds for running
     * ones; tasks still pending after that are forcibly shut down.
     */
    public void shutDown() {
        this.sinkTaskExecutor.shutdown();
        try {
            if (this.sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
                log.info("Sink executor was terminated.");
            } else {
                log.error("Sink executor did not terminate in the specified time.");
                int droppedTasks = this.sinkTaskExecutor.shutdownNow().size();
                log.error(
                        "Sink executor was abruptly shut down. {} tasks will not be executed.",
                        droppedTasks);
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: force termination of whatever is left, then
            // restore the interrupt flag for the caller.
            this.sinkTaskExecutor.shutdownNow();
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for sink executor to terminate.", e);
        }
    }
}
