package io.numaproj.numaflow.sourcer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.source.v1.SourceGrpc;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;

/* loaded from: input_file:io/numaproj/numaflow/sourcer/SourcerTestKit.class */
public class SourcerTestKit {
    // Sourcer implementation handed to the Server created in startServer().
    private final Sourcer sourcer;
    // gRPC configuration handed to the Server created in startServer().
    private final GRPCConfig grpcConfig;
    // Assigned by startServer(); remains null until then, which stopServer() checks for.
    private Server server;

    /* loaded from: input_file:io/numaproj/numaflow/sourcer/SourcerTestKit$Client.class */
    public static class Client {
        private final ManagedChannel channel;
        private final SourceGrpc.SourceStub sourceStub;

        public Client() {
            this("localhost", 50051);
        }

        public Client(String str, int i) {
            this.channel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            this.sourceStub = SourceGrpc.newStub(this.channel);
        }

        public void close() throws InterruptedException {
            this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
        }

        public void sendReadRequest(ReadRequest readRequest, final OutputObserver outputObserver) throws Exception {
            SourceOuterClass.ReadRequest m1887build = SourceOuterClass.ReadRequest.newBuilder().setRequest(SourceOuterClass.ReadRequest.Request.newBuilder().setNumRecords(readRequest.getCount()).setTimeoutInMs((int) readRequest.getTimeout().toMillis()).m1934build()).m1887build();
            final CompletableFuture completableFuture = new CompletableFuture();
            this.sourceStub.readFn(m1887build, new StreamObserver<SourceOuterClass.ReadResponse>() { // from class: io.numaproj.numaflow.sourcer.SourcerTestKit.Client.1
                public void onNext(SourceOuterClass.ReadResponse readResponse) {
                    outputObserver.send(new Message(readResponse.getResult().getPayload().toByteArray(), new Offset(readResponse.getResult().getOffset().getOffset().toByteArray(), Integer.valueOf(readResponse.getResult().getOffset().getPartitionId())), Instant.ofEpochSecond(readResponse.getResult().getEventTime().getSeconds(), readResponse.getResult().getEventTime().getNanos()), (String[]) readResponse.getResult().mo1996getKeysList().toArray(new String[0]), readResponse.getResult().getHeadersMap()));
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    completableFuture.complete(true);
                }
            });
            completableFuture.get();
        }

        public void sendAckRequest(AckRequest ackRequest) throws Exception {
            final CompletableFuture completableFuture = new CompletableFuture();
            SourceOuterClass.AckRequest.Request.Builder newBuilder = SourceOuterClass.AckRequest.Request.newBuilder();
            for (Offset offset : ackRequest.getOffsets()) {
                newBuilder.addOffsets(SourceOuterClass.Offset.newBuilder().setOffset(ByteString.copyFrom(offset.getValue())).setPartitionId(offset.getPartitionId().intValue()).m1652build());
            }
            this.sourceStub.ackFn(SourceOuterClass.AckRequest.newBuilder().setRequest(newBuilder.m1511build()).m1464build(), new StreamObserver<SourceOuterClass.AckResponse>() { // from class: io.numaproj.numaflow.sourcer.SourcerTestKit.Client.2
                public void onNext(SourceOuterClass.AckResponse ackResponse) {
                    completableFuture.complete(ackResponse);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    completableFuture.completeExceptionally(new RuntimeException("Server completed without a response"));
                }
            });
            completableFuture.get();
        }

        public long sendPendingRequest() throws Exception {
            final CompletableFuture completableFuture = new CompletableFuture();
            this.sourceStub.pendingFn(Empty.newBuilder().build(), new StreamObserver<SourceOuterClass.PendingResponse>() { // from class: io.numaproj.numaflow.sourcer.SourcerTestKit.Client.3
                public void onNext(SourceOuterClass.PendingResponse pendingResponse) {
                    completableFuture.complete(pendingResponse);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    completableFuture.completeExceptionally(new RuntimeException("Server completed without a response"));
                }
            });
            return ((SourceOuterClass.PendingResponse) completableFuture.get()).getResult().getCount();
        }

