package io.numaproj.numaflow.sinker;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import io.numaproj.numaflow.sinker.ResponseList;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import jdk.jfr.Experimental;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:io/numaproj/numaflow/sinker/SinkerTestKit.class */
public class SinkerTestKit {

    // Class-wide SLF4J logger for SinkerTestKit.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SinkerTestKit.class);
    // Sink implementation passed to the Server created in startServer().
    private final Sinker sinker;
    // gRPC configuration passed to the Server created in startServer().
    private final GRPCConfig grpcConfig;
    // Server created by startServer(); stays null until startServer() has been called.
    private Server server;

    /* loaded from: input_file:io/numaproj/numaflow/sinker/SinkerTestKit$Client.class */
    public static class Client {
        private final ManagedChannel channel;
        private final SinkGrpc.SinkStub sinkStub;

        public Client() {
            this("localhost", 50051);
        }

        public Client(String str, int i) {
            this.channel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            this.sinkStub = SinkGrpc.newStub(this.channel);
        }

        public ResponseList sendRequest(DatumIterator datumIterator) {
            Datum next;
            final ArrayList arrayList = new ArrayList();
            final CompletableFuture completableFuture = new CompletableFuture();
            StreamObserver<SinkOuterClass.SinkRequest> sinkFn = this.sinkStub.sinkFn(new StreamObserver<SinkOuterClass.SinkResponse>() { // from class: io.numaproj.numaflow.sinker.SinkerTestKit.Client.1
                public void onNext(SinkOuterClass.SinkResponse sinkResponse) {
                    arrayList.add(sinkResponse);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    completableFuture.complete(arrayList);
                }
            });
            sinkFn.onNext(SinkOuterClass.SinkRequest.newBuilder().setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).m1210build()).m1304build());
            while (true) {
                try {
                    next = datumIterator.next();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (next == null) {
                    break;
                }
                sinkFn.onNext(SinkOuterClass.SinkRequest.newBuilder().setRequest(SinkOuterClass.SinkRequest.Request.newBuilder().addAllKeys(next.getKeys() == null ? new ArrayList<>() : List.of((Object[]) next.getKeys())).setValue(next.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(next.getValue())).setId(next.getId()).setEventTime(next.getEventTime() == null ? Timestamp.newBuilder().build() : Timestamp.newBuilder().setSeconds(next.getEventTime().getEpochSecond()).setNanos(next.getEventTime().getNano()).build()).setWatermark(next.getWatermark() == null ? Timestamp.newBuilder().build() : Timestamp.newBuilder().setSeconds(next.getWatermark().getEpochSecond()).setNanos(next.getWatermark().getNano()).build()).putAllHeaders(next.getHeaders() == null ? new HashMap<>() : next.getHeaders()).m1352build()).m1304build());
            }
            sinkFn.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).m1304build());
            sinkFn.onCompleted();
            try {
                List<SinkOuterClass.SinkResponse> list = (List) completableFuture.get();
                ResponseList.ResponseListBuilder newBuilder = ResponseList.newBuilder();
                for (SinkOuterClass.SinkResponse sinkResponse : list) {
                    if (!sinkResponse.getHandshake().getSot() && (!sinkResponse.hasStatus() || !sinkResponse.getStatus().getEot())) {
                        for (SinkOuterClass.SinkResponse.Result result : sinkResponse.getResultsList()) {
                            if (result.getStatus() == SinkOuterClass.Status.SUCCESS) {
                                newBuilder.addResponse(Response.responseOK(result.getId()));
                            } else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
                                newBuilder.addResponse(Response.responseFallback(result.getId()));
                            } else {
                                newBuilder.addResponse(Response.responseFailure(result.getId(), result.getErrMsg()));
                            }
                        }
                    }
                }
                return newBuilder.build();
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }

        public void close() throws InterruptedException {
            this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sinker/SinkerTestKit$TestBlockingIterator.class */
    /**
     * {@link DatumIterator} backed by a blocking queue with capacity one, so a producer
     * and a consumer hand data over one element at a time.
     *
     * <p>Once {@link #close()} has been called, {@link #next()} returns {@code null} and
     * {@link #addDatum(Datum)} does nothing. Calls that are already blocked notice the
     * close within one check interval instead of waiting forever. An element still
     * queued at the time of the close is not delivered.
     */
    public static class TestBlockingIterator implements DatumIterator {
        /** How long a blocked call waits before re-checking whether the iterator was closed. */
        private static final long CLOSE_CHECK_INTERVAL_MILLIS = 50L;

        private volatile boolean closed = false;
        private final LinkedBlockingQueue<Datum> queue = new LinkedBlockingQueue<>(1);

        /**
         * Waits for the next datum.
         *
         * @return the next datum, or {@code null} once the iterator has been closed
         * @throws InterruptedException if interrupted while waiting
         */
        @Override // io.numaproj.numaflow.sinker.DatumIterator
        public Datum next() throws InterruptedException {
            // Timed polls (rather than take()) let a waiting consumer observe close().
            while (!this.closed) {
                Datum datum = this.queue.poll(CLOSE_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                if (datum != null) {
                    return datum;
                }
            }
            return null;
        }

        /**
         * Hands a datum to the consumer, waiting while the single queue slot is occupied.
         * The datum is silently dropped if the iterator is, or becomes, closed first.
         *
         * @param datum the datum to enqueue
         * @throws InterruptedException if interrupted while waiting for a free slot
         */
        public void addDatum(Datum datum) throws InterruptedException {
            // Timed offers (rather than put()) let a waiting producer observe close().
            while (!this.closed) {
                if (this.queue.offer(datum, CLOSE_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            }
        }

        /** Marks the iterator as closed. */
        public void close() {
            this.closed = true;
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sinker/SinkerTestKit$TestDatum.class */
    /**
     * Plain {@link Datum} implementation for tests, created through {@link #builder()}.
     *
     * <p>All fields are stored exactly as supplied; the arrays and the map are neither
     * copied nor validated, and any of them may be {@code null}.
     */
    public static class TestDatum implements Datum {
        private final String id;
        private final byte[] value;
        private final String[] keys;
        private final Instant eventTime;
        private final Instant watermark;
        private final Map<String, String> headers;

        /** Stores the given values as-is; used by {@link TestDatumBuilder#build()}. */
        @Generated
        TestDatum(String id, byte[] value, String[] keys, Instant eventTime, Instant watermark, Map<String, String> headers) {
            this.id = id;
            this.value = value;
            this.keys = keys;
            this.eventTime = eventTime;
            this.watermark = watermark;
            this.headers = headers;
        }

        /** Returns a new, empty builder. */
        @Generated
        public static TestDatumBuilder builder() {
            return new TestDatumBuilder();
        }

        /** Returns the id given to the builder. */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public String getId() {
            return this.id;
        }

        /** Returns the payload array given to the builder (not a copy). */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public byte[] getValue() {
            return this.value;
        }

        /** Returns the keys array given to the builder (not a copy). */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public String[] getKeys() {
            return this.keys;
        }

        /** Returns the event time given to the builder. */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public Instant getEventTime() {
            return this.eventTime;
        }

        /** Returns the watermark given to the builder. */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public Instant getWatermark() {
            return this.watermark;
        }

        /** Returns the headers map given to the builder (not a copy). */
        @Override // io.numaproj.numaflow.sinker.Datum
        @Generated
        public Map<String, String> getHeaders() {
            return this.headers;
        }

        /** Fluent builder for {@link TestDatum}; every property defaults to {@code null}. */
        @Generated
        public static class TestDatumBuilder {

            @Generated
            private String id;

            @Generated
            private byte[] value;

            @Generated
            private String[] keys;

            @Generated
            private Instant eventTime;

            @Generated
            private Instant watermark;

            @Generated
            private Map<String, String> headers;

            @Generated
            TestDatumBuilder() {
            }

            /** Sets the id and returns this builder. */
            @Generated
            public TestDatumBuilder id(String id) {
                this.id = id;
                return this;
            }

            /** Sets the payload and returns this builder. */
            @Generated
            public TestDatumBuilder value(byte[] value) {
                this.value = value;
                return this;
            }

            /** Sets the keys and returns this builder. */
            @Generated
            public TestDatumBuilder keys(String[] keys) {
                this.keys = keys;
                return this;
            }

            /** Sets the event time and returns this builder. */
            @Generated
            public TestDatumBuilder eventTime(Instant eventTime) {
                this.eventTime = eventTime;
                return this;
            }

            /** Sets the watermark and returns this builder. */
            @Generated
            public TestDatumBuilder watermark(Instant watermark) {
                this.watermark = watermark;
                return this;
            }

            /** Sets the headers and returns this builder. */
            @Generated
            public TestDatumBuilder headers(Map<String, String> headers) {
                this.headers = headers;
                return this;
            }

            /** Creates a {@link TestDatum} from the values set so far. */
            @Generated
            public TestDatum build() {
                return new TestDatum(this.id, this.value, this.keys, this.eventTime, this.watermark, this.headers);
            }

            /** Returns a textual listing of the values set so far. */
            @Override
            @Generated
            public String toString() {
                StringBuilder text = new StringBuilder();
                text.append("SinkerTestKit.TestDatum.TestDatumBuilder(id=").append(this.id);
                text.append(", value=").append(Arrays.toString(this.value));
                text.append(", keys=").append(Arrays.deepToString(this.keys));
                text.append(", eventTime=").append(this.eventTime);
                text.append(", watermark=").append(this.watermark);
                text.append(", headers=").append(this.headers);
                text.append(")");
                return text.toString();
            }
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/sinker/SinkerTestKit$TestListIterator.class */
    /**
     * {@link DatumIterator} over an in-memory list that never blocks.
     *
     * <p>Data is appended with {@link #addDatum(Datum)} and handed out in insertion order,
     * starting at the current index.
     */
    public static class TestListIterator implements DatumIterator {
        private final List<Datum> data = new ArrayList<>();
        private int index = 0;

        /**
         * Returns the datum at the current index and advances the index.
         *
         * @return the next datum, or {@code null} when the index has reached the end of the list
         */
        @Override // io.numaproj.numaflow.sinker.DatumIterator
        public Datum next() throws InterruptedException {
            if (this.index < this.data.size()) {
                return this.data.get(this.index++);
            }
            return null;
        }

        /**
         * Appends a datum to the end of the list.
         *
         * @param datum the datum to append
         */
        public void addDatum(Datum datum) {
            this.data.add(datum);
        }

        /** Returns the backing list itself (not a copy). */
        @Generated
        public List<Datum> getData() {
            return this.data;
        }

        /** Returns the position of the datum that {@link #next()} will return next. */
        @Generated
        public int getIndex() {
            return this.index;
        }

        /**
         * Moves the read position; no range check is performed here.
         *
         * @param i the new index
         */
        @Generated
        public void setIndex(int i) {
            this.index = i;
        }
    }

    /**
     * Creates a test kit for the given sinker using the configuration returned by
     * {@code GRPCConfig.defaultGrpcConfig()}.
     *
     * @param sinker the sink implementation under test
     */
    public SinkerTestKit(Sinker sinker) {
        this(sinker, GRPCConfig.defaultGrpcConfig());
    }

    /**
     * Creates a test kit for the given sinker and gRPC configuration.
     * Both arguments are stored as given; no server is created until
     * {@link #startServer()} is called.
     *
     * @param sinker the sink implementation under test
     * @param gRPCConfig the gRPC configuration to pass to the server
     */
    public SinkerTestKit(Sinker sinker, GRPCConfig gRPCConfig) {
        this.sinker = sinker;
        this.grpcConfig = gRPCConfig;
    }

    /**
     * Creates a new server from the stored sinker and gRPC configuration, keeps a
     * reference to it for {@link #stopServer()}, and starts it.
     *
     * @throws Exception if creating or starting the server fails
     */
    public void startServer() throws Exception {
        Server created = new Server(this.sinker, this.grpcConfig);
        this.server = created;
        created.start();
    }

    /**
     * Stops the server created by {@link #startServer()}; does nothing if no server
     * has been created yet.
     *
     * @throws InterruptedException if stopping the server is interrupted
     */
    public void stopServer() throws InterruptedException {
        Server current = this.server;
        if (current == null) {
            return;
        }
        current.stop();
    }
}
