package info.xiancloud.mq.backed;

import com.alibaba.fastjson.JSONObject;
import info.xiancloud.core.Handler;
import info.xiancloud.core.Unit;
import info.xiancloud.core.distribution.LocalNodeManager;
import info.xiancloud.core.distribution.MessageType;
import info.xiancloud.core.distribution.UnitJudge;
import info.xiancloud.core.distribution.UnitProxy;
import info.xiancloud.core.distribution.exception.UnitUndefinedException;
import info.xiancloud.core.distribution.loadbalance.UnitRouter;
import info.xiancloud.core.distribution.service_discovery.UnitDiscovery;
import info.xiancloud.core.init.IStartService;
import info.xiancloud.core.init.shutdown.ShutdownHook;
import info.xiancloud.core.message.IdManager;
import info.xiancloud.core.message.SingleRxXian;
import info.xiancloud.core.message.UnitRequest;
import info.xiancloud.core.mq.IMqConsumerClient;
import info.xiancloud.core.mq.TransferQueueUtil;
import info.xiancloud.core.thread_pool.ThreadPoolManager;
import info.xiancloud.core.util.LOG;
import info.xiancloud.core.util.StringUtil;
import info.xiancloud.core.util.thread.MsgIdHolder;
import io.reactivex.Single;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;

/* loaded from: input_file:info/xiancloud/mq/backed/TransferManager.class */
public class TransferManager implements IStartService, ShutdownHook {
    private ScheduledFuture scheduledFuture;
    private static final Set<String> queueNames = Collections.synchronizedSet(new HashSet());
    private static final Function<JSONObject, Boolean> callback = jSONObject -> {
        try {
            try {
                IdManager.makeSureMsgId(jSONObject);
                if (!MessageType.isDefaultRequest(jSONObject)) {
                    LOG.warn(String.format("非%s消息忽略不处理：%s", MessageType.request, MessageType.getMessageType(jSONObject)));
                    MsgIdHolder.clear();
                    return true;
                }
                UnitRequest unitRequest = (UnitRequest) jSONObject.toJavaObject(UnitRequest.class);
                final String group = unitRequest.getContext().getGroup();
                final String unit = unitRequest.getContext().getUnit();
                if (StringUtil.isEmpty(group) && !StringUtil.isEmpty(unit)) {
                    LOG.warn("本持久化队列只支持unitRequest消息分发");
                    MsgIdHolder.clear();
                    return true;
                }
                while (!UnitJudge.available(group, unit)) {
                    LOG.info(new JSONObject() { // from class: info.xiancloud.mq.backed.TransferManager.1
                        {
                            put("type", "transfer");
                            put("event", "pileup");
                            put("queue", group);
                            put("unit", unit);
                            put("description", "目标unit不在线，队列堆积");
                        }
                    });
                    Thread.sleep(10000L);
                }
                Handler handler = unitResponse -> {
                    LOG.info("中转器收到回调:" + unitResponse + ";准备转发回调结果至原始发送端...");
                    LocalNodeManager.sendBack(unitResponse);
                };
                LOG.debug("发送给unit的消息必须使用Xian发送，它支持unit多样性发送方式，必须标记为transferredAlready否则会出现循环消息");
                unitRequest.getContext().setTransferredAlready(true);
                Single call = SingleRxXian.call(unitRequest);
                handler.getClass();
                call.subscribe((v1) -> {
                    r1.handle(v1);
                });
                MsgIdHolder.clear();
                return true;
            } catch (Throwable th) {
                LOG.error(th);
                MsgIdHolder.clear();
                return false;
            }
        } catch (Throwable th2) {
            MsgIdHolder.clear();
            throw th2;
        }
    };

    public boolean startup() {
        this.scheduledFuture = ThreadPoolManager.scheduleWithFixedDelay(() -> {
            for (String str : UnitDiscovery.singleton.queryForNames()) {
                try {
                    UnitProxy newestDefinition = UnitRouter.SINGLETON.newestDefinition(str);
                    String str2 = (String) Unit.parseFullName(str).fst;
                    if (newestDefinition.getMeta().isTransferable()) {
                        startIfNotStarted(str2);
                    }
                } catch (UnitUndefinedException e) {
                } catch (Throwable th) {
                    LOG.error("未知异常，跳过：" + str, th);
                }
            }
        }, 120000L);
        return true;
    }

    public boolean shutdown() {
        synchronized (queueNames) {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
            }
            Iterator<String> it = queueNames.iterator();
            while (it.hasNext()) {
                stopIfStarted(it.next());
            }
            LOG.info("已取消订阅所有堆积队列");
        }
        return true;
    }

    private static void startIfNotStarted(String str) {
        synchronized (queueNames) {
            if (!queueNames.contains(str)) {
                try {
                    IMqConsumerClient.singleton.consumeStaticQueue(TransferQueueUtil.getTransferQueue(str), callback);
                    queueNames.add(str);
                } catch (Throwable th) {
                    LOG.warn("订阅静态队列失败：" + str, th);
                }
            }
        }
    }

    private static void stopIfStarted(String str) {
        synchronized (queueNames) {
            if (queueNames.contains(str)) {
                IMqConsumerClient.singleton.unconsume(TransferQueueUtil.getTransferQueue(str));
            }
        }
    }
}
