package io.kgraph.rest.server.graph;

import io.kgraph.GraphAlgorithmState;
import io.kgraph.library.GraphAlgorithmType;
import io.kgraph.library.cf.CfLongId;
import io.kgraph.library.cf.EdgeCfLongIdFloatValueParser;
import io.kgraph.rest.server.KafkaGraphsApplication;
import io.kgraph.rest.server.utils.EdgeLongIdLongValueParser;
import io.kgraph.rest.server.utils.VertexLongIdLongValueParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;

@AutoConfigureWebTestClient(timeout = "36000")
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {KafkaGraphsApplication.class})
/* loaded from: input_file:io/kgraph/rest/server/graph/GraphIntegrationTest.class */
public class GraphIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1) { // from class: io.kgraph.rest.server.graph.GraphIntegrationTest.1
        public void start() throws IOException {
            super.start();
            System.setProperty("spring.embedded.kafka.brokers", bootstrapServers());
            System.setProperty("spring.embedded.zookeeper.connect", zKConnectString());
        }
    };

    @Autowired
    private WebTestClient webTestClient;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Test
    public void testConnectedComponents() {
        this.webTestClient.post().uri("/import", new Object[0]).syncBody(generateCCBody()).exchange().expectStatus().isOk().expectBody(Void.class);
        GroupEdgesBySourceRequest groupEdgesBySourceRequest = new GroupEdgesBySourceRequest();
        groupEdgesBySourceRequest.setAlgorithm(GraphAlgorithmType.wcc);
        groupEdgesBySourceRequest.setInitialVerticesTopic("initial-cc-vertices");
        groupEdgesBySourceRequest.setInitialEdgesTopic("initial-cc-edges");
        groupEdgesBySourceRequest.setVerticesTopic("new-cc-vertices");
        groupEdgesBySourceRequest.setEdgesGroupedBySourceTopic("new-cc-edges");
        groupEdgesBySourceRequest.setAsync(false);
        this.webTestClient.post().uri("/prepare", new Object[0]).contentType(MediaType.APPLICATION_JSON).syncBody(groupEdgesBySourceRequest).exchange().expectStatus().isOk().expectBody(Void.class);
        GraphAlgorithmCreateRequest graphAlgorithmCreateRequest = new GraphAlgorithmCreateRequest();
        graphAlgorithmCreateRequest.setAlgorithm(GraphAlgorithmType.wcc);
        graphAlgorithmCreateRequest.setVerticesTopic("new-cc-vertices");
        graphAlgorithmCreateRequest.setEdgesGroupedBySourceTopic("new-cc-edges");
        String id = ((GraphAlgorithmId) this.webTestClient.post().uri("/pregel", new Object[0]).contentType(MediaType.APPLICATION_JSON).syncBody(graphAlgorithmCreateRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmId.class).returnResult().getResponseBody()).getId();
        this.webTestClient.post().uri("/pregel/{id}", new Object[]{id}).contentType(MediaType.APPLICATION_JSON).syncBody(new GraphAlgorithmRunRequest()).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
        GraphAlgorithmState.State state = GraphAlgorithmState.State.RUNNING;
        while (state == GraphAlgorithmState.State.RUNNING) {
            state = ((GraphAlgorithmStatus) this.webTestClient.get().uri("/pregel/{id}", new Object[]{id}).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult().getResponseBody()).getState();
        }
        Map map = (Map) this.webTestClient.get().uri("/pregel/{id}/result", new Object[]{id}).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).exchange().expectStatus().isOk().returnResult(KeyValue.class).getResponseBody().collectMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }).block();
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals("0", map.get(String.valueOf(i)));
        }
        for (int i2 = 10; i2 < 21; i2++) {
            Assert.assertEquals("10", map.get(String.valueOf(i2)));
        }
    }

    private MultiValueMap<String, HttpEntity<?>> generateCCBody() {
        MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder();
        multipartBodyBuilder.part("verticesTopic", "initial-cc-vertices");
        multipartBodyBuilder.part("edgesTopic", "initial-cc-edges");
        multipartBodyBuilder.part("vertexFile", new ClassPathResource("vertices_simple.txt"));
        multipartBodyBuilder.part("edgeFile", new ClassPathResource("edges_simple.txt"));
        multipartBodyBuilder.part("vertexParser", VertexLongIdLongValueParser.class.getName());
        multipartBodyBuilder.part("edgeParser", EdgeLongIdLongValueParser.class.getName());
        multipartBodyBuilder.part("keySerializer", LongSerializer.class.getName());
        multipartBodyBuilder.part("vertexValueSerializer", LongSerializer.class.getName());
        multipartBodyBuilder.part("edgeValueSerializer", LongSerializer.class.getName());
        multipartBodyBuilder.part("numPartitions", "50");
        multipartBodyBuilder.part("replicationFactor", "1");
        return multipartBodyBuilder.build();
    }

    @Test
    public void testSvdpp() {
        this.webTestClient.post().uri("/import", new Object[0]).syncBody(generateSvdppBody()).exchange().expectStatus().isOk().expectBody(Void.class);
        GroupEdgesBySourceRequest groupEdgesBySourceRequest = new GroupEdgesBySourceRequest();
        groupEdgesBySourceRequest.setAlgorithm(GraphAlgorithmType.svdpp);
        groupEdgesBySourceRequest.setInitialEdgesTopic("initial-svdpp-edges");
        groupEdgesBySourceRequest.setVerticesTopic("new-svdpp-vertices");
        groupEdgesBySourceRequest.setEdgesGroupedBySourceTopic("new-svdpp-edges");
        groupEdgesBySourceRequest.setAsync(false);
        this.webTestClient.post().uri("/prepare", new Object[0]).contentType(MediaType.APPLICATION_JSON).syncBody(groupEdgesBySourceRequest).exchange().expectStatus().isOk().expectBody(Void.class);
        HashMap hashMap = new HashMap();
        hashMap.put("random.seed", "0");
        hashMap.put("iterations", "3");
        GraphAlgorithmCreateRequest graphAlgorithmCreateRequest = new GraphAlgorithmCreateRequest();
        graphAlgorithmCreateRequest.setConfigs(hashMap);
        graphAlgorithmCreateRequest.setAlgorithm(GraphAlgorithmType.svdpp);
        graphAlgorithmCreateRequest.setVerticesTopic("new-svdpp-vertices");
        graphAlgorithmCreateRequest.setEdgesGroupedBySourceTopic("new-svdpp-edges");
        String id = ((GraphAlgorithmId) this.webTestClient.post().uri("/pregel", new Object[0]).contentType(MediaType.APPLICATION_JSON).syncBody(graphAlgorithmCreateRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmId.class).returnResult().getResponseBody()).getId();
        this.webTestClient.post().uri("/pregel/{id}", new Object[]{id}).contentType(MediaType.APPLICATION_JSON).syncBody(new GraphAlgorithmRunRequest()).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
        GraphAlgorithmState.State state = GraphAlgorithmState.State.RUNNING;
        while (state == GraphAlgorithmState.State.RUNNING) {
            state = ((GraphAlgorithmStatus) this.webTestClient.get().uri("/pregel/{id}", new Object[]{id}).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult().getResponseBody()).getState();
        }
        NavigableMap navigableMap = (NavigableMap) this.webTestClient.get().uri("/pregel/{id}/result", new Object[]{id}).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).exchange().expectStatus().isOk().returnResult(KeyValue.class).getResponseBody().collectMap(keyValue -> {
            return new CfLongId(keyValue.getKey());
        }, (v0) -> {
            return v0.getValue();
        }, TreeMap::new).block();
        Assert.assertEquals("(1, 0)=(0.11611404, [0.006397, 0.008010])", navigableMap.firstEntry().toString());
        Assert.assertEquals("(20, 1)=(0.6374174, [0.007310, 0.002405])", navigableMap.lastEntry().toString());
    }

    private MultiValueMap<String, HttpEntity<?>> generateSvdppBody() {
        MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder();
        multipartBodyBuilder.part("edgesTopic", "initial-svdpp-edges");
        multipartBodyBuilder.part("edgeFile", new ClassPathResource("ratings_simple.txt"));
        multipartBodyBuilder.part("edgeParser", EdgeCfLongIdFloatValueParser.class.getName());
        multipartBodyBuilder.part("edgeValueSerializer", FloatSerializer.class.getName());
        multipartBodyBuilder.part("numPartitions", "50");
        multipartBodyBuilder.part("replicationFactor", "1");
        return multipartBodyBuilder.build();
    }
}
