package cn.benma666.sjzt.mqtt;

import cn.benma666.constants.UtilConstInstance;
import cn.benma666.domain.SysSjglSjzt;
import cn.benma666.domain.SysSjglZnjh;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.InterfaceLog;
import cn.benma666.iframe.Result;
import cn.benma666.myutils.ClassUtil;
import cn.benma666.myutils.FileUtil;
import cn.benma666.sjzt.BasicSjzt;
import cn.benma666.sjzt.IFile;
import cn.benma666.sjzt.SjztBzcwjdxExecption;
import cn.benma666.sjzt.SjztExecRunnable;
import cn.benma666.sjzt.SjztPooledObjectFactory;
import com.alibaba.druid.util.Utils;
import com.alibaba.fastjson.JSONObject;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/* loaded from: input_file:cn/benma666/sjzt/mqtt/Mqtt.class */
public class Mqtt extends BasicSjzt {
    private static Mqtt mqtt = null;
    private final GenericObjectPool objectPool;

    protected Mqtt(String str, SysSjglSjzt sysSjglSjzt) {
        super(str, sysSjglSjzt);
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setTestOnCreate(true);
        genericObjectPoolConfig.setTestOnBorrow(true);
        genericObjectPoolConfig.setMinIdle(1);
        genericObjectPoolConfig.setMaxIdle(2);
        genericObjectPoolConfig.setMaxTotal(5);
        genericObjectPoolConfig.setMaxWait(Duration.ofSeconds(300L));
        ClassUtil.plMethodInvoke(genericObjectPoolConfig, getSjzt().getKzxxObj().getJSONObject("ljcpz"));
        this.objectPool = new GenericObjectPool(new SjztPooledObjectFactory(sysSjglSjzt), genericObjectPoolConfig);
        MqttClient mqttClient = null;
        try {
            try {
                mqttClient = borrowClient();
                returnClient(mqttClient);
                if (mqtt == null) {
                    mqtt = this;
                }
                cache.put(str, this);
            } catch (Exception e) {
                throw new MyException(str + "初始化失败", (Throwable) e);
            }
        } catch (Throwable th) {
            returnClient(mqttClient);
            throw th;
        }
    }

    public void returnClient(MqttClient mqttClient) {
        this.log.trace("释放回连接池：{}", this.name);
        if (mqttClient != null) {
            this.objectPool.returnObject(mqttClient);
        }
    }

    public MqttClient borrowClient() throws Exception {
        this.log.trace("从连接池获取：{}", this.name);
        return (MqttClient) this.objectPool.borrowObject();
    }

    public static Mqtt use(String str) {
        return use(str, getSjzt(str));
    }

    public static Mqtt use(String str, SysSjglSjzt sysSjglSjzt) {
        Mqtt mqtt2 = (Mqtt) cache.get(str);
        if (mqtt2 == null) {
            mqtt2 = new Mqtt(str, sysSjglSjzt);
        }
        return mqtt2;
    }

    public static Result cszt(SysSjglSjzt sysSjglSjzt) {
        try {
            Result success = success("测试成功");
            MqttClient createClient = createClient(sysSjglSjzt);
            if (!validateClient(sysSjglSjzt, createClient)) {
                success = failed("测试失败");
            }
            destroyClient(sysSjglSjzt, createClient);
            return success;
        } catch (Throwable th) {
            slog.debug("{}测试失败", sysSjglSjzt, th);
            return failed("载体测试不通过：" + th.getMessage());
        }
    }

    public static boolean validateClient(SysSjglSjzt sysSjglSjzt, Object obj) {
        try {
            return ((MqttClient) obj).isConnected();
        } catch (Throwable th) {
            slog.debug("验证无效：{}", sysSjglSjzt.getMc(), th);
            return false;
        }
    }

    public static void destroyClient(SysSjglSjzt sysSjglSjzt, Object obj) throws Exception {
        MqttClient mqttClient = (MqttClient) obj;
        mqttClient.disconnect();
        mqttClient.close();
    }

    public static MqttClient createClient(SysSjglSjzt sysSjglSjzt) {
        try {
            MqttClient mqttClient = new MqttClient(sysSjglSjzt.getLjc(), sysSjglSjzt.getDm(), new MemoryPersistence());
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(false);
            mqttConnectOptions.setUserName(sysSjglSjzt.getYhm());
            mqttConnectOptions.setPassword(sysSjglSjzt.getMm().toCharArray());
            mqttConnectOptions.setConnectionTimeout(100);
            mqttConnectOptions.setKeepAliveInterval(20);
            mqttConnectOptions.setWill("willTopic", (sysSjglSjzt.getDm() + "与服务器断开连接").getBytes(), 0, false);
            mqttClient.setCallback(new MqttCallBack());
            JSONObject jSONObject = sysSjglSjzt.getKzxxObj().getJSONObject("mttpz");
            if (jSONObject != null) {
                ClassUtil.plMethodInvoke(mqttClient, jSONObject.getJSONObject("MqttClient"));
                ClassUtil.plMethodInvoke(mqttConnectOptions, jSONObject.getJSONObject("MqttConnectOptions"));
            }
            mqttClient.connect(mqttConnectOptions);
            if (mqttClient.isConnected()) {
                return mqttClient;
            }
            throw new MyException("mqtt连接失败，未知异常");
        } catch (MqttException e) {
            throw new MyException("mqtt数据载体创建失败：" + sysSjglSjzt.getDm(), (Throwable) e);
        }
    }

