package io.numaproj.numaflow.sourcer;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.source.v1.SourceGrpc;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/numaproj/numaflow/sourcer/Service.class */
public class Service extends SourceGrpc.SourceImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    private final Sourcer sourcer;
    private final CompletableFuture<Void> shutdownSignal;

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> streamObserver) {
        return new StreamObserver<SourceOuterClass.ReadRequest>() { // from class: io.numaproj.numaflow.sourcer.Service.1
            private boolean handshakeDone = false;

            public void onNext(SourceOuterClass.ReadRequest readRequest) {
                if (!this.handshakeDone) {
                    if (!readRequest.hasHandshake() || !readRequest.getHandshake().getSot()) {
                        streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                        return;
                    } else {
                        streamObserver.onNext(SourceOuterClass.ReadResponse.newBuilder().setHandshake(readRequest.getHandshake()).m1175build());
                        this.handshakeDone = true;
                        return;
                    }
                }
                try {
                    Service.this.sourcer.read(new ReadRequestImpl(readRequest.getRequest().getNumRecords(), Duration.ofMillis(readRequest.getRequest().getTimeoutInMs())), new OutputObserverImpl(streamObserver));
                    streamObserver.onNext(SourceOuterClass.ReadResponse.newBuilder().setStatus(SourceOuterClass.ReadResponse.Status.newBuilder().setEot(true).setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS).m1227build()).m1175build());
                } catch (Exception e) {
                    Service.log.error("Encountered error in readFn onNext", e);
                    Service.this.shutdownSignal.completeExceptionally(e);
                    streamObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
                }
            }

            public void onError(Throwable th) {
                Service.log.error("Encountered error in readFn onNext", th);
                Service.this.shutdownSignal.completeExceptionally(th);
                streamObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public StreamObserver<SourceOuterClass.AckRequest> ackFn(final StreamObserver<SourceOuterClass.AckResponse> streamObserver) {
        return new StreamObserver<SourceOuterClass.AckRequest>() { // from class: io.numaproj.numaflow.sourcer.Service.2
            private boolean handshakeDone = false;

            public void onNext(SourceOuterClass.AckRequest ackRequest) {
                if (!this.handshakeDone) {
                    if (!ackRequest.hasHandshake() || !ackRequest.getHandshake().getSot()) {
                        streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                        return;
                    } else {
                        streamObserver.onNext(SourceOuterClass.AckResponse.newBuilder().setHandshake(ackRequest.getHandshake()).m925build());
                        this.handshakeDone = true;
                        return;
                    }
                }
                try {
                    ArrayList arrayList = new ArrayList(ackRequest.getRequest().getOffsetsCount());
                    for (SourceOuterClass.Offset offset : ackRequest.getRequest().getOffsetsList()) {
                        arrayList.add(new Offset(offset.getOffset().toByteArray(), Integer.valueOf(offset.getPartitionId())));
                    }
                    Service.this.sourcer.ack(new AckRequestImpl(arrayList));
                    streamObserver.onNext(SourceOuterClass.AckResponse.newBuilder().setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess(Empty.newBuilder().build())).m925build());
                } catch (Exception e) {
                    Service.log.error("Encountered error in ackFn onNext", e);
                    Service.this.shutdownSignal.completeExceptionally(e);
                    streamObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
                }
            }

            public void onError(Throwable th) {
                Service.log.error("Encountered error in ackFn onNext", th);
                Service.this.shutdownSignal.completeExceptionally(th);
                streamObserver.onError(Status.INTERNAL.withDescription(th.getMessage()).withCause(th).asException());
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void pendingFn(Empty empty, StreamObserver<SourceOuterClass.PendingResponse> streamObserver) {
        if (this.sourcer == null) {
            ServerCalls.asyncUnimplementedUnaryCall(SourceGrpc.getPendingFnMethod(), streamObserver);
        } else {
            streamObserver.onNext(SourceOuterClass.PendingResponse.newBuilder().setResult(SourceOuterClass.PendingResponse.Result.newBuilder().setCount(this.sourcer.getPending()).m1100build()).m1075build());
            streamObserver.onCompleted();
        }
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<SourceOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(SourceOuterClass.ReadyResponse.newBuilder().setReady(true).m1256build());
        streamObserver.onCompleted();
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void partitionsFn(Empty empty, StreamObserver<SourceOuterClass.PartitionsResponse> streamObserver) {
        if (this.sourcer == null) {
            ServerCalls.asyncUnimplementedUnaryCall(SourceGrpc.getPendingFnMethod(), streamObserver);
            return;
        }
        streamObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder().setResult(SourceOuterClass.PartitionsResponse.Result.newBuilder().addAllPartitions(this.sourcer.getPartitions())).m1025build());
        streamObserver.onCompleted();
    }

    @Generated
    public Service(Sourcer sourcer, CompletableFuture<Void> completableFuture) {
        this.sourcer = sourcer;
        this.shutdownSignal = completableFuture;
    }
}
