package io.virtualan.cucumblan.core;

import io.cucumber.datatable.DataTable;
import io.cucumber.java.Before;
import io.cucumber.java.Scenario;
import io.cucumber.java.en.Given;
import io.virtualan.csvson.Csvson;
import io.virtualan.cucumblan.core.msg.kafka.KafkaConsumerClient;
import io.virtualan.cucumblan.core.msg.kafka.KafkaProducerClient;
import io.virtualan.cucumblan.core.msg.kafka.MessageContext;
import io.virtualan.cucumblan.message.exception.MessageNotDefinedException;
import io.virtualan.cucumblan.message.type.MessageType;
import io.virtualan.cucumblan.props.TopicConfiguration;
import io.virtualan.cucumblan.props.util.EventRequest;
import io.virtualan.cucumblan.props.util.MsgHelper;
import io.virtualan.cucumblan.props.util.ScenarioContext;
import io.virtualan.cucumblan.props.util.StepDefinitionHelper;
import io.virtualan.mapson.exception.BadInputDataException;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/virtualan/cucumblan/core/MsgBaseStepDefinition.class */
public class MsgBaseStepDefinition {
    private static final Logger log = LoggerFactory.getLogger(MsgBaseStepDefinition.class);
    private Scenario scenario;

    @Before
    public void before(Scenario scenario) {
        this.scenario = scenario;
    }

    @Given("As a user perform message (.*) action$")
    public void dummyGiven(String str) throws Exception {
    }

    @Given("Send message (.*) for event (.*) in partition (.*) on (.*) with type (.*)$")
    public void produceMessageWithPartition(String str, String str2, Integer num, String str3, String str4, Object obj) throws MessageNotDefinedException {
        String obj2 = StepDefinitionHelper.getActualValue(str2).toString();
        String obj3 = StepDefinitionHelper.getActualValue(str4).toString();
        String property = TopicConfiguration.getProperty(obj2);
        MessageType messageType = MessageContext.getMessageTypes().get(obj3);
        if (property == null || messageType == null) {
            Assertions.assertTrue(false, str2 + " is not configured for any topic. or " + str4 + " is not configured");
            return;
        }
        MessageType buildProducerMessage = messageType.buildProducerMessage(obj);
        this.scenario.log(buildProducerMessage.toString());
        KafkaProducerClient.sendMessage(str3, property, buildProducerMessage.getKey(), buildProducerMessage.getMessage(), num, buildProducerMessage.getHeaders());
    }

    @Given("Pause message (.*) for process for (.*) milliseconds$")
    public void produceMessage(String str, long j) throws InterruptedException {
        Thread.sleep(j);
    }

    @Given("Clear the consumed message (.*) for the event (.*)$")
    public void clearMessage(String str, String str2) throws InterruptedException {
        MessageContext.removeEventContextMap(StepDefinitionHelper.getActualValue(str2).toString());
    }

