package org.sdn.api.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.sdn.api.DefaultOpenClient;
import org.sdn.api.OpenApiException;
import org.sdn.api.domain.OpenClientDO;
import org.sdn.api.request.SubscribeEventRequest;
import org.sdn.api.response.SubscribeEventResponse;
import org.sdn.api.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/sdn/api/event/MessageEvent.class */
public class MessageEvent implements BeanPostProcessor {
    private Object instance;
    private Method method;
    private static MessageEvent defaultExecutor;
    private static Listener listener;
    private static final Logger LOG = LoggerFactory.getLogger(MessageEvent.class);
    private static Map<String, MessageEvent> executors = new HashMap();
    private static Map<Conn, List<String>> groups = new HashMap();
    private static boolean isInit = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/sdn/api/event/MessageEvent$Conn.class */
    public static class Conn {
        String servers;
        String groupId;
        String username;
        String password;

        Conn(String str, String str2, String str3, String str4) {
            this.servers = StringUtils.parseServers(str);
            this.groupId = str2;
            this.username = str3;
            this.password = str4;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Conn conn = (Conn) obj;
            return Objects.equals(this.servers, conn.servers) && Objects.equals(this.groupId, conn.groupId) && Objects.equals(this.username, conn.username) && Objects.equals(this.password, conn.password);
        }

        public int hashCode() {
            return Objects.hash(this.servers, this.groupId, this.username, this.password);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/sdn/api/event/MessageEvent$EventConsumerRunner.class */
    public static class EventConsumerRunner implements Runnable {
        private Conn conn;
        private List<String> topics;

        EventConsumerRunner(Conn conn, List<String> list) {
            this.conn = conn;
            this.topics = list;
        }

        @Override // java.lang.Runnable
        public void run() {
            String format = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", this.conn.username, this.conn.password);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.conn.servers);
            properties.put("group.id", this.conn.groupId);
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("sasl.jaas.config", format);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            Throwable th = null;
            try {
                try {
                    kafkaConsumer.subscribe(new ArrayList(this.topics));
                    long j = 1;
                    while (true) {
                        ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(j));
                        j = poll.isEmpty() ? 1000L : 1L;
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            String str = consumerRecord.topic();
                            if (MessageEvent.listener != null) {
                                MessageEvent.listener.handle((String) consumerRecord.value(), str);
                            }
                            MessageEvent messageEvent = (MessageEvent) MessageEvent.executors.get(str);
                            if (messageEvent != null) {
                                invoke(messageEvent.instance, messageEvent.method, consumerRecord.value());
                            } else if (MessageEvent.defaultExecutor != null) {
                                invoke(MessageEvent.defaultExecutor.instance, MessageEvent.defaultExecutor.method, consumerRecord.value(), str);
                            } else if (MessageEvent.listener == null) {
                                MessageEvent.LOG.warn("no `{}` event handler defined", str);
                            }
                        }
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (kafkaConsumer != null) {
                    if (th != null) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                throw th2;
            }
        }

        private void invoke(Object obj, Method method, Object... objArr) {
            try {
                method.invoke(obj, objArr);
            } catch (IllegalAccessException | InvocationTargetException e) {
                MessageEvent.LOG.warn("Event process error, ", e);
            }
        }
    }

    private MessageEvent(Object obj, Method method) {
        this.instance = obj;
        this.method = method;
    }

    public MessageEvent() {
    }

    public static synchronized void start(DefaultOpenClient defaultOpenClient, String str) throws OpenApiException {
        if (isInit) {
            return;
        }
        isInit = true;
        subscribeEvent(defaultOpenClient, str);
        startEventListener();
    }

    private static void subscribeEvent(DefaultOpenClient defaultOpenClient, String str) throws OpenApiException {
        OpenClientDO openClientDO = defaultOpenClient.getOpenClientDO();
        String authEvent = openClientDO.getAuthEvent();
        if (isNullOrEmpty(authEvent) || isNullOrEmpty(str)) {
            return;
        }
        try {
            JSONObject parseObject = JSON.parseObject(authEvent);
            ArrayList arrayList = new ArrayList();
            String appSecret = openClientDO.getAppSecret();
            for (String str2 : parseObject.keySet()) {
                JSONObject jSONObject = parseObject.getJSONObject(str2);
                JSONObject jSONObject2 = jSONObject.getJSONObject("condition");
                Conn conn = new Conn(jSONObject2.getString("server"), appSecret, jSONObject2.getString("username"), jSONObject2.getString("password"));
                if (groups.containsKey(conn)) {
                    groups.get(conn).add(str2);
                } else {
                    ArrayList arrayList2 = new ArrayList();
                    arrayList2.add(str2);
                    groups.put(conn, arrayList2);
                }
                if (jSONObject.getString("event") != null) {
                    jSONObject2.getJSONArray("sub").forEach(obj -> {
                        HashMap hashMap = new HashMap();
                        try {
                            hashMap.put("servicename", str2);
                            ((JSONObject) obj).forEach((str3, obj) -> {
                                if (!(obj instanceof JSONArray)) {
                                    hashMap.put(str3, obj.toString());
                                    return;
                                }
                                ArrayList arrayList3 = new ArrayList();
                                ((JSONArray) obj).forEach(obj -> {
                                    HashMap hashMap2 = new HashMap();
                                    ((JSONObject) obj).forEach((str3, obj) -> {
                                        hashMap2.put(str3, obj instanceof JSONObject ? ((JSONObject) obj).toJSONString() : obj.toString());
                                    });
                                    arrayList3.add(hashMap2);
                                });
                                hashMap.put(str3, arrayList3);
                            });
                        } catch (Exception e) {
                            LOG.warn("parse error : {}", authEvent);
                        }
                        arrayList.add(hashMap);
                    });
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                SubscribeEventResponse subscribeEventResponse = (SubscribeEventResponse) defaultOpenClient.defaultExecute(new SubscribeEventRequest((Map) it.next()), str);
                if (subscribeEventResponse.isSuccess() && !subscribeEventResponse.isOk()) {
                    throw new OpenApiException("subscribe event error");
                }
            }
        } catch (Exception e) {
            throw new OpenApiException("subscribe event error", e);
        }
    }

    private static void startEventListener() {
        groups.forEach((conn, list) -> {
            new Thread(new EventConsumerRunner(conn, list)).start();
        });
    }

    public static void setListener(Listener listener2) {
        listener = listener2;
    }

    public Object postProcessBeforeInitialization(Object obj, String str) throws BeansException {
        return obj;
    }

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        for (Method method : ReflectionUtils.getAllDeclaredMethods(obj.getClass())) {
            EventListener eventListener = (EventListener) AnnotationUtils.findAnnotation(method, EventListener.class);
            if (eventListener != null) {
                String value = eventListener.value();
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (isNullOrEmpty(value)) {
                    if (parameterTypes.length > 1 && Objects.equals(parameterTypes[0].getName(), "java.lang.String") && Objects.equals(parameterTypes[1].getName(), "java.lang.String")) {
                        defaultExecutor = new MessageEvent(obj, method);
                    }
                } else if (parameterTypes.length >= 1 && Objects.equals(parameterTypes[0].getName(), "java.lang.String") && !isNullOrEmpty(value)) {
                    executors.put(value, new MessageEvent(obj, method));
                }
            }
        }
        return obj;
    }

    private static boolean isNullOrEmpty(String str) {
        return str == null || str.trim().equals("");
    }
}
