package org.fastquery.tcpserver;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.List;
import org.fastquery.bytes.CRC16;
import org.fastquery.bytes.ShortByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/* loaded from: input_file:org/fastquery/tcpserver/Service.class */
public class Service implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Service.class);
    private static final ZMQ.Context context = ZMQ.context(1);
    private ZMQ.Socket subscriber = context.socket(2);
    private Socket socket;
    private Conf conf;
    private boolean sub;

    public Service(Socket socket, Conf conf) {
        this.socket = socket;
        this.conf = conf;
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        this.subscriber.connect(this.conf.getMqSubConnectAddr());
        try {
            try {
                InputStream inputStream = this.socket.getInputStream();
                Throwable th = null;
                try {
                    OutputStream outputStream = this.socket.getOutputStream();
                    Throwable th2 = null;
                    try {
                        try {
                            LOG.info("已经与" + this.socket.getInetAddress().toString() + "建立连接");
                            while (!this.socket.isClosed()) {
                                List<Byte> readFixed12 = Convert.readFixed12(inputStream);
                                int byteValue = readFixed12.get(6).byteValue() & 255;
                                short s = ShortByteUtil.getShort(new byte[]{readFixed12.get(7).byteValue(), readFixed12.get(8).byteValue()});
                                int byteValue2 = readFixed12.get(9).byteValue() & 255;
                                short s2 = ShortByteUtil.getShort(new byte[]{readFixed12.get(10).byteValue(), readFixed12.get(11).byteValue()});
                                readFixed12.addAll(Convert.readFixed(inputStream, byteValue + s + s2 + 2));
                                int size = readFixed12.size();
                                byte[] updateCRC = CRC16.updateCRC(readFixed12.subList(0, size - 2));
                                if (updateCRC[0] == readFixed12.get(size - 2).byteValue() && updateCRC[1] == readFixed12.get(size - 1).byteValue()) {
                                    Transmission transmission = Convert.toTransmission(readFixed12, byteValue, s, byteValue2, s2);
                                    ProcessingRequest.receive(transmission, new Response(outputStream), this.conf);
                                    if (!this.sub) {
                                        String id = transmission.getId();
                                        OnLineStatus.addUser(id, this.conf);
                                        byte[] bytes = id.getBytes(Charset.forName("gb2312"));
                                        this.subscriber.subscribe(bytes);
                                        this.subscriber.setReceiveTimeOut(this.conf.getMqSubReceiveTimeOut());
                                        this.sub = true;
                                        LOG.info(id + " 已成功订阅消息");
                                        new Thread(() -> {
                                            while (!this.socket.isClosed()) {
                                                try {
                                                    byte[] recv = this.subscriber.recv(0);
                                                    if (recv == null || recv[0] != Byte.MAX_VALUE) {
                                                        LOG.debug(id + "订阅超时");
                                                        outputStream.write(255);
                                                    } else {
                                                        outputStream.write(recv);
                                                        OnLineStatus.addUser(id, this.conf);
                                                    }
                                                } catch (Exception e) {
                                                    OnLineStatus.delUser(id, this.conf);
                                                    LOG.error(e.getMessage(), e);
                                                    this.sub = false;
                                                    this.subscriber.unsubscribe(bytes);
                                                    try {
                                                        this.socket.close();
                                                    } catch (IOException e2) {
                                                        LOG.error(e2.getMessage(), e2);
                                                    }
                                                }
                                            }
                                            LOG.info(id + " 已取消订阅消息");
                                        }).start();
                                    }
                                } else {
                                    LOG.error(String.format("客户端IP:%s, 校验失败了,msg长度: %s, attachmentLength:%s, idLength=%s", this.socket.getInetAddress(), Integer.valueOf(s), Integer.valueOf(s2), Integer.valueOf(byteValue)));
                                }
                            }
                            if (outputStream != null) {
                                if (0 != 0) {
                                    try {
                                        outputStream.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    outputStream.close();
                                }
                            }
                            if (inputStream != null) {
                                if (0 != 0) {
                                    try {
                                        inputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    inputStream.close();
                                }
                            }
                        } catch (Throwable th5) {
                            th2 = th5;
                            throw th5;
                        }
                    } catch (Throwable th6) {
                        if (outputStream != null) {
                            if (th2 != null) {
                                try {
                                    outputStream.close();
                                } catch (Throwable th7) {
                                    th2.addSuppressed(th7);
                                }
                            } else {
                                outputStream.close();
                            }
                        }
                        throw th6;
                    }
                } catch (Throwable th8) {
                    if (inputStream != null) {
                        if (0 != 0) {
                            try {
                                inputStream.close();
                            } catch (Throwable th9) {
                                th.addSuppressed(th9);
                            }
                        } else {
                            inputStream.close();
                        }
                    }
                    throw th8;
                }
            } finally {
                try {
                    this.socket.close();
                    LOG.info("会话结束");
                    LOG.info("已经与" + this.socket.getInetAddress().toString() + "断开连接");
                } catch (IOException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
        } catch (Exception e2) {
            LOG.error(e2.getMessage(), e2);
            try {
                this.socket.close();
                LOG.info("会话结束");
                LOG.info("已经与" + this.socket.getInetAddress().toString() + "断开连接");
            } catch (IOException e3) {
                LOG.error(e3.getMessage(), e3);
            }
        }
    }
}