    @Given("Send message (.*) for event (.*) on (.*) with type (.*)$")
    public void produceMessage(String str, String str2, String str3, String str4, DataTable dataTable) throws MessageNotDefinedException {
        String obj = StepDefinitionHelper.getActualValue(str2).toString();
        String obj2 = StepDefinitionHelper.getActualValue(str4).toString();
        String property = TopicConfiguration.getProperty(obj);
        MessageType messageType = MessageContext.getMessageTypes().get(obj2);
        if (property == null || messageType == null) {
            Assertions.assertTrue(false, str2 + " is not configured for any topic. or " + str4 + " is not configured");
            return;
        }
        MessageType buildProducerMessage = messageType.buildProducerMessage(dataTable);
        this.scenario.log(buildProducerMessage.toString());
        if (buildProducerMessage.getKey() != null) {
            KafkaProducerClient.sendMessage(str3, property, buildProducerMessage.getKey(), buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        } else {
            KafkaProducerClient.sendMessage(str3, property, null, buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        }
    }

    @Given("Send inline message (.*) for event (.*) on (.*) with type (.*)$")
    public void produceMessage(String str, String str2, String str3, String str4, List<String> list) throws MessageNotDefinedException {
        String obj = StepDefinitionHelper.getActualValue(str2).toString();
        String obj2 = StepDefinitionHelper.getActualValue(str4).toString();
        String property = TopicConfiguration.getProperty(obj);
        MessageType messageType = MessageContext.getMessageTypes().get(obj2);
        if (property == null || messageType == null) {
            Assertions.assertTrue(false, str2 + " is not configured for any topic. or " + str4 + " is not configured");
            return;
        }
        MessageType buildProducerMessage = messageType.buildProducerMessage(list);
        this.scenario.log(buildProducerMessage.toString());
        if (buildProducerMessage.getKey() != null) {
            KafkaProducerClient.sendMessage(str3, property, buildProducerMessage.getKey(), buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        } else {
            KafkaProducerClient.sendMessage(str3, property, null, buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        }
    }

    @Given("Send mapson message (.*) for event (.*) on (.*) with type (.*)$")
    public void produceMessageMapson(String str, String str2, String str3, String str4, Map<String, String> map) throws MessageNotDefinedException {
        String obj = StepDefinitionHelper.getActualValue(str2).toString();
        String obj2 = StepDefinitionHelper.getActualValue(str4).toString();
        String property = TopicConfiguration.getProperty(obj);
        MessageType messageType = MessageContext.getMessageTypes().get(obj2);
        if (property == null || messageType == null) {
            Assertions.assertTrue(false, str2 + " is not configured for any topic. or " + str4 + " is not configured");
            return;
        }
        MessageType buildProducerMessage = messageType.buildProducerMessage(map);
        this.scenario.log(buildProducerMessage.toString());
        if (buildProducerMessage.getKey() != null) {
            KafkaProducerClient.sendMessage(str3, property, buildProducerMessage.getKey(), buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        } else {
            KafkaProducerClient.sendMessage(str3, property, null, buildProducerMessage.getMessage(), null, buildProducerMessage.getHeaders());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Given("Verify (.*) for event (.*) contains (.*) on (.*) with type (.*)$")
    public void verifyConsumedJSONObject(String str, String str2, String str3, String str4, String str5, List<String> list) throws InterruptedException, BadInputDataException, MessageNotDefinedException {
        String obj = StepDefinitionHelper.getActualValue(str2).toString();
        String obj2 = StepDefinitionHelper.getActualValue(str5).toString();
        String obj3 = StepDefinitionHelper.getActualValue(str3).toString();
        EventRequest eventRequest = new EventRequest();
        eventRequest.setRecheck(0);
        eventRequest.setEventName(obj);
        eventRequest.setType(obj2);
        eventRequest.setId(obj3);
        eventRequest.setResource(str4);
        MessageType event = KafkaConsumerClient.getEvent(eventRequest);
        if (event == 0) {
            Assertions.assertTrue(false, " Unable to read event name (" + str2 + ") with identifier : " + str3);
            return;
        }
        JSONArray buildCSVson = Csvson.buildCSVson(list, ScenarioContext.getContext(String.valueOf(Thread.currentThread().getId())));
        this.scenario.attach(buildCSVson.toString(4), "application/json", "ExpectedResponse:");
        if (event.getMessageAsJson() instanceof JSONObject) {
            this.scenario.attach(((JSONObject) event.getMessageAsJson()).toString(4), "application/json", "ActualResponse:");
            JSONAssert.assertEquals(buildCSVson.getJSONObject(0), (JSONObject) event.getMessageAsJson(), JSONCompareMode.LENIENT);
        } else {
            this.scenario.attach(((JSONArray) event.getMessageAsJson()).toString(4), "application/json", "ActualResponse:");
            JSONAssert.assertEquals(buildCSVson, (JSONArray) event, JSONCompareMode.LENIENT);
        }
    }

    @Given("Verify-by-elements (.*) for event (.*) contains (.*) on (.*) with type (.*)$")
    public void consumeMessage(String str, String str2, String str3, String str4, String str5, Map<String, String> map) throws InterruptedException, MessageNotDefinedException {
        String obj = StepDefinitionHelper.getActualValue(str2).toString();
        String obj2 = StepDefinitionHelper.getActualValue(str5).toString();
        String obj3 = StepDefinitionHelper.getActualValue(str3).toString();
        EventRequest eventRequest = new EventRequest();
        eventRequest.setRecheck(0);
        eventRequest.setEventName(obj);
        eventRequest.setType(obj2);
        eventRequest.setId(obj3);
        eventRequest.setResource(str4);
        MessageType event = KafkaConsumerClient.getEvent(eventRequest);
        if (event == null) {
            Assertions.assertTrue(false, " Unable to read event name (" + str2 + ") with identifier : " + str3);
        } else {
            this.scenario.attach(event.getMessage().toString(), "application/json", "ActualResponse");
            map.forEach((str6, str7) -> {
                Assertions.assertEquals(StepDefinitionHelper.getObjectValue(StepDefinitionHelper.getActualValue(str7)), MsgHelper.getJSON(event.getMessageAsJson().toString(), str6), str6 + " is not failed.");
            });
        }
    }
}