    public Object exec(SjztExecRunnable<MqttClient> sjztExecRunnable) {
        MqttClient mqttClient = null;
        try {
            try {
                mqttClient = borrowClient();
                Object exec = sjztExecRunnable.exec(mqttClient);
                if (mqttClient != null) {
                    this.objectPool.returnObject(mqttClient);
                }
                return exec;
            } catch (Exception e) {
                throw new MyException("执行异常", (Throwable) e);
            }
        } catch (Throwable th) {
            if (mqttClient != null) {
                this.objectPool.returnObject(mqttClient);
            }
            throw th;
        }
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public List<IFile> listFiles(SysSjglZnjh sysSjglZnjh) throws Exception {
        throw new MyException("mqtt不支持获取文件列表");
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public InputStream getInputStream(IFile iFile) throws Exception {
        if (iFile instanceof MqttFile) {
            return new ByteArrayInputStream(((MqttFile) iFile).getFile().getPayload());
        }
        throw new MyException("不支持非mqttfile");
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public boolean delete(IFile iFile) throws Exception {
        if (iFile instanceof MqttFile) {
            return true;
        }
        throw new MyException("不支持非Mqttfile");
    }

    public void subscribe(String str, IMqttMessageListener iMqttMessageListener) {
        subscribe(str, MqttQos.ZHYC.getCode(), iMqttMessageListener);
    }

    public void subscribe(String str, int i, IMqttMessageListener iMqttMessageListener) {
        exec(mqttClient -> {
            mqttClient.subscribe(str, i, iMqttMessageListener);
            return null;
        });
    }

    public boolean pub(String str, String str2) throws Exception {
        return pub(str, str2, MqttQos.ZSYC);
    }

    public boolean pub(String str, String str2, MqttQos mqttQos) throws Exception {
        return save(new ByteArrayInputStream(str2.getBytes(StandardCharsets.UTF_8)), new MqttFile(str, mqttQos));
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public boolean save(InputStream inputStream, IFile iFile) throws Exception {
        return ((Boolean) exec(mqttClient -> {
            MqttQos mqttQos = MqttQos.ZSYC;
            if (iFile instanceof MqttFile) {
                mqttQos = ((MqttFile) iFile).getQos();
            }
            String parent = iFile.getParent();
            try {
                try {
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(mqttQos.getCode());
                    mqttMessage.setPayload(Utils.readByteArray(inputStream));
                    mqttClient.getTopic(parent).publish(mqttMessage).waitForCompletion();
                    FileUtil.closeStream(inputStream);
                    return true;
                } catch (Throwable th) {
                    throw new MyException("消息推送异常：" + parent, th);
                }
            } catch (Throwable th2) {
                FileUtil.closeStream(inputStream);
                throw th2;
            }
        })).booleanValue();
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public String getRootPath() {
        return this.sjzt.getDxgsNew();
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public long getSize(IFile iFile) throws Exception {
        if (iFile instanceof MqttFile) {
            return ((MqttFile) iFile).getFile().getPayload().length;
        }
        throw new SjztBzcwjdxExecption("不支持非mqttfile");
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public void sjztjt(SysSjglZnjh sysSjglZnjh, InterfaceLog interfaceLog) {
        exec(mqttClient -> {
            String srml = sysSjglZnjh.getSrml();
            try {
                mqttClient.subscribe(srml, MqttQos.ZHYC.getCode(), (str, mqttMessage) -> {
                    MqttFile mqttFile = new MqttFile(srml, mqttMessage, this, str.replace(UtilConstInstance.FXG, "_") + "_" + mqttMessage.getId() + ".json");
                    mqttFile.setGzml(srml);
                    sysSjglZnjh.getTp().run(() -> {
                        this.znjh.znjh(sysSjglZnjh, interfaceLog, mqttFile);
                    });
                });
                while (mqttClient.isConnected()) {
                    Thread.sleep(100000L);
                }
            } catch (InterruptedException e) {
                interfaceLog.info("退出遍历监听：{}", sysSjglZnjh.getJhmc());
                mqttClient.unsubscribe(srml);
            } catch (Throwable th) {
                interfaceLog.error("遍历监听异常：" + sysSjglZnjh.getJhmc() + "，" + th.getMessage(), th);
                mqttClient.unsubscribe(srml);
            }
            jtqMap.remove(sysSjglZnjh.getId());
            return null;
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.objectPool.isClosed()) {
            this.objectPool.close();
        }
        cache.remove(this.name);
        if (this == mqtt) {
            mqtt = null;
        }
    }
}
