package work.ready.cloud.cluster;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteBiPredicate;
import work.ready.cloud.ReadyCloud;
import work.ready.cloud.client.ClientConfig;
import work.ready.cloud.cluster.common.MessageBody;
import work.ready.cloud.cluster.common.MessageCmd;
import work.ready.cloud.cluster.common.MessageException;
import work.ready.cloud.cluster.common.MessageState;
import work.ready.core.log.Log;
import work.ready.core.log.LogFactory;
import work.ready.core.server.Ready;
import work.ready.core.tools.define.CheckedSupplier;
import work.ready.core.tools.define.LambdaFinal;

/* loaded from: input_file:work/ready/cloud/cluster/ReliableMessage.class */
public class ReliableMessage {
    /** Topic name on which every reliable message of this class is sent and listened for. */
    public static final String ReliableMessageChannel = "ReliableMessageChannel";
    /** When true (and trace logging is enabled) every received message is traced; the initial value comes from the cloud configuration. */
    private boolean messageLogger = ReadyCloud.getConfig().isReliableMessageLogger();
    /** Expire-after-write time, in seconds, applied to a per-channel reply cache when that cache is created. */
    private int cacheLiveSeconds = 60;
    /** Assigned in the constructor from the HTTP client timeout; not read anywhere in the visible part of this class. */
    private int rpcTimeout;
    /** Timeout used by the send/reply overloads that take no explicit duration. */
    private Duration defaultTimeout;
    /** Channel name -> listeners registered for that channel; the lists are mutated while holding this map's monitor. */
    private final Map<String, List<Listener>> listenerMap;
    /** Receive-channel name -> cache of (message id -> replies collected so far, keyed by the id of the replying node). */
    private final Map<String, Cache<String, Map<UUID, MessageBody>>> messageCache;
    /** Number of executors kept per channel by getPool. */
    private static final int poolNumber = 5;
    private static final Log logger = LogFactory.getLog(ReliableMessage.class);
    /** Message body sent by the finalReply overloads that take no explicit body. */
    public static final MessageBody DONE = new MessageBody("", "", "", MessageState.STATE_OK);
    /** Pool key (channel name + index) -> executor; shut down by the shutdown hook registered in the static initializer. */
    private static final Map<String, ThreadPoolExecutor> channelPool = new HashMap();
    /** Pool key -> completed-task count observed on the previous getPool call; also serves as the lock for pool selection. */
    private static final Map<String, Long> poolTaskCounter = new TreeMap();
    /** Runs the timeout tasks of the supplyAsync/timeoutFuture helpers. */
    private static final ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(100);
    /** Not referenced anywhere in the visible part of this class. */
    private static final int threadNumber = Math.max(Runtime.getRuntime().availableProcessors() * 2, ReadyCloud.getConfig().getTransaction().getConcurrentLevel());

    /**
     * Callback invoked for messages that arrive on a channel it was registered for.
     * Listeners of one channel are called in list order for each message.
     */
    @FunctionalInterface
    /* loaded from: input_file:work/ready/cloud/cluster/ReliableMessage$Listener.class */
    public interface Listener {
        /**
         * Handles one received message.
         *
         * @param messageCmd     the received command; its node id has been set to the sender's id
         * @param messageBodyArr one-element array shared by all listeners of the channel; if element 0
         *                       is non-null after the listener chain ends, it is sent back to the sender
         * @return {@code true} to let the next listener run, {@code false} to stop the chain
         */
        boolean handle(MessageCmd messageCmd, MessageBody[] messageBodyArr);
    }

    /**
     * Creates a messenger configured from {@code ReadyCloud.getConfig()}: the RPC timeout is the
     * HTTP client timeout when that is positive, otherwise {@code ClientConfig.DEFAULT_TIMEOUT};
     * the default reply timeout is the configured reliable-message timeout in milliseconds.
     */
    public ReliableMessage() {
        if (ReadyCloud.getConfig().getHttpClient().getTimeout() > 0) {
            this.rpcTimeout = ReadyCloud.getConfig().getHttpClient().getTimeout();
        } else {
            this.rpcTimeout = ClientConfig.DEFAULT_TIMEOUT;
        }
        long timeoutMillis = ReadyCloud.getConfig().getReliableMessageTimeout();
        this.defaultTimeout = Duration.ofMillis(timeoutMillis);
        this.listenerMap = new ConcurrentHashMap<>();
        this.messageCache = new HashMap<>();
    }

