package io.numaproj.numaflow.reducer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.MessageList;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;

/* loaded from: input_file:io/numaproj/numaflow/reducer/ReducerTestKit.class */
public class ReducerTestKit {
    private final ReducerFactory<? extends Reducer> reducer;
    private final GRPCConfig grpcConfig;
    private Server server;

    /* loaded from: input_file:io/numaproj/numaflow/reducer/ReducerTestKit$Client.class */
    public static class Client {
        private final ManagedChannel channel;
        private final ReduceGrpc.ReduceStub reduceStub;

        public Client() {
            this("localhost", 50051);
        }

        public Client(String str, int i) {
            this.channel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            this.reduceStub = ReduceGrpc.newStub(this.channel);
        }

        public MessageList sendReduceRequest(TestReduceRequest testReduceRequest) throws Exception {
            ArrayList arrayList = new ArrayList();
            io.grpc.Metadata metadata = new io.grpc.Metadata();
            metadata.put(Metadata.Key.of(GrpcServerUtils.WIN_START_KEY, io.grpc.Metadata.ASCII_STRING_MARSHALLER), String.valueOf(testReduceRequest.getStartTime().toEpochMilli()));
            metadata.put(Metadata.Key.of(GrpcServerUtils.WIN_END_KEY, io.grpc.Metadata.ASCII_STRING_MARSHALLER), String.valueOf(testReduceRequest.getEndTime().toEpochMilli()));
            for (Datum datum : testReduceRequest.getDatumList()) {
                arrayList.add(ReduceOuterClass.ReduceRequest.Payload.newBuilder().setValue(datum.getValue() != null ? ByteString.copyFrom(datum.getValue()) : ByteString.EMPTY).setEventTime(datum.getEventTime() == null ? Timestamp.newBuilder().build() : Timestamp.newBuilder().setSeconds(datum.getEventTime().getEpochSecond()).setNanos(datum.getEventTime().getNano()).build()).setWatermark(datum.getWatermark() == null ? Timestamp.newBuilder().build() : Timestamp.newBuilder().setSeconds(datum.getWatermark().getEpochSecond()).setNanos(datum.getWatermark().getNano()).build()).addAllKeys(Arrays.asList(testReduceRequest.getKeys())).putAllHeaders(datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders()).m547build());
            }
            final ArrayList<ReduceOuterClass.ReduceResponse> arrayList2 = new ArrayList();
            final CompletableFuture completableFuture = new CompletableFuture();
            StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn = this.reduceStub.withInterceptors(new ClientInterceptor[]{MetadataUtils.newAttachHeadersInterceptor(metadata)}).reduceFn(new StreamObserver<ReduceOuterClass.ReduceResponse>() { // from class: io.numaproj.numaflow.reducer.ReducerTestKit.Client.1
                public void onNext(ReduceOuterClass.ReduceResponse reduceResponse) {
                    arrayList2.add(reduceResponse);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onCompleted() {
                    completableFuture.complete(true);
                }
            });
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                reduceFn.onNext(ReduceOuterClass.ReduceRequest.newBuilder().setPayload((ReduceOuterClass.ReduceRequest.Payload) it.next()).setOperation(ReduceOuterClass.ReduceRequest.WindowOperation.newBuilder().setEvent(ReduceOuterClass.ReduceRequest.WindowOperation.Event.APPEND).addWindows(ReduceOuterClass.Window.newBuilder().setStart(Timestamp.newBuilder().setSeconds(testReduceRequest.getStartTime().getEpochSecond()).setNanos(testReduceRequest.getStartTime().getNano()).build()).setEnd(Timestamp.newBuilder().setSeconds(testReduceRequest.getEndTime().getEpochSecond()).setNanos(testReduceRequest.getEndTime().getNano()).build()).m740build()).m595build()).m499build());
            }
            reduceFn.onCompleted();
            completableFuture.get();
            MessageList.MessageListBuilder newBuilder = MessageList.newBuilder();
            for (ReduceOuterClass.ReduceResponse reduceResponse : arrayList2) {
                if (reduceResponse.getEOF()) {
                    break;
                }
                newBuilder.addMessage(new Message(reduceResponse.getResult().getValue().toByteArray(), (String[]) reduceResponse.getResult().mo660getKeysList().toArray(new String[0]), (String[]) reduceResponse.getResult().mo659getTagsList().toArray(new String[0])));
            }
            return newBuilder.build();
        }

