package io.virtualan.message.core;

import io.virtualan.core.model.VirtualServiceRequest;
import io.virtualan.core.util.ReturnMockResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnResource;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@EnableKafka
@ConditionalOnClass({IntegrationFlows.class})
@EnableIntegration
@ConditionalOnResource(resources = {"classpath:conf/kafka.json"})
/* loaded from: input_file:io/virtualan/message/core/MessagingApplication.class */
public class MessagingApplication {
    private static final Logger log = LoggerFactory.getLogger(MessagingApplication.class);
    private static List<NewTopic> topicList = new ArrayList();
    private String bootstrapServers;
    private List<String> topics = new ArrayList();
    private Map<String, Object> producerConfig = new HashMap();
    private Map<String, Object> consumerConfigs = new HashMap();

    @Autowired
    private MessageUtil messageUtil;

    /* loaded from: input_file:io/virtualan/message/core/MessagingApplication$ResponseMessage.class */
    interface ResponseMessage {
        MessageObject readResponseMessage(MessageObject messageObject);
    }

    /* loaded from: input_file:io/virtualan/message/core/MessagingApplication$SendMessage.class */
    interface SendMessage {
        String send(MessageObject messageObject);
    }

    @PostConstruct
    public void init() {
        JSONObject optJSONObject;
        try {
            JSONArray jsonObject = getJsonObject();
            if (jsonObject != null && (optJSONObject = jsonObject.optJSONObject(0)) != null) {
                this.bootstrapServers = optJSONObject.getString("broker");
                JSONArray jSONArray = optJSONObject.getJSONArray("topics");
                for (int i = 0; i < jSONArray.length(); i++) {
                    this.topics.add(jSONArray.get(i).toString());
                }
                addTopics();
                this.bootstrapServers = optJSONObject.getString("broker");
                getConfigMap(optJSONObject, "consumer", this.consumerConfigs);
                getConfigMap(optJSONObject, "producer", this.producerConfig);
                if (topicList != null) {
                    addTopic(topicList);
                }
            }
        } catch (Exception e) {
            log.error("Unable to load the kafka configuration");
        }
    }

    public Map getConfigMap(JSONObject jSONObject, String str, Map<String, Object> map) {
        Map<String, String> loadProperties = loadProperties(jSONObject.optString(str));
        if (loadProperties != null) {
            map.putAll(loadProperties);
        }
        return loadProperties;
    }

    public void addTopics() throws ExecutionException, InterruptedException {
        Set<String> topics = getTopics();
        for (String str : this.topics) {
            if (!topics.contains(str)) {
                topicList.add(addNewTopic(str));
            }
        }
    }

    private Map<String, String> loadProperties(String str) {
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(str);
        HashMap hashMap = new HashMap();
        Properties properties = new Properties();
        if (resourceAsStream == null) {
            log.warn("property file '{}' not found in the classpath.. loading default setting", str);
            return null;
        }
        try {
            properties.load(resourceAsStream);
            properties.forEach((obj, obj2) -> {
                hashMap.put(obj.toString(), obj2.toString());
            });
            return hashMap;
        } catch (IOException e) {
            log.warn("property file '{}' not found in the classpath.. loading default setting {}", str, e.getMessage());
            return null;
        }
    }

    private static JSONArray getJsonObject() throws IOException {
        InputStream resourceAsStream = MessagingApplication.class.getClassLoader().getResourceAsStream("conf/kafka.json");
        if (resourceAsStream != null) {
            return new JSONObject(readString(resourceAsStream)).optJSONArray("Kafka");
        }
        return null;
    }