    /** @return whether received messages are traced to the log */
    public boolean isMessageLogger() {
        return messageLogger;
    }

    /**
     * Turns tracing of received messages on or off.
     *
     * @param z {@code true} to trace received messages
     * @return this instance, for chaining
     */
    public ReliableMessage setMessageLogger(boolean z) {
        messageLogger = z;
        return this;
    }

    /** @return the expire-after-write time, in seconds, used for newly created reply caches */
    public int getCacheLiveSeconds() {
        return cacheLiveSeconds;
    }

    /**
     * Sets the expire-after-write time for reply caches. The value is read when a per-channel
     * cache is created, so caches that already exist keep their previous setting.
     *
     * @param i lifetime in seconds
     * @return this instance, for chaining
     */
    public ReliableMessage setCacheLiveSeconds(int i) {
        cacheLiveSeconds = i;
        return this;
    }

    /** @return the timeout applied by send/reply overloads that take no explicit duration */
    public Duration getDefaultTimeout() {
        return defaultTimeout;
    }

    /**
     * Replaces the timeout applied by send/reply overloads that take no explicit duration.
     *
     * @param duration the new default timeout
     * @return this instance, for chaining
     */
    public ReliableMessage setDefaultTimeout(Duration duration) {
        defaultTimeout = duration;
        return this;
    }

    /**
     * Picks one of the {@code poolNumber} executors belonging to the given channel, creating any
     * that do not exist yet.
     *
     * <p>A pool counts as idle when it has never been inspected before, when it completed at least
     * one task since the previous inspection, or when it has no active thread. Selection order:
     * a random idle pool; otherwise, if all pools are busy, the one with the highest completed-task
     * count (lowest index on ties); otherwise the first pool that is neither idle nor busy.
     *
     * <p>Unlike the previous implementation, the fallbacks only ever consider the pools of
     * {@code str} (never another channel's executor), and all reads of the shared maps happen
     * while holding their locks.
     *
     * @param str channel name used as the prefix of the pool keys
     * @return the executor chosen for the next task of this channel
     */
    private static ExecutorService getPool(String str) {
        List<ThreadPoolExecutor> idlePools = new ArrayList<>(poolNumber);
        int busyCount = 0;
        ThreadPoolExecutor mostCompletedPool = null;
        long mostCompleted = -1L;
        ThreadPoolExecutor unclassifiedPool = null;
        synchronized (poolTaskCounter) {
            for (int i = 0; i < poolNumber; i++) {
                String key = str + i;
                ThreadPoolExecutor pool;
                // channelPool is a plain HashMap, so even the lookup must hold its lock.
                synchronized (channelPool) {
                    pool = channelPool.get(key);
                    if (pool == null) {
                        pool = (ThreadPoolExecutor) Executors.newCachedThreadPool(new CloudThreadFactory("ReliableMessage-" + key, 7));
                        channelPool.put(key, pool);
                    }
                }
                long completed = pool.getCompletedTaskCount();
                long submitted = pool.getTaskCount();
                Long previous = poolTaskCounter.get(key);
                if (previous == null || completed > previous || pool.getActiveCount() == 0) {
                    idlePools.add(pool);
                } else if (submitted > 0) {
                    busyCount++;
                } else if (unclassifiedPool == null) {
                    unclassifiedPool = pool;
                }
                poolTaskCounter.put(key, completed);
                // Strict '>' keeps the lowest index on ties, matching the key-ordered stable sort used before.
                if (completed > mostCompleted) {
                    mostCompleted = completed;
                    mostCompletedPool = pool;
                }
            }
        }
        if (!idlePools.isEmpty()) {
            return idlePools.get(ThreadLocalRandom.current().nextInt(idlePools.size()));
        }
        // With no idle pool and fewer than poolNumber busy ones, at least one pool was left unclassified.
        return busyCount == poolNumber ? mostCompletedPool : unclassifiedPool;
    }

    /**
     * Appends a listener to the end of the listener list of a channel, creating the list on
     * first use.
     *
     * @param str      channel name
     * @param listener listener to register
     * @return this instance, for chaining
     */
    public ReliableMessage addListener(String str, Listener listener) {
        synchronized (listenerMap) {
            List<Listener> registered = listenerMap.computeIfAbsent(str, channel -> new ArrayList<>());
            registered.add(listener);
        }
        return this;
    }