        public List<Integer> sendGetPartitionsRequest() throws Exception {
            final CompletableFuture completableFuture = new CompletableFuture();
            this.sourceStub.partitionsFn(Empty.newBuilder().build(), new StreamObserver<SourceOuterClass.PartitionsResponse>() { // from class: io.numaproj.numaflow.sourcer.SourcerTestKit.Client.4
                public void onNext(SourceOuterClass.PartitionsResponse partitionsResponse) {
                    completableFuture.complete(partitionsResponse);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    completableFuture.completeExceptionally(new RuntimeException("Server completed without a response"));
                }
            });
            return ((SourceOuterClass.PartitionsResponse) completableFuture.get()).getResult().getPartitionsList();
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sourcer/SourcerTestKit$TestAckRequest.class */
    /**
     * Mutable {@link AckRequest} implementation holding a list of offsets,
     * constructed through {@link #builder()}.
     */
    public static class TestAckRequest implements AckRequest {
        List<Offset> offsets;

        @Generated
        TestAckRequest(List<Offset> list) {
            this.offsets = list;
        }

        /** Returns a fresh builder with no offsets set. */
        @Generated
        public static TestAckRequestBuilder builder() {
            return new TestAckRequestBuilder();
        }

        /** Returns the offsets currently held by this request. */
        @Override // io.numaproj.numaflow.sourcer.AckRequest
        @Generated
        public List<Offset> getOffsets() {
            return this.offsets;
        }

        /** Replaces the offsets held by this request. */
        @Generated
        public void setOffsets(List<Offset> list) {
            this.offsets = list;
        }

        /** Builder that collects the offsets for a {@link TestAckRequest}. */
        @Generated
        public static class TestAckRequestBuilder {

            @Generated
            private List<Offset> offsets;

            @Generated
            TestAckRequestBuilder() {
            }

            /** Stores the offsets to hand to the built request and returns this builder. */
            @Generated
            public TestAckRequestBuilder offsets(List<Offset> list) {
                this.offsets = list;
                return this;
            }

            /** Creates a request from the offsets stored so far. */
            @Generated
            public TestAckRequest build() {
                return new TestAckRequest(this.offsets);
            }

            @Generated
            public String toString() {
                StringBuilder text = new StringBuilder("SourcerTestKit.TestAckRequest.TestAckRequestBuilder(offsets=");
                text.append(this.offsets);
                text.append(")");
                return text.toString();
            }
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sourcer/SourcerTestKit$TestListBasedObserver.class */
    /**
     * {@link OutputObserver} that appends every message it is sent to a list,
     * which can be read back through {@link #getMessages()}.
     */
    public static class TestListBasedObserver implements OutputObserver {
        // Diamond instead of a raw ArrayList so the assignment is type-checked.
        private List<Message> messages = new ArrayList<>();

        /** Appends {@code message} to the collected messages. */
        @Override // io.numaproj.numaflow.sourcer.OutputObserver
        public void send(Message message) {
            this.messages.add(message);
        }

        /** Returns the backing list of collected messages (not a copy). */
        @Generated
        public List<Message> getMessages() {
            return this.messages;
        }

        /** Replaces the backing list that subsequent sends append to. */
        @Generated
        public void setMessages(List<Message> list) {
            this.messages = list;
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sourcer/SourcerTestKit$TestReadRequest.class */
    /**
     * Mutable {@link ReadRequest} implementation holding a record count and a
     * timeout, constructed through {@link #builder()}.
     */
    public static class TestReadRequest implements ReadRequest {
        private long count;
        private Duration timeout;

        /** Builder that collects the count and timeout for a {@link TestReadRequest}. */
        @Generated
        public static class TestReadRequestBuilder {

            @Generated
            private long count;

            @Generated
            private Duration timeout;

            @Generated
            TestReadRequestBuilder() {
            }

            /** Stores the count for the built request and returns this builder. */
            @Generated
            public TestReadRequestBuilder count(long j) {
                this.count = j;
                return this;
            }

            /** Stores the timeout for the built request and returns this builder. */
            @Generated
            public TestReadRequestBuilder timeout(Duration duration) {
                this.timeout = duration;
                return this;
            }

            /** Creates a request from the count and timeout stored so far. */
            @Generated
            public TestReadRequest build() {
                return new TestReadRequest(this.count, this.timeout);
            }

            /**
             * Renders the builder state. The {@code timeout=} slot shows the
             * timeout; previously the count was printed there by mistake.
             */
            @Generated
            public String toString() {
                return "SourcerTestKit.TestReadRequest.TestReadRequestBuilder(count=" + this.count
                        + ", timeout=" + this.timeout + ")";
            }
        }

        @Generated
        TestReadRequest(long j, Duration duration) {
            this.count = j;
            this.timeout = duration;
        }

        /** Returns a fresh builder with a zero count and no timeout set. */
        @Generated
        public static TestReadRequestBuilder builder() {
            return new TestReadRequestBuilder();
        }

        /** Returns the count held by this request. */
        @Override // io.numaproj.numaflow.sourcer.ReadRequest
        @Generated
        public long getCount() {
            return this.count;
        }

        /** Returns the timeout held by this request. */
        @Override // io.numaproj.numaflow.sourcer.ReadRequest
        @Generated
        public Duration getTimeout() {
            return this.timeout;
        }

        /** Replaces the count held by this request. */
        @Generated
        public void setCount(long j) {
            this.count = j;
        }

        /** Replaces the timeout held by this request. */
        @Generated
        public void setTimeout(Duration duration) {
            this.timeout = duration;
        }
    }

    /**
     * Creates a test kit for {@code sourcer} using the configuration returned by
     * {@code GRPCConfig.defaultGrpcConfig()}.
     *
     * @param sourcer the sourcer handed to the server when it is started
     */
    public SourcerTestKit(Sourcer sourcer) {
        this(sourcer, GRPCConfig.defaultGrpcConfig());
    }

    /**
     * Creates a test kit that stores the given sourcer and gRPC configuration;
     * no server is created until {@link #startServer()} is called.
     *
     * @param sourcer the sourcer handed to the server when it is started
     * @param gRPCConfig the configuration handed to the server when it is started
     */
    public SourcerTestKit(Sourcer sourcer, GRPCConfig gRPCConfig) {
        this.grpcConfig = gRPCConfig;
        this.sourcer = sourcer;
    }

    /**
     * Creates a new {@code Server} from the stored sourcer and configuration,
     * remembers it in this kit, and starts it.
     *
     * @throws Exception if creating or starting the server fails
     */
    public void startServer() throws Exception {
        Server created = new Server(this.sourcer, this.grpcConfig);
        // Stored before start() so the field is set even if start() throws.
        this.server = created;
        created.start();
    }

    /**
     * Stops the server created by {@link #startServer()}; does nothing when no
     * server has been created yet.
     *
     * @throws InterruptedException if raised by the server's {@code stop()}
     */
    public void stopServer() throws InterruptedException {
        Server current = this.server;
        if (current == null) {
            return;
        }
        current.stop();
    }
}