        public void close() {
            this.channel.shutdown();
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/reducer/ReducerTestKit$TestDatum.class */
    /**
     * {@link Datum} implementation for tests, holding a value, event time, watermark and
     * headers exactly as supplied. Instances are created through {@link #builder()}.
     */
    public static class TestDatum implements Datum {
        private byte[] value;
        private Instant eventTime;
        private Instant watermark;
        private Map<String, String> headers;

        @Generated
        TestDatum(byte[] value, Instant eventTime, Instant watermark, Map<String, String> headers) {
            this.value = value;
            this.eventTime = eventTime;
            this.watermark = watermark;
            this.headers = headers;
        }

        /** Returns a fresh builder with every field unset ({@code null}). */
        @Generated
        public static TestDatumBuilder builder() {
            return new TestDatumBuilder();
        }

        /** Returns the payload bytes as stored (no copy is made). */
        @Override // io.numaproj.numaflow.reducer.Datum
        @Generated
        public byte[] getValue() {
            return value;
        }

        /** Returns the event time, possibly {@code null}. */
        @Override // io.numaproj.numaflow.reducer.Datum
        @Generated
        public Instant getEventTime() {
            return eventTime;
        }

        /** Returns the watermark, possibly {@code null}. */
        @Override // io.numaproj.numaflow.reducer.Datum
        @Generated
        public Instant getWatermark() {
            return watermark;
        }

        /** Returns the headers map as stored, possibly {@code null}. */
        @Override // io.numaproj.numaflow.reducer.Datum
        @Generated
        public Map<String, String> getHeaders() {
            return headers;
        }

        /** Fluent builder for {@link TestDatum}; each setter stores the argument and returns this builder. */
        @Generated
        public static class TestDatumBuilder {

            @Generated
            private byte[] value;

            @Generated
            private Instant eventTime;

            @Generated
            private Instant watermark;

            @Generated
            private Map<String, String> headers;

            @Generated
            TestDatumBuilder() {
            }

            @Generated
            public TestDatumBuilder value(byte[] value) {
                this.value = value;
                return this;
            }

            @Generated
            public TestDatumBuilder eventTime(Instant eventTime) {
                this.eventTime = eventTime;
                return this;
            }

            @Generated
            public TestDatumBuilder watermark(Instant watermark) {
                this.watermark = watermark;
                return this;
            }

            @Generated
            public TestDatumBuilder headers(Map<String, String> headers) {
                this.headers = headers;
                return this;
            }

            /** Creates a {@link TestDatum} from the values set so far. */
            @Generated
            public TestDatum build() {
                return new TestDatum(value, eventTime, watermark, headers);
            }

            @Generated
            public String toString() {
                StringBuilder text = new StringBuilder("ReducerTestKit.TestDatum.TestDatumBuilder(value=");
                text.append(Arrays.toString(value));
                text.append(", eventTime=").append(eventTime);
                text.append(", watermark=").append(watermark);
                text.append(", headers=").append(headers);
                text.append(")");
                return text.toString();
            }
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/reducer/ReducerTestKit$TestReduceRequest.class */
    /**
     * Mutable description of a reduce request for tests: the data to send, the keys
     * attached to it, and the window start/end times. Built via {@link #builder()} and
     * adjustable afterwards through the setters.
     */
    public static class TestReduceRequest {
        private List<Datum> datumList;
        private String[] keys;
        private Instant startTime;
        private Instant endTime;

        @Generated
        TestReduceRequest(List<Datum> datumList, String[] keys, Instant startTime, Instant endTime) {
            this.datumList = datumList;
            this.keys = keys;
            this.startTime = startTime;
            this.endTime = endTime;
        }

        /** Returns a fresh builder with every field unset ({@code null}). */
        @Generated
        public static TestReduceRequestBuilder builder() {
            return new TestReduceRequestBuilder();
        }

        /** Returns the data list as stored. */
        @Generated
        public List<Datum> getDatumList() {
            return datumList;
        }

        /** Replaces the data list. */
        @Generated
        public void setDatumList(List<Datum> datumList) {
            this.datumList = datumList;
        }

        /** Returns the key array as stored (no copy is made). */
        @Generated
        public String[] getKeys() {
            return keys;
        }

        /** Replaces the key array. */
        @Generated
        public void setKeys(String[] keys) {
            this.keys = keys;
        }

        /** Returns the window start time. */
        @Generated
        public Instant getStartTime() {
            return startTime;
        }

        /** Replaces the window start time. */
        @Generated
        public void setStartTime(Instant startTime) {
            this.startTime = startTime;
        }

        /** Returns the window end time. */
        @Generated
        public Instant getEndTime() {
            return endTime;
        }

        /** Replaces the window end time. */
        @Generated
        public void setEndTime(Instant endTime) {
            this.endTime = endTime;
        }

        /** Fluent builder for {@link TestReduceRequest}; each setter stores the argument and returns this builder. */
        @Generated
        public static class TestReduceRequestBuilder {

            @Generated
            private List<Datum> datumList;

            @Generated
            private String[] keys;

            @Generated
            private Instant startTime;

            @Generated
            private Instant endTime;

            @Generated
            TestReduceRequestBuilder() {
            }

            @Generated
            public TestReduceRequestBuilder datumList(List<Datum> datumList) {
                this.datumList = datumList;
                return this;
            }

            @Generated
            public TestReduceRequestBuilder keys(String[] keys) {
                this.keys = keys;
                return this;
            }

            @Generated
            public TestReduceRequestBuilder startTime(Instant startTime) {
                this.startTime = startTime;
                return this;
            }

            @Generated
            public TestReduceRequestBuilder endTime(Instant endTime) {
                this.endTime = endTime;
                return this;
            }

            /** Creates a {@link TestReduceRequest} from the values set so far. */
            @Generated
            public TestReduceRequest build() {
                return new TestReduceRequest(datumList, keys, startTime, endTime);
            }

            @Generated
            public String toString() {
                StringBuilder text = new StringBuilder("ReducerTestKit.TestReduceRequest.TestReduceRequestBuilder(datumList=");
                text.append(datumList);
                text.append(", keys=").append(Arrays.deepToString(keys));
                text.append(", startTime=").append(startTime);
                text.append(", endTime=").append(endTime);
                text.append(")");
                return text.toString();
            }
        }
    }

    /**
     * Creates a test kit for the given reducer factory using the configuration returned by
     * {@code GRPCConfig.defaultGrpcConfig()}.
     *
     * @param reducerFactory factory for the reducer under test
     */
    public ReducerTestKit(ReducerFactory<? extends Reducer> reducerFactory) {
        this(reducerFactory, GRPCConfig.defaultGrpcConfig());
    }

    /**
     * Creates a test kit for the given reducer factory and gRPC configuration. Nothing is
     * started here; both arguments are only stored for {@link #startServer()}.
     *
     * @param reducerFactory factory for the reducer under test
     * @param grpcConfig gRPC configuration handed to the server when it is created
     */
    public ReducerTestKit(ReducerFactory<? extends Reducer> reducerFactory, GRPCConfig grpcConfig) {
        this.grpcConfig = grpcConfig;
        this.reducer = reducerFactory;
    }

    /**
     * Creates a new {@code Server} from the stored reducer factory and gRPC configuration,
     * keeps a reference to it, and starts it.
     *
     * @throws Exception if starting the server fails
     */
    public void startServer() throws Exception {
        Server created = new Server(reducer, grpcConfig);
        // Store before starting so the reference is retained even if start() throws.
        server = created;
        created.start();
    }

    /**
     * Stops the server if one has been created by {@link #startServer()}; does nothing otherwise.
     *
     * @throws InterruptedException if stopping the server is interrupted
     */
    public void stopServer() throws InterruptedException {
        if (server == null) {
            return;
        }
        server.stop();
    }
}
