/*
 * Decompiled with CFR 0.152.
 */
package org.rx.net.rpc;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.commons.lang3.BooleanUtils;
import org.rx.bean.$;
import org.rx.bean.DynamicProxyBean;
import org.rx.bean.ProceedEventArgs;
import org.rx.core.EventArgs;
import org.rx.core.EventPublisher;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.Reflects;
import org.rx.core.ResetEventWait;
import org.rx.core.RxConfig;
import org.rx.core.Sys;
import org.rx.core.Tasks;
import org.rx.core.ThreadPool;
import org.rx.exception.TraceHandler;
import org.rx.net.Sockets;
import org.rx.net.rpc.NonClientPool;
import org.rx.net.rpc.RemotingContext;
import org.rx.net.rpc.RemotingEventArgs;
import org.rx.net.rpc.RemotingException;
import org.rx.net.rpc.RpcClientConfig;
import org.rx.net.rpc.RpcClientPool;
import org.rx.net.rpc.RpcServerConfig;
import org.rx.net.rpc.TcpClientPool;
import org.rx.net.rpc.protocol.EventFlag;
import org.rx.net.rpc.protocol.EventMessage;
import org.rx.net.rpc.protocol.MetadataMessage;
import org.rx.net.rpc.protocol.MethodMessage;
import org.rx.net.transport.ClientDisconnectedException;
import org.rx.net.transport.StatefulTcpClient;
import org.rx.net.transport.TcpClient;
import org.rx.net.transport.TcpServer;
import org.rx.net.transport.TcpServerConfig;
import org.rx.net.transport.protocol.ErrorPacket;
import org.rx.util.BeanMapper;
import org.rx.util.IdGenerator;
import org.rx.util.Snowflake;
import org.rx.util.function.TripleAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Remoting {
    /** Logger shared by the client-side and server-side code paths of this class. */
    private static final Logger log = LoggerFactory.getLogger(Remoting.class);
    /** Attribute key under which a connection's handshake {@link MetadataMessage} is stored and read back. */
    static final String HANDSHAKE_META_KEY = "HandshakeMeta";
    /** Method-name constant; the same literal is used as a case label in the proxy handler of {@code createFacade}. */
    static final String M_0 = "raiseEvent";
    /** Method-name constant; the same literal is used as a case label in the proxy handler of {@code createFacade}. */
    static final String M_1 = "raiseEventAsync";
    /** Method-name constant; the same literal is used as a case label in the proxy handler of {@code createFacade}. */
    static final String M_2 = "attachEvent";
    /** Registered servers keyed by contract instance; filled by {@code register} and cleared by the server's onClosed handler. */
    static final Map<Object, ServerBean> serverBeans = new ConcurrentHashMap<Object, ServerBean>();
    /** Client pools keyed by client configuration; created on demand by the proxy handler of {@code createFacade}. */
    static final Map<RpcClientConfig, TcpClientPool> clientPools = new ConcurrentHashMap<RpcClientConfig, TcpClientPool>();
    /** Source of the ids given to newly created {@link MethodMessage} instances. */
    static final IdGenerator generator = new IdGenerator();
    /** Per-client table of calls keyed by message id; entries are looked up when a response arrives. */
    static final Map<StatefulTcpClient, Map<Integer, ClientBean>> clientBeans = new ConcurrentHashMap<StatefulTcpClient, Map<Integer, ClientBean>>();

    /**
     * Builds a proxy for {@code contract} whose calls are turned into messages and sent through a
     * {@link StatefulTcpClient} borrowed from the pool associated with {@code config}.
     *
     * <p>Dispatch performed by the proxy handler, in order:
     * <ul>
     *   <li>methods contained in {@code Reflects.OBJECT_METHODS} are invoked locally;</li>
     *   <li>a close method closes the currently borrowed client (if any) and returns {@code null};</li>
     *   <li>{@code raiseEvent}/{@code raiseEventAsync} (2 args), {@code attachEvent} (3 args) and
     *       {@code detachEvent} (2 args) are invoked locally and additionally sent as an
     *       {@link EventMessage};</li>
     *   <li>{@code attachEvent} (2 args) and {@code eventFlags}/{@code asyncScheduler} (0 args) are
     *       invoked locally only;</li>
     *   <li>everything else is sent as a {@link MethodMessage} and the caller blocks until a response
     *       is signalled or {@code client.getConfig().getConnectTimeoutMillis()} elapses.</li>
     * </ul>
     *
     * <p>The handler itself throws {@link ClientDisconnectedException}, {@link TimeoutException} and
     * {@link RemotingException}; how these surface to the proxy caller depends on {@code Sys.proxy}
     * (not visible here).
     *
     * @param contract interface/class to proxy; must not be null
     * @param config   client configuration, also used as the key of the client pool; must not be null
     * @return the proxy created by {@code Sys.proxy}
     * @throws NullPointerException if {@code contract} or {@code config} is null
     */
    public static <T> T createFacade(@NonNull Class<T> contract, @NonNull RpcClientConfig<T> config) {
        if (contract == null) {
            throw new NullPointerException("contract is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        // Thread-local flag; init() sets it to true before re-raising an event that came from the server.
        FastThreadLocal isCompute = new FastThreadLocal();
        // Holder for the borrowed client; also used as the monitor guarding borrow/close/return.
        $ sync = $.$();
        return Sys.proxy(contract, (m, p) -> {
            if (Reflects.OBJECT_METHODS.contains(m)) {
                return p.fastInvokeSuper();
            }
            if (Reflects.isCloseMethod(m)) {
                $ $2 = sync;
                synchronized ($2) {
                    if (sync.v != null) {
                        ((StatefulTcpClient)sync.v).close();
                        sync.v = null;
                    }
                }
                return null;
            }
            // pack stays null unless one of the event cases below builds an EventMessage.
            Serializable pack = null;
            Object[] args = p.arguments;
            ClientBean clientBean = new ClientBean();
            switch (m.getName()) {
                case "raiseEvent": 
                case "raiseEventAsync": {
                    if (args.length != 2) break;
                    // Non-string event key, or the flag set by init(): run the local handlers only, send nothing.
                    if (!(args[0] instanceof String) || BooleanUtils.isTrue((Boolean)((Boolean)isCompute.get()))) {
                        return Remoting.invokeSuper(m, p);
                    }
                    isCompute.remove();
                    // Raise locally first, keep its result as the return value, then publish to the server.
                    Remoting.setReturnValue(clientBean, Remoting.invokeSuper(m, p));
                    EventMessage eventMessage = new EventMessage((String)args[0], EventFlag.PUBLISH);
                    eventMessage.eventArgs = (EventArgs)args[1];
                    pack = eventMessage;
                    log.info("clientSide event {} -> PUBLISH", (Object)eventMessage.eventName);
                    break;
                }
                case "attachEvent": {
                    String eventName;
                    switch (args.length) {
                        case 2: {
                            // Two-argument overload: local only.
                            return Remoting.invokeSuper(m, p);
                        }
                        case 3: {
                            // Three-argument overload: attach locally and subscribe on the server.
                            Remoting.setReturnValue(clientBean, Remoting.invokeSuper(m, p));
                            eventName = (String)args[0];
                            pack = new EventMessage(eventName, EventFlag.SUBSCRIBE);
                            log.info("clientSide event {} -> SUBSCRIBE", (Object)eventName);
                        }
                    }
                    break;
                }
                case "detachEvent": {
                    if (args.length != 2) break;
                    Remoting.setReturnValue(clientBean, Remoting.invokeSuper(m, p));
                    String eventName = (String)args[0];
                    pack = new EventMessage(eventName, EventFlag.UNSUBSCRIBE);
                    log.info("clientSide event {} -> UNSUBSCRIBE", (Object)eventName);
                    break;
                }
                case "eventFlags": 
                case "asyncScheduler": {
                    if (args.length != 0) break;
                    return Remoting.invokeSuper(m, p);
                }
            }
            // No event message was built: this is an ordinary remote method call.
            if (pack == null) {
                pack = clientBean.pack = new MethodMessage(generator.increment(), m.getName(), args, ThreadPool.traceId());
            }
            // One pool per config; NonClientPool when pooling is disabled, RpcClientPool otherwise.
            TcpClientPool pool = clientPools.computeIfAbsent(config, k -> {
                log.info("RpcClientPool {}", (Object)Sys.toJsonString(k));
                if (!config.isUsePool()) {
                    return new NonClientPool(config.getTcpConfig());
                }
                return new RpcClientPool(config);
            });
            // Borrow a client on first use; checked once without and once with the monitor held.
            if (sync.v == null) {
                $ $3 = sync;
                synchronized ($3) {
                    if (sync.v == null) {
                        sync.v = pool.borrowClient();
                        Remoting.init((StatefulTcpClient)sync.v, p.getProxyObject(), (FastThreadLocal<Boolean>)isCompute);
                        // Handshake: send the metadata message, then run the optional user init handler.
                        TripleAction<Object, StatefulTcpClient> initFn = (o, c) -> {
                            c.send(new MetadataMessage(config.getEventVersion()));
                            TripleAction initHandler = config.getInitHandler();
                            if (initHandler != null) {
                                initHandler.invoke(o, (StatefulTcpClient)c);
                            }
                        };
                        // On reconnect: repeat the handshake, then asynchronously resend tracked packs.
                        ((StatefulTcpClient)sync.v).onReconnected.combine((s, e) -> {
                            initFn.invoke(p.getProxyObject(), (StatefulTcpClient)s);
                            s.asyncScheduler().runAsync(() -> {
                                for (ClientBean val : Remoting.getClientBeans((StatefulTcpClient)s).values()) {
                                    // Beans whose syncRoot reports a hold count of 0 are skipped
                                    // (presumably: no caller is waiting on them - confirm ResetEventWait semantics).
                                    if (val.syncRoot.getHoldCount() == 0) continue;
                                    log.info("clientSide resent pack[{}] {}", (Object)val.pack.id, (Object)val.pack.methodName);
                                    try {
                                        s.send(val.pack);
                                    }
                                    catch (ClientDisconnectedException ex) {
                                        log.warn("clientSide resent pack[{}] fail", (Object)val.pack.id);
                                    }
                                }
                            });
                        });
                        initFn.invoke(p.getProxyObject(), (StatefulTcpClient)sync.v);
                        // NOTE(review): sync.v was assigned above; this branch only runs if something reset it
                        // to null during the handshake - possibly a decompilation artifact, confirm.
                        if (sync.v == null) {
                            sync.v = pool.borrowClient();
                            Remoting.init((StatefulTcpClient)sync.v, p.getProxyObject(), (FastThreadLocal<Boolean>)isCompute);
                        }
                    }
                }
            }
            StatefulTcpClient client = (StatefulTcpClient)sync.v;
            Map<Integer, ClientBean> waitBeans = null;
            // methodMessage/eventArgs are non-null only for ordinary method calls, not for event messages.
            MethodMessage methodMessage = Extends.as(pack, MethodMessage.class);
            ProceedEventArgs eventArgs = methodMessage != null ? new ProceedEventArgs(contract, methodMessage.parameters, false) : null;
            try {
                client.send(pack);
                if (eventArgs != null) {
                    // Register the call so the receive handler can match the response by id, then wait.
                    waitBeans = Remoting.getClientBeans(client);
                    waitBeans.put(clientBean.pack.id, clientBean);
                    if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis())) {
                        if (!client.isConnected()) {
                            throw new ClientDisconnectedException(client);
                        }
                        // Timed out while connected: only an error if no return value has been set.
                        if (clientBean.pack.returnValue == null) {
                            throw new TimeoutException(String.format("The method %s read timeout", clientBean.pack.methodName));
                        }
                    }
                    clientBean.syncRoot.reset();
                }
                if (clientBean.pack.errorMessage != null) {
                    throw new RemotingException(clientBean.pack.errorMessage);
                }
            }
            catch (ClientDisconnectedException e2) {
                // Without reconnect support the client is handed back and the failure propagates.
                if (!client.getConfig().isEnableReconnect()) {
                    pool.returnClient(client);
                    sync.v = null;
                    throw e2;
                }
                // Event messages are not retried.
                if (eventArgs == null) {
                    throw e2;
                }
                // Reconnect enabled: keep the call registered (the onReconnected handler resends it)
                // and wait once more for a response.
                waitBeans = Remoting.getClientBeans(client);
                waitBeans.put(clientBean.pack.id, clientBean);
                if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis()) && clientBean.pack.returnValue == null) {
                    eventArgs.setError((Throwable)((Object)e2));
                    throw e2;
                }
                clientBean.syncRoot.reset();
            }
            catch (Throwable e3) {
                if (eventArgs != null) {
                    eventArgs.setError(e3);
                }
                throw e3;
            }
            finally {
                // Log request/response/error for method calls.
                if (eventArgs != null) {
                    Sys.log(eventArgs, msg -> {
                        msg.appendLine("Client invoke %s.%s [%s -> %s]", contract.getSimpleName(), methodMessage.methodName, Sockets.toString(client.getLocalEndpoint()), Sockets.toString(client.getConfig().getServerEndpoint()));
                        msg.appendLine("Request:\t%s", Sys.toJsonString(methodMessage.parameters)).appendLine("Response:\t%s", clientBean.pack == null ? "NULL" : Sys.toJsonString(clientBean.pack.returnValue));
                        if (eventArgs.getError() != null) {
                            msg.appendLine("Error:\t%s", eventArgs.getError().getMessage());
                        }
                    });
                }
                // Unregister the call; once nothing is tracked for this client, hand it back to the pool
                // and cache whatever returnClient yields in sync.v.
                if (waitBeans != null) {
                    waitBeans.remove(clientBean.pack.id);
                    if (waitBeans.isEmpty()) {
                        $ e2 = sync;
                        synchronized (e2) {
                            sync.v = pool.returnClient(client);
                        }
                    }
                }
            }
            return clientBean.pack != null ? clientBean.pack.returnValue : null;
        });
    }

    /**
     * Attaches the error and receive handlers to a client.
     *
     * <p>Received {@link EventMessage}s flagged {@code COMPUTE_ARGS} or {@code BROADCAST} are raised on
     * {@code proxyObject} with {@code isCompute} set to true; a {@code COMPUTE_ARGS} message is then
     * sent back to the sender. Any other received value is cast to {@link MethodMessage} and used to
     * complete the {@link ClientBean} registered under the same id.
     *
     * @param client      client to wire up
     * @param proxyObject proxy that is cast to {@link EventPublisher} when an event is re-raised
     * @param isCompute   thread-local flag set to true before an event is re-raised locally
     */
    private static void init(StatefulTcpClient client, Object proxyObject, FastThreadLocal<Boolean> isCompute) {
        // Mark every error event as cancelled.
        client.onError.combine((s, e) -> e.setCancel(true));
        client.onReceive.combine((s, e) -> {
            if (Extends.tryAs(e.getValue(), EventMessage.class, x -> {
                switch (x.flag) {
                    case COMPUTE_ARGS: 
                    case BROADCAST: {
                        // NOTE(review): the nested try/catch below looks like a decompiled
                        // try/catch/finally - in each path a COMPUTE_ARGS message is sent back.
                        try {
                            isCompute.set((Object)true);
                            EventPublisher target = (EventPublisher)proxyObject;
                            target.raiseEvent(x.eventName, x.eventArgs);
                            log.info("clientSide event {} -> {} OK & args={}", new Object[]{x.eventName, x.flag, Sys.toJsonString(x.eventArgs)});
                            if (x.flag != EventFlag.COMPUTE_ARGS) break;
                            s.send((Serializable)x);
                            break;
                        }
                        catch (Exception ex) {
                            try {
                                TraceHandler.INSTANCE.log("clientSide event {} -> {}", new Object[]{x.eventName, x.flag, ex});
                                if (x.flag != EventFlag.COMPUTE_ARGS) break;
                                s.send((Serializable)x);
                                break;
                            }
                            catch (Throwable throwable) {
                                if (x.flag == EventFlag.COMPUTE_ARGS) {
                                    s.send((Serializable)x);
                                }
                                throw throwable;
                            }
                        }
                    }
                }
            })) {
                // The value was an EventMessage; nothing more to do.
                return;
            }
            // Otherwise the value is treated as the response to a pending method call.
            MethodMessage svrPack = (MethodMessage)e.getValue();
            ClientBean clientBean = Remoting.getClientBeans(client).get(svrPack.id);
            if (clientBean == null) {
                log.warn("clientSide callback pack[{}] fail", (Object)svrPack.id);
                return;
            }
            // Publish the response, then signal the bean's syncRoot.
            clientBean.pack = svrPack;
            clientBean.syncRoot.set();
        });
    }

    /**
     * Returns the id-to-bean table associated with {@code client}, creating and registering an
     * empty concurrent map the first time the client is seen.
     *
     * @param client client whose table is requested
     * @return the table stored in {@code clientBeans} for this client
     */
    private static Map<Integer, ClientBean> getClientBeans(StatefulTcpClient client) {
        Map<Integer, ClientBean> pending = clientBeans.computeIfAbsent(client, key -> new ConcurrentHashMap<Integer, ClientBean>());
        return pending;
    }

    /**
     * Stores {@code value} as the return value of the bean's message, first creating a placeholder
     * {@link MethodMessage} (fresh id, no method name, no parameters) when the bean has none.
     *
     * @param clientBean bean whose message receives the value
     * @param value      value to record
     */
    private static void setReturnValue(ClientBean clientBean, Object value) {
        MethodMessage holder = clientBean.pack;
        if (holder == null) {
            holder = new MethodMessage(generator.increment(), null, null, ThreadPool.traceId());
            clientBean.pack = holder;
        }
        holder.returnValue = value;
    }

    /**
     * Runs the local implementation of {@code m}: interface default methods are invoked through
     * {@code Reflects.invokeDefaultMethod} on the proxy object, everything else through the proxy
     * bean's {@code fastInvokeSuper()}.
     *
     * @param m method being intercepted
     * @param p proxy invocation context
     * @return the result of the local invocation
     */
    private static Object invokeSuper(Method m, DynamicProxyBean p) {
        if (!m.isDefault()) {
            return p.fastInvokeSuper();
        }
        Object proxy = p.getProxyObject();
        return Reflects.invokeDefaultMethod(m, proxy, p.arguments);
    }

    /**
     * Convenience overload: registers {@code contractInstance} on {@code listenPort} with a default
     * {@link RpcServerConfig}.
     *
     * @param contractInstance   object whose methods are exposed
     * @param listenPort         port passed to the {@link TcpServerConfig}
     * @param enableEventCompute when true, the event compute version is set to 0
     * @return the server returned by {@link #register(Object, RpcServerConfig)}
     */
    public static TcpServer register(Object contractInstance, int listenPort, boolean enableEventCompute) {
        TcpServerConfig tcpConf = new TcpServerConfig(listenPort);
        RpcServerConfig rpcConf = new RpcServerConfig(tcpConf);
        if (enableEventCompute) {
            rpcConf.setEventComputeVersion(0);
        }
        return Remoting.register(contractInstance, rpcConf);
    }

    public static TcpServer register(@NonNull Object contractInstance, @NonNull RpcServerConfig config) {
        if (contractInstance == null) {
            throw new NullPointerException("contractInstance is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        return Remoting.serverBeans.computeIfAbsent((Object)contractInstance, (Function<Object, ServerBean>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$register$24(org.rx.net.rpc.RpcServerConfig java.lang.Object java.lang.Object ), (Ljava/lang/Object;)Lorg/rx/net/rpc/Remoting$ServerBean;)((RpcServerConfig)config, (Object)contractInstance)).server;
    }

    /**
     * Sends a {@code BROADCAST} event message carrying {@code context.computedArgs} to the
     * subscribers of {@code eventBean}.
     *
     * <p>A subscriber is dropped from the set when it is no longer connected, and skipped when it is
     * the context's computing client, has no handshake metadata, or reports an event version that is
     * not in the configured broadcast versions (an empty version list lets every version through).
     *
     * @param s         server bean supplying the broadcast version list
     * @param p         message that triggered the broadcast; only its event name is used
     * @param eventBean holder of the subscriber set
     * @param context   supplies the event args to send and the client to exclude
     */
    private static void broadcast(ServerBean s, EventMessage p, ServerBean.EventBean eventBean, ServerBean.EventContext context) {
        final List<Integer> allowedVersions = s.config.getEventBroadcastVersions();
        final EventMessage outbound = new EventMessage(p.eventName, EventFlag.BROADCAST);
        outbound.eventArgs = context.computedArgs;
        Extends.tryAs(outbound.eventArgs, RemotingEventArgs.class, remoteArgs -> remoteArgs.setBroadcastVersions(allowedVersions));
        for (TcpClient subscriber : eventBean.subscribe) {
            if (!subscriber.isConnected()) {
                eventBean.subscribe.remove(subscriber);
                continue;
            }
            if (subscriber == context.computingClient) {
                continue;
            }
            MetadataMessage handshake = (MetadataMessage)subscriber.attr(HANDSHAKE_META_KEY);
            if (handshake == null) {
                continue;
            }
            boolean versionRejected = !allowedVersions.isEmpty() && !allowedVersions.contains(handshake.getEventVersion());
            if (versionRejected) {
                continue;
            }
            subscriber.send(outbound);
            log.info("serverSide event {} {} -> BROADCAST", outbound.eventName, subscriber.getRemoteEndpoint());
        }
    }

    /**
     * Factory for a server bean: creates a {@link TcpServer} from {@code config}, attaches the
     * closed/error/receive handlers and starts it.
     *
     * <p>The receive handler deals with three kinds of values, in this order: {@link EventMessage}
     * (subscribe / unsubscribe / publish / compute-args), {@link MetadataMessage} (stored on the
     * connection under {@code HANDSHAKE_META_KEY}), and otherwise {@link MethodMessage}, which is
     * invoked on {@code contractInstance} and sent back with its return value or error message.
     *
     * @param config           server configuration
     * @param contractInstance object whose methods and events are exposed
     * @param k                map key passed by {@code computeIfAbsent}; not used in the body
     * @return the started server bean
     */
    private static /* synthetic */ ServerBean lambda$register$24(RpcServerConfig config, Object contractInstance, Object k) {
        ServerBean bean = new ServerBean(config, new TcpServer(config.getTcpConfig()));
        // Drop the registration when the server closes.
        bean.server.onClosed.combine((s, e) -> serverBeans.remove(contractInstance));
        // Cancel the error event and report the error text to the client that caused it.
        bean.server.onError.combine((s, e) -> {
            e.setCancel(true);
            e.getClient().send(new ErrorPacket(String.format("server error: %s", ((Throwable)e.getValue()).toString())));
        });
        bean.server.onReceive.combine((s, e) -> {
            if (Extends.tryAs(e.getValue(), EventMessage.class, p -> {
                ServerBean.EventBean eventBean = bean.eventBeans.computeIfAbsent(p.eventName, x -> new ServerBean.EventBean());
                switch (p.flag) {
                    case SUBSCRIBE: {
                        // Attach a handler on the contract instance; the subscribing client is added to
                        // the subscriber set further down, after the handler is attached.
                        EventPublisher eventTarget = (EventPublisher)contractInstance;
                        eventTarget.attachEvent(p.eventName, (sender, args) -> {
                            ServerBean.EventBean eventBean2 = eventBean;
                            synchronized (eventBean2) {
                                ServerBean.EventContext eCtx = new ServerBean.EventContext((EventArgs)args);
                                // A compute version of -1 skips the compute step.
                                if (config.getEventComputeVersion() == -1) {
                                    eCtx.computingClient = null;
                                } else {
                                    // Only subscribers that completed the handshake are candidates.
                                    Linq<TcpClient> subscribes = Linq.from(eventBean.subscribe).where(x -> x.attr(HANDSHAKE_META_KEY) != null);
                                    // Version 0: pick one random client per event version, then take the highest version.
                                    // Otherwise: pick a random client whose event version equals the configured one.
                                    TcpClient computingClient = config.getEventComputeVersion() == 0 ? subscribes.groupBy(x -> ((MetadataMessage)x.attr(HANDSHAKE_META_KEY)).getEventVersion(), (p1, p2) -> {
                                        int i = ThreadLocalRandom.current().nextInt(0, p2.count());
                                        return (TcpClient)p2.skip(i).first();
                                    }).orderByDescending(x -> ((MetadataMessage)x.attr(HANDSHAKE_META_KEY)).getEventVersion()).firstOrDefault() : subscribes.where(x -> ((MetadataMessage)x.attr(HANDSHAKE_META_KEY)).getEventVersion() == config.getEventComputeVersion()).orderByRand().firstOrDefault();
                                    if (computingClient == null) {
                                        log.warn("serverSide event {} subscribe empty", (Object)p.eventName);
                                    } else {
                                        eCtx.computingClient = computingClient;
                                        EventMessage pack = new EventMessage(p.eventName, EventFlag.COMPUTE_ARGS);
                                        pack.computeId = Snowflake.DEFAULT.nextId();
                                        pack.eventArgs = args;
                                        // Register the context so the COMPUTE_ARGS reply can find it by computeId.
                                        eventBean.contextMap.put(pack.computeId, eCtx);
                                        try {
                                            computingClient.send(pack);
                                            log.info("serverSide event {} {} -> COMPUTE_ARGS WAIT {}", new Object[]{pack.eventName, computingClient.getRemoteEndpoint(), s.getConfig().getConnectTimeoutMillis()});
                                            // Wait on the eventBean monitor; the COMPUTE_ARGS case below calls notifyAll().
                                            eventBean.wait(s.getConfig().getConnectTimeoutMillis());
                                        }
                                        catch (Exception ex) {
                                            TraceHandler.INSTANCE.log("serverSide event {} {} -> COMPUTE_ARGS ERROR", pack.eventName, computingClient.getRemoteEndpoint(), ex);
                                        }
                                        finally {
                                            // Remove the context later, after twice the connect timeout.
                                            Tasks.setTimeout(() -> eventBean.contextMap.remove(pack.computeId), (long)s.getConfig().getConnectTimeoutMillis() * 2L);
                                        }
                                    }
                                }
                                // Broadcast the args held by the context to the subscribers.
                                Remoting.broadcast(bean, p, eventBean, eCtx);
                            }
                        }, false);
                        log.info("serverSide event {} {} -> SUBSCRIBE", (Object)p.eventName, (Object)e.getClient().getRemoteEndpoint());
                        eventBean.subscribe.add(e.getClient());
                        break;
                    }
                    case UNSUBSCRIBE: {
                        log.info("serverSide event {} {} -> UNSUBSCRIBE", (Object)p.eventName, (Object)e.getClient().getRemoteEndpoint());
                        eventBean.subscribe.remove(e.getClient());
                        break;
                    }
                    case PUBLISH: {
                        // The publishing client is passed as the context's computing client, which
                        // broadcast() skips, so the publisher does not receive its own event.
                        ServerBean.EventBean eventBean2 = eventBean;
                        synchronized (eventBean2) {
                            log.info("serverSide event {} {} -> PUBLISH", (Object)p.eventName, (Object)e.getClient().getRemoteEndpoint());
                            Remoting.broadcast(bean, p, eventBean, new ServerBean.EventContext(p.eventArgs, e.getClient()));
                            break;
                        }
                    }
                    case COMPUTE_ARGS: {
                        // Reply from the computing client: copy its args into the pending context
                        // (if it is still registered) and wake the waiting SUBSCRIBE handler.
                        ServerBean.EventBean eventBean3 = eventBean;
                        synchronized (eventBean3) {
                            ServerBean.EventContext ctx = eventBean.contextMap.get(p.computeId);
                            if (ctx == null) {
                                log.warn("serverSide event {} [{}] -> COMPUTE_ARGS FAIL", (Object)p.eventName, (Object)p.computeId);
                            } else {
                                BeanMapper.DEFAULT.map((Object)p.eventArgs, ctx.computedArgs);
                                log.info("serverSide event {} {} -> COMPUTE_ARGS OK & args={}", new Object[]{p.eventName, ctx.computingClient.getRemoteEndpoint(), Sys.toJsonString(ctx.computedArgs)});
                            }
                            eventBean.notifyAll();
                            break;
                        }
                    }
                }
            })) {
                return;
            }
            // Handshake: remember the client's metadata on its connection.
            if (Extends.tryAs(e.getValue(), MetadataMessage.class, p -> e.getClient().attr(HANDSHAKE_META_KEY, p))) {
                log.debug("Handshake: {}", (Object)Sys.toJsonString(e.getValue()));
                return;
            }
            // Anything else is treated as a method call on the contract instance.
            MethodMessage pack = (MethodMessage)e.getValue();
            ProceedEventArgs args = new ProceedEventArgs(contractInstance.getClass(), pack.parameters, false);
            try {
                pack.returnValue = RemotingContext.invoke(() -> args.proceed(() -> {
                    // Start a trace with the caller's trace id only when a trace name is configured.
                    String tn = RxConfig.INSTANCE.getThreadPool().getTraceName();
                    if (tn != null) {
                        ThreadPool.startTrace(pack.traceId, true);
                    }
                    try {
                        Object t = Reflects.invokeMethod(contractInstance, pack.methodName, pack.parameters);
                        return t;
                    }
                    finally {
                        ThreadPool.endTrace();
                    }
                }), s, e.getClient());
            }
            catch (Throwable ex) {
                // Report the cause (or the exception itself when it has none) as "<SimpleName> <message>".
                Throwable cause = Extends.ifNull(ex.getCause(), ex);
                args.setError(ex);
                pack.errorMessage = String.format("%s %s", cause.getClass().getSimpleName(), cause.getMessage());
            }
            finally {
                Sys.log(args, msg -> {
                    msg.appendLine("Server invoke %s.%s [%s]-> %s", contractInstance.getClass().getSimpleName(), pack.methodName, s.getConfig().getListenPort(), Sockets.toString(e.getClient().getRemoteEndpoint()));
                    msg.appendLine("Request:\t%s", Sys.toJsonString(args.getParameters())).appendLine("Response:\t%s", Sys.toJsonString(args.getReturnValue()));
                    if (args.getError() != null) {
                        msg.appendLine("Error:\t%s", pack.errorMessage);
                    }
                });
            }
            // Null out the request parameters before the same message object is sent back as the response.
            Arrays.fill(pack.parameters, null);
            e.getClient().send(pack);
        });
        bean.server.start();
        return bean;
    }

    /**
     * Server-side registration record: the configuration, the running server and the per-event
     * bookkeeping keyed by event name.
     */
    public static class ServerBean {
        final RpcServerConfig config;
        final TcpServer server;
        /** Event state keyed by event name. */
        final Map<String, EventBean> eventBeans = new ConcurrentHashMap<>();

        public ServerBean(RpcServerConfig config, TcpServer server) {
            this.config = config;
            this.server = server;
        }

        /** State of one event: its subscribers and the contexts registered by compute id. */
        static class EventBean {
            final Set<TcpClient> subscribe = ConcurrentHashMap.newKeySet();
            final Map<Long, EventContext> contextMap = new ConcurrentHashMap<>();
        }

        /** Event args of one event occurrence plus the client associated with it, if any. */
        static class EventContext {
            final EventArgs computedArgs;
            volatile TcpClient computingClient;

            public EventContext(EventArgs computedArgs) {
                this(computedArgs, null);
            }

            public EventContext(EventArgs computedArgs, TcpClient computingClient) {
                this.computedArgs = computedArgs;
                this.computingClient = computingClient;
            }
        }
    }

    /**
     * Client-side state of one call: the message and the event a caller waits on.
     */
    public static class ClientBean {
        /** Waited on (with a timeout) by the proxy handler and set by the receive handler in {@code init}. */
        final ResetEventWait syncRoot = new ResetEventWait();
        /** The request message; replaced by the server's response message when one is received. */
        MethodMessage pack;
    }
}

