package io.numaproj.numaflow.mapper;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/mapper/MapperTestKit.class */
public class MapperTestKit {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MapperTestKit.class);
    private final Mapper mapper;
    private final GRPCConfig grpcConfig;
    private Server server;

    /* loaded from: input_file:io/numaproj/numaflow/mapper/MapperTestKit$Client.class */
    public static class Client {
        private final ManagedChannel channel;
        private final StreamObserver<MapOuterClass.MapRequest> requestStreamObserver;
        private final ConcurrentHashMap<String, CompletableFuture<MapOuterClass.MapResponse>> responseFutures;

        /* loaded from: input_file:io/numaproj/numaflow/mapper/MapperTestKit$Client$ResponseObserver.class */
        private class ResponseObserver implements StreamObserver<MapOuterClass.MapResponse> {
            private ResponseObserver() {
            }

            public void onNext(MapOuterClass.MapResponse mapResponse) {
                CompletableFuture<MapOuterClass.MapResponse> completableFuture;
                if (mapResponse.hasHandshake() || (completableFuture = Client.this.responseFutures.get(mapResponse.getId())) == null) {
                    return;
                }
                completableFuture.complete(mapResponse);
            }

            public void onError(Throwable th) {
                Iterator<CompletableFuture<MapOuterClass.MapResponse>> it = Client.this.responseFutures.values().iterator();
                while (it.hasNext()) {
                    it.next().completeExceptionally(th);
                }
            }

            public void onCompleted() {
                Client.this.responseFutures.values().removeIf((v0) -> {
                    return v0.isDone();
                });
            }
        }

        /** Creates a client connected to {@code localhost} on port {@code 50051}. */
        public Client() {
            this("localhost", 50051);
        }

        public Client(String str, int i) {
            this.responseFutures = new ConcurrentHashMap<>();
            this.channel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            this.requestStreamObserver = MapGrpc.newStub(this.channel).mapFn(new ResponseObserver());
            this.requestStreamObserver.onNext(MapOuterClass.MapRequest.newBuilder().setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)).m102build());
        }

        /**
         * Sends a single request on the map stream and blocks until the response carrying
         * the same request id has been received.
         *
         * @param strArr keys to attach to the request; may be {@code null}
         * @param datum  payload and metadata of the request
         * @return the messages contained in the matching response
         * @throws RuntimeException wrapping any failure while sending or waiting; if the
         *                          calling thread was interrupted, its interrupt flag is
         *                          restored before the exception is thrown
         */
        public MessageList sendRequest(String[] strArr, Datum datum) {
            String requestId = UUID.randomUUID().toString();
            CompletableFuture<MapOuterClass.MapResponse> pending = new CompletableFuture<>();
            this.responseFutures.put(requestId, pending);
            try {
                this.requestStreamObserver.onNext(createRequest(strArr, datum, requestId));
                return createResponse(pending.get());
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Always unregister the future, including on failure, so the map does not
                // accumulate entries for requests that never completed normally.
                this.responseFutures.remove(requestId);
            }
        }

        /**
         * Completes the request stream and shuts the channel down, waiting up to five
         * seconds for termination. The channel is shut down even when completing the
         * stream throws, and is forcibly shut down if it does not terminate in time.
         *
         * @throws InterruptedException if interrupted while waiting for the channel to terminate
         */
        public void close() throws InterruptedException {
            try {
                this.requestStreamObserver.onCompleted();
            } finally {
                // Release the channel regardless of whether the stream closed cleanly.
                this.channel.shutdown();
                if (!this.channel.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.channel.shutdownNow();
                }
            }
        }

        /**
         * Builds the gRPC request for one datum.
         *
         * <p>Absent parts are replaced with empty values: {@code null} keys become an empty
         * key list, a {@code null} value becomes {@link ByteString#EMPTY}, {@code null}
         * timestamps become an empty {@link Timestamp}, and {@code null} headers become an
         * empty map.
         *
         * @param strArr keys of the request; may be {@code null}
         * @param datum  source of value, event time, watermark and headers
         * @param str    id to set on the request
         * @return the assembled request
         */
        private MapOuterClass.MapRequest createRequest(String[] strArr, Datum datum, String str) {
            List<String> keys = strArr == null ? List.of() : List.of(strArr);
            ByteString value = datum.getValue() == null
                    ? ByteString.EMPTY
                    : ByteString.copyFrom(datum.getValue());
            Timestamp eventTime = toTimestamp(datum.getEventTime());
            Timestamp watermark = toTimestamp(datum.getWatermark());
            Map<String, String> headers = datum.getHeaders() == null ? Map.of() : datum.getHeaders();

            MapOuterClass.MapRequest.Request payload = MapOuterClass.MapRequest.Request.newBuilder()
                    .addAllKeys(keys)
                    .setValue(value)
                    .setEventTime(eventTime)
                    .setWatermark(watermark)
                    .putAllHeaders(headers)
                    .build();
            return MapOuterClass.MapRequest.newBuilder()
                    .setRequest(payload)
                    .setId(str)
                    .build();
        }

        /** Converts an instant to a protobuf timestamp; {@code null} yields an empty timestamp. */
        private static Timestamp toTimestamp(Instant instant) {
            if (instant == null) {
                return Timestamp.newBuilder().build();
            }
            return Timestamp.newBuilder()
                    .setSeconds(instant.getEpochSecond())
                    .setNanos(instant.getNano())
                    .build();
        }

        /**
         * Converts a gRPC response into a {@code MessageList}, creating one {@code Message}
         * per result from that result's value bytes, keys and tags.
         *
         * @param mapResponse response received from the server
         * @return the messages carried by the response, in result order
         */
        private MessageList createResponse(MapOuterClass.MapResponse mapResponse) {
            List<Message> messages = mapResponse.getResultsList().stream()
                    .map(result -> new Message(
                            result.getValue().toByteArray(),
                            result.getKeysList().toArray(new String[0]),
                            result.getTagsList().toArray(new String[0])))
                    .collect(Collectors.toList());
            return new MessageList(messages);
        }
    }

    /* loaded from: input_file:io/numaproj/numaflow/mapper/MapperTestKit$TestDatum.class */
    /**
     * {@link Datum} implementation for tests, backed by four plain fields. Values are
     * stored and returned exactly as supplied (no copies are made). Instances are
     * created through {@link #builder()}.
     */
    public static class TestDatum implements Datum {
        private final byte[] value;
        private final Instant eventTime;
        private final Instant watermark;
        private final Map<String, String> headers;

        /** Stores the given values as-is. */
        @Generated
        TestDatum(byte[] bArr, Instant instant, Instant instant2, Map<String, String> map) {
            value = bArr;
            eventTime = instant;
            watermark = instant2;
            headers = map;
        }

        /** @return a new builder with every property unset ({@code null}) */
        @Generated
        public static TestDatumBuilder builder() {
            return new TestDatumBuilder();
        }

        /** @return the payload bytes, or {@code null} if none were set */
        @Override
        @Generated
        public byte[] getValue() {
            return value;
        }

        /** @return the event time, or {@code null} if none was set */
        @Override
        @Generated
        public Instant getEventTime() {
            return eventTime;
        }

        /** @return the watermark, or {@code null} if none was set */
        @Override
        @Generated
        public Instant getWatermark() {
            return watermark;
        }

        /** @return the headers map, or {@code null} if none was set */
        @Override
        @Generated
        public Map<String, String> getHeaders() {
            return headers;
        }

        /** Fluent builder for {@link TestDatum}; each setter returns this builder. */
        @Generated
        public static class TestDatumBuilder {

            @Generated
            private byte[] value;

            @Generated
            private Instant eventTime;

            @Generated
            private Instant watermark;

            @Generated
            private Map<String, String> headers;

            @Generated
            TestDatumBuilder() {
            }

            /** Sets the payload bytes. */
            @Generated
            public TestDatumBuilder value(byte[] bArr) {
                value = bArr;
                return this;
            }

            /** Sets the event time. */
            @Generated
            public TestDatumBuilder eventTime(Instant instant) {
                eventTime = instant;
                return this;
            }

            /** Sets the watermark. */
            @Generated
            public TestDatumBuilder watermark(Instant instant) {
                watermark = instant;
                return this;
            }

            /** Sets the headers map. */
            @Generated
            public TestDatumBuilder headers(Map<String, String> map) {
                headers = map;
                return this;
            }

            /** @return a {@link TestDatum} holding the values currently set on this builder */
            @Generated
            public TestDatum build() {
                return new TestDatum(value, eventTime, watermark, headers);
            }

            /** @return a textual dump of the builder's current values */
            @Generated
            public String toString() {
                StringBuilder text = new StringBuilder();
                text.append("MapperTestKit.TestDatum.TestDatumBuilder(value=").append(Arrays.toString(value));
                text.append(", eventTime=").append(eventTime);
                text.append(", watermark=").append(watermark);
                text.append(", headers=").append(headers);
                text.append(")");
                return text.toString();
            }
        }
    }

    /**
     * Creates a test kit for the given mapper using {@code GRPCConfig.defaultGrpcConfig()}.
     *
     * @param mapper mapper to serve
     */
    public MapperTestKit(Mapper mapper) {
        this(mapper, GRPCConfig.defaultGrpcConfig());
    }

    /**
     * Creates a test kit for the given mapper and gRPC configuration. No server is
     * created until {@link #startServer()} is called.
     *
     * @param mapper     mapper to serve
     * @param gRPCConfig configuration passed to the server when it is started
     */
    public MapperTestKit(Mapper mapper, GRPCConfig gRPCConfig) {
        this.grpcConfig = gRPCConfig;
        this.mapper = mapper;
    }

    /**
     * Creates a new server for the configured mapper and gRPC settings, stores it on
     * this kit and starts it.
     *
     * @throws Exception if the server fails to start
     */
    public void startServer() throws Exception {
        Server created = new Server(this.mapper, this.grpcConfig);
        // Store the reference before starting, so it is kept even when start() throws.
        this.server = created;
        created.start();
    }

    /**
     * Stops the server created by {@link #startServer()}; does nothing if no server
     * has been created.
     *
     * @throws Exception if stopping the server fails
     */
    public void stopServer() throws Exception {
        if (this.server == null) {
            return;
        }
        this.server.stop();
    }
}