    public static String readString(InputStream inputStream) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        try {
            String str = (String) bufferedReader.lines().collect(Collectors.joining(System.lineSeparator()));
            bufferedReader.close();
            return str;
        } catch (Throwable th) {
            try {
                bufferedReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(properties);
    }

    public void addTopic(List<NewTopic> list) {
        getAdminClient().createTopics(list);
    }

    public boolean isTopicExists(String str) throws ExecutionException, InterruptedException {
        return getTopics().contains(str);
    }

    private static NewTopic addNewTopic(String str) {
        Short sh = 1;
        return new NewTopic(str, 5, sh.shortValue()).configs(new HashMap());
    }

    private Set<String> getTopics() throws ExecutionException, InterruptedException {
        return (Set) getAdminClient().listTopics().names().get();
    }

    @Bean
    private ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    @Bean
    private Map<String, Object> producerConfigs() {
        this.producerConfig.put("bootstrap.servers", this.bootstrapServers);
        if (this.producerConfig.size() == 1) {
            this.producerConfig.put("key.serializer", StringSerializer.class);
            this.producerConfig.put("value.serializer", StringSerializer.class);
            this.producerConfig.put("linger.ms", 1);
            this.producerConfig.put("acks", "all");
            this.producerConfig.put("retries", 0);
        }
        return this.producerConfig;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        this.consumerConfigs.put("bootstrap.servers", this.bootstrapServers);
        if (this.consumerConfigs.size() == 1) {
            this.consumerConfigs.put("key.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("value.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("group.id", "virtualan-consumer-1");
            this.consumerConfigs.put("auto.offset.reset", "earliest");
            this.consumerConfigs.put("max.poll.records", 1);
            this.consumerConfigs.put("max.poll.interval.ms", 1000);
            this.consumerConfigs.put("enable.auto.commit", true);
            this.consumerConfigs.put("auto.commit.interval.ms", 1000);
        }
        return this.consumerConfigs;
    }

    @Bean
    private ConsumerFactory<?, ?> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    private DirectChannel sentToTransformer() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel listeningFromTransformer() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow listenerFromKafkaFlow() {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), KafkaMessageDrivenChannelAdapter.ListenerMode.record, (String[]) this.topics.toArray(new String[this.topics.size()])).configureListenerContainer(kafkaMessageListenerContainerSpec -> {
            kafkaMessageListenerContainerSpec.ackMode(ContainerProperties.AckMode.RECORD).ackOnError(true).idleEventInterval(100L).id("messageListenerContainer");
        })).channel(sentToTransformer()).transform(transformer()).channel(listeningFromTransformer()).get();
    }

    @Bean
    private GenericTransformer<Message, MessageObject> transformer() {
        return this::parse;
    }

    @Transformer
    public MessageObject parse(Message<?> message) {
        MessageObject messageObject = new MessageObject();
        try {
            messageObject.setJsonObject((JSONObject) new JSONTokener(message.getPayload().toString()).nextValue());
            messageObject.setInboundTopic(message.getHeaders().get("kafka_receivedTopic").toString());
            return messageObject;
        } catch (JSONException e) {
            log.warn("parse {}", e.getCause());
            return messageObject;
        }
    }

    @Bean
    private KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public IntegrationFlow outboundGateFlow() {
        return IntegrationFlows.from(listeningFromTransformer()).handle(getResponseMessage()).handle(postMessage()).get();
    }

    @Bean
    private ResponseMessage getResponseMessage() {
        return messageObject -> {
            VirtualServiceRequest virtualServiceRequest = new VirtualServiceRequest();
            virtualServiceRequest.setInput(messageObject.getJsonObject().toString());
            virtualServiceRequest.setOperationId(messageObject.getInboundTopic());
            virtualServiceRequest.setResource(messageObject.getInboundTopic());
            ReturnMockResponse matchingRecord = this.messageUtil.getMatchingRecord(virtualServiceRequest);
            if (matchingRecord == null || matchingRecord.getMockResponse() == null) {
                log.info("No response configured for the given input");
                return null;
            }
            messageObject.setOutputMessage(matchingRecord.getMockResponse().getOutput());
            messageObject.setOutboundTopic(matchingRecord.getMockRequest().getMethod());
            if (messageObject.getOutputMessage() == null || messageObject.getOutboundTopic() == null) {
                log.info("No outputMessage response configured..");
                return null;
            }
            log.info("Response configured.. with ({}) : {}", messageObject.getOutboundTopic(), messageObject.getOutputMessage());
            return messageObject;
        };
    }

    @Bean
    private SendMessage postMessage() {
        return messageObject -> {
            if (messageObject.getOutboundTopic() == null) {
                return null;
            }
            kafkaTemplate().send(MessageBuilder.withPayload(messageObject.getJsonObject().toString()).setHeader("kafka_topic", messageObject.getOutboundTopic()).setHeader("kafka_messageKey", messageObject.getMessageKey()).setHeader("kafka_partitionId", 0).setHeader("X-Virtualan-Header", "Mock-Service-Response").build());
            return null;
        };
    }
}
