package io.numaproj.numaflow.sourcer;

import com.google.protobuf.Empty;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.source.v1.SourceGrpc;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import java.time.Duration;
import java.util.ArrayList;

/* loaded from: input_file:io/numaproj/numaflow/sourcer/Service.class */
class Service extends SourceGrpc.SourceImplBase {
    private final Sourcer sourcer;

    public Service(Sourcer sourcer) {
        this.sourcer = sourcer;
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void readFn(SourceOuterClass.ReadRequest readRequest, StreamObserver<SourceOuterClass.ReadResponse> streamObserver) {
        if (this.sourcer == null) {
            ServerCalls.asyncUnimplementedUnaryCall(SourceGrpc.getReadFnMethod(), streamObserver);
            return;
        }
        this.sourcer.read(new ReadRequestImpl(readRequest.getRequest().getNumRecords(), Duration.ofMillis(readRequest.getRequest().getTimeoutInMs())), new OutputObserverImpl(streamObserver));
        streamObserver.onCompleted();
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void ackFn(SourceOuterClass.AckRequest ackRequest, StreamObserver<SourceOuterClass.AckResponse> streamObserver) {
        if (this.sourcer == null) {
            ServerCalls.asyncUnimplementedUnaryCall(SourceGrpc.getAckFnMethod(), streamObserver);
            return;
        }
        ArrayList arrayList = new ArrayList(ackRequest.getRequest().getOffsetsCount());
        for (SourceOuterClass.Offset offset : ackRequest.getRequest().getOffsetsList()) {
            arrayList.add(new Offset(offset.getOffset().toByteArray(), offset.getPartitionId()));
        }
        this.sourcer.ack(new AckRequestImpl(arrayList));
        streamObserver.onNext(SourceOuterClass.AckResponse.newBuilder().setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess(Empty.newBuilder().build())).build());
        streamObserver.onCompleted();
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void pendingFn(Empty empty, StreamObserver<SourceOuterClass.PendingResponse> streamObserver) {
        if (this.sourcer == null) {
            ServerCalls.asyncUnimplementedUnaryCall(SourceGrpc.getPendingFnMethod(), streamObserver);
        } else {
            streamObserver.onNext(SourceOuterClass.PendingResponse.newBuilder().setResult(SourceOuterClass.PendingResponse.Result.newBuilder().setCount(this.sourcer.getPending()).build()).build());
            streamObserver.onCompleted();
        }
    }

    @Override // io.numaproj.numaflow.source.v1.SourceGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<SourceOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(SourceOuterClass.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }
}
