package info.xiancloud.plugin.mqtt;

import com.alibaba.fastjson.JSONObject;
import info.xiancloud.plugin.distribution.LocalNodeManager;
import info.xiancloud.plugin.distribution.MessageType;
import info.xiancloud.plugin.message.IdManager;
import info.xiancloud.plugin.message.UnitRequest;
import info.xiancloud.plugin.message.UnitResponse;
import info.xiancloud.plugin.message.sender.local.DefaultLocalAsyncSender;
import info.xiancloud.plugin.support.mq.mqtt.handle.NotifyHandler;
import info.xiancloud.plugin.support.mq.mqtt.mqtt_callback.sequencer.ISequencer;
import info.xiancloud.plugin.thread_pool.ThreadPoolManager;
import info.xiancloud.plugin.util.LOG;
import info.xiancloud.plugin.util.Reflection;
import info.xiancloud.plugin.util.thread.MsgIdHolder;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/* loaded from: input_file:info/xiancloud/plugin/mqtt/NonblockingMqttCallBack.class */
public class NonblockingMqttCallBack extends MqttCallbackAdaptor {

    /* renamed from: info.xiancloud.plugin.mqtt.NonblockingMqttCallBack$5, reason: invalid class name */
    /* loaded from: input_file:info/xiancloud/plugin/mqtt/NonblockingMqttCallBack$5.class */
    static /* synthetic */ class AnonymousClass5 {
        static final /* synthetic */ int[] $SwitchMap$info$xiancloud$plugin$distribution$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$info$xiancloud$plugin$distribution$MessageType[MessageType.offline.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$info$xiancloud$plugin$distribution$MessageType[MessageType.request.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$info$xiancloud$plugin$distribution$MessageType[MessageType.response.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public NonblockingMqttCallBack(IMqttClient iMqttClient) {
        setOwner(iMqttClient);
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        try {
            String mqttMessage2 = mqttMessage.toString();
            JSONObject parseObject = JSONObject.parseObject(mqttMessage2);
            IdManager.makeSureMsgId(parseObject);
            MessageType messageType = MessageType.getMessageType(parseObject);
            switch (AnonymousClass5.$SwitchMap$info$xiancloud$plugin$distribution$MessageType[messageType.ordinal()]) {
                case 1:
                    LOG.info(String.format("离线广播: %s", parseObject));
                    break;
                case 2:
                    UnitRequest unitRequest = (UnitRequest) Reflection.toType(parseObject, UnitRequest.class);
                    logMqttFly(unitRequest, mqttMessage2);
                    String group = unitRequest.getContext().getGroup();
                    String unit = unitRequest.getContext().getUnit();
                    unitRequest.getContext().setFromRemote(true);
                    ISequencer.build(group, unit, parseObject).sequence(() -> {
                        new DefaultLocalAsyncSender(unitRequest, new NotifyHandler() { // from class: info.xiancloud.plugin.mqtt.NonblockingMqttCallBack.2
                            protected void handle(UnitResponse unitResponse) {
                                LocalNodeManager.sendBack(unitResponse);
                            }
                        }).send();
                    }, new NotifyHandler() { // from class: info.xiancloud.plugin.mqtt.NonblockingMqttCallBack.1
                        protected void handle(UnitResponse unitResponse) {
                            LocalNodeManager.sendBack(unitResponse);
                        }
                    });
                    break;
                case 3:
                    LOG.debug("ssid只在远程请求和响应时才会有值");
                    UnitResponse unitResponse = (UnitResponse) Reflection.toType(parseObject, UnitResponse.class);
                    logMqttFly(unitResponse, mqttMessage2);
                    String ssid = unitResponse.getContext().getSsid();
                    NotifyHandler notifyHandler = (NotifyHandler) LocalNodeManager.handleMap.getIfPresent(ssid);
                    LocalNodeManager.handleMap.invalidate(ssid);
                    UnitResponse create = UnitResponse.create(parseObject);
                    if (notifyHandler != null) {
                        try {
                            ThreadPoolManager.execute(() -> {
                                notifyHandler.callback(create);
                            });
                            break;
                        } catch (RejectedExecutionException e) {
                            LOG.info("线程池已关闭，这里使用临时线程执行任务，针对停服务时线程池已关闭的情况。");
                            new Thread(() -> {
                                notifyHandler.callback(create);
                            }).start();
                            break;
                        }
                    } else {
                        LOG.error(String.format("ssid=%s的消息没有找到对应的notifyHandler!整个消息内容=%s,", ssid, parseObject), new Throwable());
                        break;
                    }
                default:
                    LOG.error("未知的mqtt消息类型:" + messageType, new RuntimeException());
                    break;
            }
        } catch (Throwable th) {
            LOG.error(th);
        } finally {
            MsgIdHolder.clear();
        }
    }

    private void logMqttFly(final UnitRequest unitRequest, final String str) {
        final long sentTimestamp = unitRequest.getContext().getSentTimestamp();
        LOG.debug(new JSONObject() { // from class: info.xiancloud.plugin.mqtt.NonblockingMqttCallBack.3
            {
                put("_cost", Long.valueOf(System.currentTimeMillis() - sentTimestamp));
                put("type", "mqttFly");
                put("ssid", unitRequest.getContext().getSsid());
                put("from", unitRequest.getContext().getSourceNodeId());
                put("length", Integer.valueOf(str.length()));
                put("msgType", unitRequest.getContext().getMessageType());
                put("payload", str);
            }
        });
    }

    private void logMqttFly(final UnitResponse unitResponse, final String str) {
        final long sentTimestamp = unitResponse.getContext().getSentTimestamp();
        LOG.debug(new JSONObject() { // from class: info.xiancloud.plugin.mqtt.NonblockingMqttCallBack.4
            {
                put("_cost", Long.valueOf(System.currentTimeMillis() - sentTimestamp));
                put("type", "mqttFly");
                put("ssid", unitResponse.getContext().getSsid());
                put("from", unitResponse.getContext().getSourceNodeId());
                put("length", Integer.valueOf(str.length()));
                put("msgType", unitResponse.getContext().getMessageType());
                put("payload", str);
            }
        });
    }
}