    /**
     * Registers a listener at the front of the listener list of a channel, so it is called
     * before the listeners already present; the list is created on first use.
     *
     * @param str      channel name
     * @param listener listener to register first
     */
    private void insertListener(String str, Listener listener) {
        synchronized (listenerMap) {
            List<Listener> registered = listenerMap.computeIfAbsent(str, channel -> new ArrayList<>());
            registered.add(0, listener);
        }
    }

    /**
     * Removes a listener from a channel. Does nothing when the channel has no listener list.
     *
     * @param str      channel name
     * @param listener listener to remove
     */
    public void removeListener(String str, Listener listener) {
        synchronized (listenerMap) {
            List<Listener> registered = listenerMap.get(str);
            if (registered == null) {
                return;
            }
            registered.remove(listener);
        }
    }

    /**
     * Returns the reply cache of a channel, creating it on first use with an expire-after-write
     * time of {@code cacheLiveSeconds} seconds.
     *
     * <p>The lookup is performed while holding the {@code messageCache} monitor: the map is a
     * plain {@code HashMap}, so reading it without the lock while another thread inserts a new
     * channel is a data race.
     *
     * @param str channel name
     * @return the cache mapping message ids to the replies collected per node
     */
    private Cache<String, Map<UUID, MessageBody>> getOrCreateCache(String str) {
        synchronized (this.messageCache) {
            Cache<String, Map<UUID, MessageBody>> cache = this.messageCache.get(str);
            if (cache == null) {
                cache = Caffeine.newBuilder().expireAfterWrite(this.cacheLiveSeconds, TimeUnit.SECONDS).build();
                this.messageCache.put(str, cache);
            }
            return cache;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Subscribes this node to {@link #ReliableMessageChannel}. For every received command it:
     * <ol>
     *   <li>optionally traces the message;</li>
     *   <li>stores the body under the sender's id in the pending-reply map registered for
     *       (send-to channel, message id), if there is one, and wakes its waiters;</li>
     *   <li>runs the listeners of the send-to channel, in order, on that channel's pool until one
     *       returns {@code false}; if a listener filled slot 0 of the shared array, that body is
     *       sent back to the sender with the send-to and receive channels swapped.</li>
     * </ol>
     *
     * <p>The listeners are copied while holding the {@code listenerMap} monitor, so that
     * add/remove calls running concurrently cannot break the iteration on the pool thread.
     *
     * @param cloud not used by this method; messaging is obtained through the static
     *              {@code Cloud} accessors
     */
    public void listen(Cloud cloud) {
        Cloud.message().localListen(ReliableMessageChannel, new IgniteBiPredicate<UUID, MessageCmd>() { // from class: work.ready.cloud.cluster.ReliableMessage.1
            @Override
            public boolean apply(UUID uuid, MessageCmd messageCmd) {
                if (ReliableMessage.this.messageLogger && ReliableMessage.logger.isTraceEnabled()) {
                    ReliableMessage.logger.trace("node received reliable message [msg=" + messageCmd + ", from remote=" + uuid + "]", new Object[0]);
                }
                String sendToChannel = messageCmd.getSendToChannel();
                String messageId = messageCmd.getMessageId();
                messageCmd.setNodeId(uuid);
                Map<UUID, MessageBody> pending = ReliableMessage.this.getOrCreateCache(sendToChannel).getIfPresent(messageId);
                if (pending != null) {
                    // The sender waits on this map's monitor until every expected node has answered.
                    synchronized (pending) {
                        pending.put(uuid, messageCmd.getMessage());
                        pending.notifyAll();
                    }
                }
                final List<Listener> listeners;
                synchronized (ReliableMessage.this.listenerMap) {
                    List<Listener> registered = ReliableMessage.this.listenerMap.get(sendToChannel);
                    if (registered == null) {
                        return true;
                    }
                    // Snapshot: the registered list is a plain ArrayList mutated under this lock.
                    listeners = new ArrayList<>(registered);
                }
                ReliableMessage.getPool(sendToChannel).execute(() -> {
                    MessageBody[] reply = new MessageBody[1];
                    for (Listener listener : listeners) {
                        if (!listener.handle(messageCmd, reply)) {
                            break;
                        }
                    }
                    if (reply[0] != null) {
                        MessageCmd response = new MessageCmd();
                        response.setNodeId(uuid).setSendToChannel(messageCmd.getReceiveChannel()).setReceiveChannel(messageCmd.getSendToChannel()).setMessageId(messageCmd.getMessageId()).setMessage(reply[0]);
                        Cloud.message(Cloud.cluster().forNodeId(uuid, new UUID[0])).send(ReliableMessage.ReliableMessageChannel, response);
                    }
                });
                // Always true, including when no listener is registered for the channel.
                return true;
            }
        });
    }

    /**
     * Sends {@link #DONE} to a single node without waiting for an answer.
     *
     * @param uuid target node id
     * @param str  channel, used both as target and as receive channel
     * @param str2 message id
     * @throws MessageException if sending fails
     */
    public void finalReply(UUID uuid, String str, String str2) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        finalReply(target, str, str2, DONE);
    }

    /**
     * Sends {@link #DONE} to a single node without waiting for an answer.
     *
     * @param uuid target node id
     * @param str  target channel
     * @param str2 receive channel
     * @param str3 message id
     * @throws MessageException if sending fails
     */
    public void finalReply(UUID uuid, String str, String str2, String str3) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        finalReply(target, str, str2, str3, DONE);
    }

    /**
     * Sends a body to a single node without waiting for an answer.
     *
     * @param uuid        target node id
     * @param str         channel, used both as target and as receive channel
     * @param str2        message id
     * @param messageBody body to send
     * @throws MessageException if sending fails
     */
    public void finalReply(UUID uuid, String str, String str2, MessageBody messageBody) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        finalReply(target, str, str2, messageBody);
    }

