package org.kie.kogito.index.service.messaging;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.model.ProcessInstanceState;
import org.kie.kogito.index.service.AbstractIndexingServiceIT;
import org.kie.kogito.index.service.GraphQLUtils;
import org.kie.kogito.index.test.TestUtils;
import org.kie.kogito.persistence.protobuf.ProtobufService;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;

/* loaded from: input_file:org/kie/kogito/index/service/messaging/AbstractMessagingLoadKafkaIT.class */
public abstract class AbstractMessagingLoadKafkaIT {

    @ConfigProperty(name = "kafka.bootstrap.servers")
    String kafkaBootstrapServers;

    @ConfigProperty(name = "kogito.data-index.domain-indexing", defaultValue = "true")
    Boolean indexDomain;
    List<KafkaTestClient> kafkaClients;
    Duration timeout = Duration.ofMinutes(1);
    Integer producers = 1000;

    @Inject
    ObjectMapper mapper;

    @Inject
    ProtobufService protobufService;

    @BeforeEach
    void setup() throws Exception {
        if (this.indexDomain.booleanValue()) {
            this.protobufService.registerProtoBufferType(getTestProtobufFileContent());
        }
        this.kafkaClients = (List) ((Stream) Stream.generate(() -> {
            return new KafkaTestClient(this.kafkaBootstrapServers);
        }).parallel()).limit(this.producers.intValue()).collect(Collectors.toList());
    }

    @AfterEach
    void close() {
        if (this.kafkaClients != null) {
            this.kafkaClients.parallelStream().forEach(kafkaTestClient -> {
                kafkaTestClient.shutdown();
            });
            this.kafkaClients.clear();
        }
    }

    @Test
    void testMessagingEvents() {
        if (this.indexDomain.booleanValue()) {
            RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Travels { id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Travels", CoreMatchers.isA(Collection.class), new Object[0]);
        }
        this.kafkaClients.parallelStream().map(kafkaTestClient -> {
            try {
                String uuid = UUID.randomUUID().toString();
                String uuid2 = UUID.randomUUID().toString();
                sendProcessInstanceEvent(kafkaTestClient, TestUtils.getProcessCloudEvent("travels", uuid, ProcessInstanceState.ACTIVE, (String) null, (String) null, (String) null, AbstractIndexingServiceIT.CURRENT_USER));
                sendUserTaskEvent(kafkaTestClient, TestUtils.getUserTaskCloudEvent(uuid2, "travels", uuid, (String) null, (String) null, "InProgress"));
                sendUserTaskEvent(kafkaTestClient, TestUtils.getUserTaskCloudEvent(uuid2, "travels", uuid, (String) null, (String) null, "Completed"));
                sendProcessInstanceEvent(kafkaTestClient, TestUtils.getProcessCloudEvent("travels", uuid, ProcessInstanceState.COMPLETED, (String) null, (String) null, (String) null, AbstractIndexingServiceIT.CURRENT_USER));
                return uuid;
            } catch (Exception e) {
                Assertions.fail(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }).forEach(str -> {
            validateUserTaskInstance(str, "Completed");
            validateProcessInstance(str, ProcessInstanceState.COMPLETED);
            validateProcessDefinition("travels", "1.0");
        });
    }

    private void validateProcessDefinition(String str, String str2) {
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getProcessDefinitionByIdAndVersion(str, str2)).when().post("/graphql", new Object[0]).then().statusCode(200).body("data.ProcessDefinitions.size()", CoreMatchers.is(1), new Object[0]).body("data.ProcessDefinitions[0].id", CoreMatchers.is(str), new Object[0]).body("data.ProcessDefinitions[0].version", CoreMatchers.is(str2), new Object[0]);
        });
    }

    private void validateProcessInstance(String str, ProcessInstanceState processInstanceState) {
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getProcessInstanceById(str)).when().post("/graphql", new Object[0]).then().statusCode(200).body("data.ProcessInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.ProcessInstances[0].id", CoreMatchers.is(str), new Object[0]).body("data.ProcessInstances[0].state", CoreMatchers.is(processInstanceState.toString()), new Object[0]);
        });
        if (this.indexDomain.booleanValue()) {
            Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
                RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getTravelsByProcessInstanceId(str)).when().post("/graphql", new Object[0]).then().statusCode(200).body("data.Travels.size()", CoreMatchers.is(1), new Object[0]).body("data.Travels[0].id", CoreMatchers.is(str), new Object[0]).body("data.Travels[0].metadata.processInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.Travels[0].metadata.processInstances[0].id", CoreMatchers.is(str), new Object[0]).body("data.Travels[0].metadata.processInstances[0].state", CoreMatchers.is(processInstanceState.toString()), new Object[0]);
            });
        }
    }

    private void validateUserTaskInstance(String str, String str2) {
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            String str3 = (String) RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getUserTaskInstanceByProcessInstanceId(str)).when().post("/graphql", new Object[0]).then().statusCode(200).body("data.UserTaskInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.UserTaskInstances[0].processInstanceId", CoreMatchers.is(str), new Object[0]).body("data.UserTaskInstances[0].state", CoreMatchers.is(str2), new Object[0]).extract().body().path("data.UserTaskInstances[0].id", new String[0]);
            if (this.indexDomain.booleanValue()) {
                RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getTravelsByUserTaskId(str3)).when().post("/graphql", new Object[0]).then().statusCode(200).body("data.Travels.size()", CoreMatchers.is(1), new Object[0]).body("data.Travels[0].id", CoreMatchers.is(str), new Object[0]).body("data.Travels[0].metadata.userTasks.size()", CoreMatchers.is(1), new Object[0]).body("data.Travels[0].metadata.userTasks[0].id", CoreMatchers.is(str3), new Object[0]).body("data.Travels[0].metadata.userTasks[0].state", CoreMatchers.is(str2), new Object[0]);
            }
        });
    }

    private void sendProcessInstanceEvent(KafkaTestClient kafkaTestClient, ProcessInstanceDataEvent processInstanceDataEvent) throws Exception {
        kafkaTestClient.produce(this.mapper.writeValueAsString(processInstanceDataEvent), "kogito-processinstances-events");
    }

    private void sendUserTaskEvent(KafkaTestClient kafkaTestClient, UserTaskInstanceDataEvent userTaskInstanceDataEvent) throws Exception {
        kafkaTestClient.produce(this.mapper.writeValueAsString(userTaskInstanceDataEvent), "kogito-usertaskinstances-events");
    }

    protected String getTestProtobufFileContent() throws Exception {
        return null;
    }
}