    /**
     * Sends a body to a single node without waiting for an answer.
     *
     * @param uuid        target node id
     * @param str         target channel
     * @param str2        receive channel
     * @param str3        message id
     * @param messageBody body to send
     * @throws MessageException if sending fails
     */
    public void finalReply(UUID uuid, String str, String str2, String str3, MessageBody messageBody) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        finalReply(target, str, str2, str3, messageBody);
    }

    /**
     * Sends {@link #DONE} to a cluster group without waiting for an answer.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param str2         message id
     * @throws MessageException if sending fails
     */
    public void finalReply(ClusterGroup clusterGroup, String str, String str2) throws MessageException {
        MessageBody body = DONE;
        finalReply(clusterGroup, str, str2, body);
    }

    /**
     * Sends {@link #DONE} to a cluster group without waiting for an answer.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param str3         message id
     * @throws MessageException if sending fails
     */
    public void finalReply(ClusterGroup clusterGroup, String str, String str2, String str3) throws MessageException {
        MessageBody body = DONE;
        finalReply(clusterGroup, str, str2, str3, body);
    }

    /**
     * Sends a body to a cluster group without waiting for an answer.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param str2         message id
     * @param messageBody  body to send
     * @throws MessageException if sending fails
     */
    public void finalReply(ClusterGroup clusterGroup, String str, String str2, MessageBody messageBody) throws MessageException {
        String receiveChannel = str;
        finalReply(clusterGroup, str, receiveChannel, str2, messageBody);
    }

    /**
     * Builds a command from the arguments and sends it to the group on
     * {@link #ReliableMessageChannel}. No pending-reply entry is registered, so nothing waits
     * for an answer.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param str3         message id
     * @param messageBody  body to send
     * @throws MessageException wrapping any exception raised while building or sending the command
     */
    public void finalReply(ClusterGroup clusterGroup, String str, String str2, String str3, MessageBody messageBody) throws MessageException {
        try {
            MessageCmd outgoing = new MessageCmd();
            // NOTE(review): every other sender in this class calls setSendToChannel here;
            // confirm that MessageCmd.setChannel also populates the send-to channel.
            outgoing.setChannel(str)
                    .setReceiveChannel(str2)
                    .setMessageId(str3)
                    .setMessage(messageBody);
            Cloud.message(clusterGroup).send(ReliableMessageChannel, outgoing);
        } catch (Exception failure) {
            logger.warn(failure, "ReliableMessage exception: ", new Object[0]);
            throw new MessageException("ReliableMessage exception: ", failure);
        }
    }

    /**
     * Replies to one node under an existing message id and waits for its answer, using the
     * default timeout.
     *
     * @param uuid        target node id
     * @param str         channel, used both as target and as receive channel
     * @param str2        message id
     * @param messageBody body to send
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody reply(UUID uuid, String str, String str2, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return reply(uuid, str, str, str2, messageBody, timeout);
    }

    /**
     * Replies to one node under an existing message id and waits for its answer.
     *
     * @param uuid        target node id
     * @param str         channel, used both as target and as receive channel
     * @param str2        message id
     * @param messageBody body to send
     * @param duration    how long to wait
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody reply(UUID uuid, String str, String str2, MessageBody messageBody, Duration duration) throws MessageException {
        String receiveChannel = str;
        return reply(uuid, str, receiveChannel, str2, messageBody, duration);
    }

    /**
     * Replies to one node under an existing message id and waits for its answer, using the
     * default timeout.
     *
     * @param uuid        target node id
     * @param str         target channel
     * @param str2        receive channel
     * @param str3        message id
     * @param messageBody body to send
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody reply(UUID uuid, String str, String str2, String str3, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return reply(uuid, str, str2, str3, messageBody, timeout);
    }

    /**
     * Replies to one node under an existing message id and waits for its answer.
     *
     * @param uuid        target node id
     * @param str         target channel
     * @param str2        receive channel
     * @param str3        message id
     * @param messageBody body to send
     * @param duration    how long to wait
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody reply(UUID uuid, String str, String str2, String str3, MessageBody messageBody, Duration duration) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        Map<UUID, MessageBody> answers = send(target, str, str2, str3, messageBody, duration);
        return answers == null ? null : answers.get(uuid);
    }

    /**
     * Replies to a cluster group under an existing message id and waits for all answers, using
     * the default timeout.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param str2         message id
     * @param messageBody  body to send
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> reply(ClusterGroup clusterGroup, String str, String str2, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return send(clusterGroup, str, str, str2, messageBody, timeout);
    }

    /**
     * Replies to a cluster group under an existing message id and waits for all answers.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param str2         message id
     * @param messageBody  body to send
     * @param duration     how long to wait
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> reply(ClusterGroup clusterGroup, String str, String str2, MessageBody messageBody, Duration duration) throws MessageException {
        String receiveChannel = str;
        return send(clusterGroup, str, receiveChannel, str2, messageBody, duration);
    }

    /**
     * Replies to a cluster group under an existing message id and waits for all answers, using
     * the default timeout.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param str3         message id
     * @param messageBody  body to send
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> reply(ClusterGroup clusterGroup, String str, String str2, String str3, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return send(clusterGroup, str, str2, str3, messageBody, timeout);
    }

    /**
     * Replies to a cluster group under an existing message id and waits for all answers.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param str3         message id
     * @param messageBody  body to send
     * @param duration     how long to wait
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> reply(ClusterGroup clusterGroup, String str, String str2, String str3, MessageBody messageBody, Duration duration) throws MessageException {
        Map<UUID, MessageBody> answers = send(clusterGroup, str, str2, str3, messageBody, duration);
        return answers;
    }

    /**
     * Sends a body to one node and waits for its answer, using the default timeout.
     *
     * @param uuid        target node id
     * @param str         channel, used both as target and as receive channel
     * @param messageBody body to send
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody send(UUID uuid, String str, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return send(uuid, str, str, messageBody, timeout);
    }

    /**
     * Sends a body to one node and waits for its answer.
     *
     * @param uuid        target node id
     * @param str         channel, used both as target and as receive channel
     * @param messageBody body to send
     * @param duration    how long to wait
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody send(UUID uuid, String str, MessageBody messageBody, Duration duration) throws MessageException {
        String receiveChannel = str;
        return send(uuid, str, receiveChannel, messageBody, duration);
    }

    /**
     * Sends a body to one node and waits for its answer, using the default timeout.
     *
     * @param uuid        target node id
     * @param str         target channel
     * @param str2        receive channel
     * @param messageBody body to send
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody send(UUID uuid, String str, String str2, MessageBody messageBody) throws MessageException {
        Duration timeout = defaultTimeout;
        return send(uuid, str, str2, messageBody, timeout);
    }

    /**
     * Sends a body to one node under a freshly generated message id and waits for its answer.
     *
     * @param uuid        target node id
     * @param str         target channel
     * @param str2        receive channel
     * @param messageBody body to send
     * @param duration    how long to wait
     * @return the node's answer, or {@code null} if none was recorded for it
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public MessageBody send(UUID uuid, String str, String str2, MessageBody messageBody, Duration duration) throws MessageException {
        ClusterGroup target = Cloud.cluster().forNodeId(uuid, new UUID[0]);
        String messageId = String.valueOf(Ready.getId());
        Map<UUID, MessageBody> answers = send(target, str, str2, messageId, messageBody, duration);
        return answers == null ? null : answers.get(uuid);
    }

    /**
     * Sends a body to a cluster group under a freshly generated message id and waits for all
     * answers, using the default timeout.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param messageBody  body to send
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> send(ClusterGroup clusterGroup, String str, MessageBody messageBody) throws MessageException {
        String messageId = String.valueOf(Ready.getId());
        return send(clusterGroup, str, str, messageId, messageBody, defaultTimeout);
    }

    /**
     * Sends a body to a cluster group under a freshly generated message id and waits for all
     * answers.
     *
     * @param clusterGroup target nodes
     * @param str          channel, used both as target and as receive channel
     * @param messageBody  body to send
     * @param duration     how long to wait
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> send(ClusterGroup clusterGroup, String str, MessageBody messageBody, Duration duration) throws MessageException {
        String messageId = String.valueOf(Ready.getId());
        return send(clusterGroup, str, str, messageId, messageBody, duration);
    }

    /**
     * Sends a body to a cluster group under a freshly generated message id and waits for all
     * answers, using the default timeout.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param messageBody  body to send
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> send(ClusterGroup clusterGroup, String str, String str2, MessageBody messageBody) throws MessageException {
        String messageId = String.valueOf(Ready.getId());
        return send(clusterGroup, str, str2, messageId, messageBody, defaultTimeout);
    }

    /**
     * Sends a body to a cluster group under a freshly generated message id and waits for all
     * answers.
     *
     * @param clusterGroup target nodes
     * @param str          target channel
     * @param str2         receive channel
     * @param messageBody  body to send
     * @param duration     how long to wait
     * @return the answers keyed by node id
     * @throws MessageException if sending fails or the wait ends exceptionally
     */
    public Map<UUID, MessageBody> send(ClusterGroup clusterGroup, String str, String str2, MessageBody messageBody, Duration duration) throws MessageException {
        String messageId = String.valueOf(Ready.getId());
        return send(clusterGroup, str, str2, messageId, messageBody, duration);
    }

    /**
     * Sends a body to every node of a group and blocks until each of them has answered on the
     * receive channel, or until the timeout expires.
     *
     * <p>Changes compared with the previous implementation:
     * <ul>
     *   <li>the map collecting the answers is registered in the reply cache <em>before</em> the
     *       request is sent, so an answer arriving immediately can no longer be dropped because
     *       the background task had not registered the map yet;</li>
     *   <li>the completeness check and {@code wait()} happen under the same monitor the receiving
     *       side uses for {@code put} + {@code notifyAll}, so a notification cannot slip in
     *       between the check and the wait;</li>
     *   <li>the interrupt flag is restored when the calling thread is interrupted while waiting.</li>
     * </ul>
     *
     * @param clusterGroup target nodes; an answer from each of them is awaited
     * @param str          target channel
     * @param str2         receive channel, under which the answers are collected
     * @param str3         message id
     * @param messageBody  body to send
     * @param duration     how long to wait; {@code null} selects the default timeout
     * @return the answers keyed by node id
     * @throws MessageException wrapping any failure to send, a timeout, or an interruption
     */
    public Map<UUID, MessageBody> send(ClusterGroup clusterGroup, String str, String str2, String str3, MessageBody messageBody, Duration duration) throws MessageException {
        Collection<ClusterNode> nodes = clusterGroup.nodes();
        Map<UUID, MessageBody> replies = new HashMap<>();
        try {
            // Must be visible to the receiving side before the request leaves this node.
            getOrCreateCache(str2).put(str3, replies);
            CompletableFuture<Map<UUID, MessageBody>> pending = supplyAsync(() -> {
                synchronized (replies) {
                    // Loop guards against spurious wakeups and partial answer sets.
                    while (!nodes.stream().allMatch(node -> replies.containsKey(node.id()))) {
                        replies.wait();
                    }
                }
                return replies;
            }, future -> getOrCreateCache(str2).invalidate(str3), duration == null ? this.defaultTimeout : duration, null);
            MessageCmd messageCmd = new MessageCmd();
            messageCmd.setSendToChannel(str).setReceiveChannel(str2).setMessageId(str3).setMessage(messageBody);
            Cloud.message(clusterGroup).send(ReliableMessageChannel, messageCmd);
            return pending.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.warn(e, "ReliableMessage exception, sendToChannel=%s, receiveChannel=%s, messageId=%s, message=%s", new Object[]{str, str2, str3, messageBody});
            throw new MessageException("ReliableMessage exception: ", e);
        }
    }

    /**
     * Runs a supplier on the {@code "AsyncCompletableFuture"} pool and exposes its outcome as a
     * {@link CompletableFuture}, optionally bounded by a timeout.
     *
     * <p>When the supplier finishes (normally or with any {@code Throwable}) the future is
     * completed accordingly, the callback is invoked once, and the timeout task is cancelled.
     * When the timeout fires first, the future is completed with {@code t} if that is non-null,
     * otherwise exceptionally with a {@link TimeoutException}; the worker task is then cancelled
     * with interruption.
     *
     * <p>Compared with the previous (decompiler-expanded) form, cleanup is expressed with
     * {@code finally}: the callback is no longer re-run when it throws, the timer is cancelled
     * even if the callback throws, and a timer that was scheduled after the work had already
     * finished is cancelled right away.
     *
     * @param checkedSupplier work to run; may throw
     * @param consumer        optional callback receiving the future after the work has ended
     * @param duration        timeout, or {@code null} for none
     * @param t               value to complete with on timeout, or {@code null} to fail with
     *                        {@link TimeoutException}
     * @return the future representing the outcome
     */
    public static <T, E extends Exception> CompletableFuture<T> supplyAsync(CheckedSupplier<T, E> checkedSupplier, Consumer<CompletableFuture<T>> consumer, Duration duration, T t) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        // Holder for the timer handle, which is only known after the worker has been submitted.
        LambdaFinal lambdaFinal = new LambdaFinal();
        Future<?> submit = getPool("AsyncCompletableFuture").submit(() -> {
            try {
                completableFuture.complete(checkedSupplier.get());
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            } finally {
                try {
                    if (consumer != null) {
                        consumer.accept(completableFuture);
                    }
                } finally {
                    if (lambdaFinal.get() != null) {
                        ((ScheduledFuture<?>) lambdaFinal.get()).cancel(true);
                    }
                }
            }
        });
        if (duration != null) {
            ScheduledFuture<?> timer = scheduler.schedule(() -> {
                if (completableFuture.isDone()) {
                    return;
                }
                if (t != null) {
                    completableFuture.complete(t);
                } else {
                    logger.warn("request time out", new Object[0]);
                    completableFuture.completeExceptionally(new TimeoutException());
                }
                submit.cancel(true);
            }, duration.toMillis(), TimeUnit.MILLISECONDS);
            lambdaFinal.set(timer);
            // The worker may have finished before the handle was stored; the timer is then useless.
            if (completableFuture.isDone()) {
                timer.cancel(false);
            }
        }
        return completableFuture;
    }

    /**
     * Creates a future that never completes normally: after the given delay the shared scheduler
     * completes it exceptionally with a {@link TimeoutException}.
     *
     * @param duration delay before the future fails
     * @return the future that fails once the delay has elapsed
     */
    private static <T> CompletableFuture<T> timeoutFuture(Duration duration) {
        CompletableFuture<T> expired = new CompletableFuture<>();
        scheduler.schedule(() -> {
            expired.completeExceptionally(new TimeoutException("Timeout after " + duration));
        }, duration.toMillis(), TimeUnit.MILLISECONDS);
        return expired;
    }

    /**
     * Bounds an existing future with a timeout: the returned future takes the outcome of
     * whichever finishes first, the given future or a timer that fails with
     * {@link TimeoutException} after the given duration.
     *
     * @param completableFuture future to bound
     * @param duration          maximum time to wait
     * @return a future mirroring the first of the two to complete
     */
    public static <T> CompletableFuture<T> supplyAsync(CompletableFuture<T> completableFuture, Duration duration) {
        CompletableFuture<T> timeout = timeoutFuture(duration);
        return completableFuture.applyToEitherAsync(timeout, Function.identity());
    }

    // Cancelled timeout tasks are removed from the scheduler queue immediately, and a shutdown
    // hook (registered with value 6) shuts down every channel pool and then the scheduler.
    static {
        scheduler.setRemoveOnCancelPolicy(true);
        Ready.shutdownHook.add(6, ignored -> {
            for (ThreadPoolExecutor pool : channelPool.values()) {
                pool.shutdown();
            }
            scheduler.shutdown();
        });
    }
}
