package org.rx.net.rpc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import org.rx.bean.DateTime;
import org.rx.bean.IdGenerator;
import org.rx.core.App;
import org.rx.core.Constants;
import org.rx.core.Delegate;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.EventTarget;
import org.rx.core.Extends;
import org.rx.core.NQuery;
import org.rx.core.RunFlag;
import org.rx.core.StringBuilder;
import org.rx.core.ThreadPool;
import org.rx.exception.ExceptionHandler;
import org.rx.exception.InvalidException;
import org.rx.net.Sockets;
import org.rx.net.TransportUtil;
import org.rx.net.rpc.protocol.HandshakePacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/rx/net/rpc/RpcServer.class */
public class RpcServer extends Disposable implements EventTarget<RpcServer> {
    private static final Logger log = LoggerFactory.getLogger(RpcServer.class);
    public static final ThreadPool SCHEDULER = new ThreadPool(RpcServerConfig.REACTOR_NAME);
    final RpcServerConfig config;
    ServerBootstrap bootstrap;
    volatile Channel serverChannel;
    public final Delegate<RpcServer, RpcServerEventArgs<Serializable>> onConnected = Delegate.create();
    public final Delegate<RpcServer, RpcServerEventArgs<Serializable>> onDisconnected = Delegate.create();
    public final Delegate<RpcServer, RpcServerEventArgs<Serializable>> onSend = Delegate.create();
    public final Delegate<RpcServer, RpcServerEventArgs<Serializable>> onReceive = Delegate.create();
    public final Delegate<RpcServer, RpcServerEventArgs<Long>> onPing = Delegate.create();
    public final Delegate<RpcServer, RpcServerEventArgs<Throwable>> onError = Delegate.create();
    public final Delegate<RpcServer, EventArgs> onClosed = Delegate.create();
    final Map<InetSocketAddress, ClientHandler> clients = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/rx/net/rpc/RpcServer$ClientHandler.class */
    public class ClientHandler extends ChannelInboundHandlerAdapter implements RpcClientMeta {
        final HandshakePacket handshakePacket = new HandshakePacket();
        transient Channel channel;
        InetSocketAddress remoteEndpoint;
        DateTime connectedTime;

        ClientHandler() {
        }

        @Override // org.rx.net.rpc.RpcClientMeta
        public boolean isConnected() {
            return this.channel != null && this.channel.isActive();
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            this.channel = channelHandlerContext.channel();
            RpcServer.log.debug("serverActive {}", this.channel.remoteAddress());
            if (RpcServer.this.clients.size() > RpcServer.this.config.getCapacity()) {
                RpcServer.log.warn("Force close client, Not enough capacity {}/{}.", Integer.valueOf(RpcServer.this.clients.size()), Integer.valueOf(RpcServer.this.config.getCapacity()));
                Sockets.closeOnFlushed(this.channel);
                return;
            }
            Map<InetSocketAddress, ClientHandler> map = RpcServer.this.clients;
            InetSocketAddress inetSocketAddress = (InetSocketAddress) this.channel.remoteAddress();
            this.remoteEndpoint = inetSocketAddress;
            map.put(inetSocketAddress, this);
            this.connectedTime = DateTime.now();
            RpcServerEventArgs rpcServerEventArgs = new RpcServerEventArgs(this, null);
            RpcServer.this.raiseEvent(RpcServer.this.onConnected, (Delegate<RpcServer, RpcServerEventArgs<Serializable>>) rpcServerEventArgs);
            if (rpcServerEventArgs.isCancel()) {
                RpcServer.log.warn("Force close client");
                Sockets.closeOnFlushed(this.channel);
            }
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            Channel channel = channelHandlerContext.channel();
            RpcServer.log.debug("serverRead {} {}", channel.remoteAddress(), obj.getClass());
            Serializable serializable = (Serializable) Extends.as(obj, Serializable.class);
            if (serializable == null) {
                RpcServer.log.warn("serverRead discard");
                Sockets.closeOnFlushed(channel);
            } else if (Extends.tryAs(serializable, HandshakePacket.class, handshakePacket -> {
                getHandshakePacket().setEventVersion(handshakePacket.getEventVersion());
            })) {
                RpcServer.log.debug("Handshake: {}", App.toJsonString(getHandshakePacket()));
            } else {
                if (Extends.tryAs(serializable, Long.class, l -> {
                    channelHandlerContext.writeAndFlush(l);
                    RpcServer.this.raiseEventAsync((Delegate<RpcServer, Delegate<RpcServer, RpcServerEventArgs<Long>>>) RpcServer.this.onPing, (Delegate<RpcServer, RpcServerEventArgs<Long>>) new RpcServerEventArgs(this, l));
                    RpcServer.log.debug("serverHeartbeat pong {}", channel.remoteAddress());
                })) {
                    return;
                }
                RpcServer.this.raiseEventAsync((Delegate<RpcServer, Delegate<RpcServer, RpcServerEventArgs<Serializable>>>) RpcServer.this.onReceive, (Delegate<RpcServer, RpcServerEventArgs<Serializable>>) new RpcServerEventArgs(this, serializable));
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            RpcServer.log.info("serverInactive {}", channelHandlerContext.channel().remoteAddress());
            RpcServer.this.clients.remove(getRemoteEndpoint());
            RpcServer.this.raiseEventAsync((Delegate<RpcServer, Delegate<RpcServer, RpcServerEventArgs<Serializable>>>) RpcServer.this.onDisconnected, (Delegate<RpcServer, RpcServerEventArgs<Serializable>>) new RpcServerEventArgs(this, null));
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
            if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state() == IdleState.READER_IDLE) {
                RpcServer.log.warn("serverHeartbeat loss {}", channelHandlerContext.channel().remoteAddress());
                channelHandlerContext.close();
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            Channel channel = channelHandlerContext.channel();
            ExceptionHandler.INSTANCE.log("serverCaught {}", channel.remoteAddress(), th);
            if (channel.isActive()) {
                RpcServerEventArgs rpcServerEventArgs = new RpcServerEventArgs(this, th);
                Extends.quietly(() -> {
                    RpcServer.this.raiseEvent(RpcServer.this.onError, (Delegate<RpcServer, RpcServerEventArgs<Throwable>>) rpcServerEventArgs);
                });
                if (rpcServerEventArgs.isCancel()) {
                    return;
                }
                Sockets.closeOnFlushed(channel);
            }
        }

        @Override // org.rx.net.rpc.RpcClientMeta
        public HandshakePacket getHandshakePacket() {
            return this.handshakePacket;
        }

        @Override // org.rx.net.rpc.RpcClientMeta
        public InetSocketAddress getRemoteEndpoint() {
            return this.remoteEndpoint;
        }

        @Override // org.rx.net.rpc.RpcClientMeta
        public DateTime getConnectedTime() {
            return this.connectedTime;
        }
    }

    @Override // org.rx.core.EventTarget
    @NonNull
    public ThreadPool asyncScheduler() {
        return SCHEDULER;
    }

    public boolean isStarted() {
        return this.serverChannel != null;
    }

    @Override // org.rx.core.EventTarget
    public <TArgs extends EventArgs> CompletableFuture<Void> raiseEventAsync(Delegate<RpcServer, TArgs> delegate, TArgs targs) {
        return asyncScheduler().runAsync(() -> {
            raiseEvent((Delegate<TSender, Delegate>) delegate, (Delegate) targs);
        }, String.format("ServerEvent%s", Integer.valueOf(IdGenerator.DEFAULT.increment())), RunFlag.PRIORITY.flags());
    }

    public Map<InetSocketAddress, RpcClientMeta> getClients() {
        return Collections.unmodifiableMap(this.clients);
    }

    @Override // org.rx.core.Disposable
    protected void freeObjects() {
        if (isStarted()) {
            Sockets.closeOnFlushed(this.serverChannel);
        }
        Sockets.closeBootstrap(this.bootstrap);
        raiseEvent(this.onClosed, (Delegate<RpcServer, EventArgs>) EventArgs.EMPTY);
    }

    public synchronized void start() {
        if (isStarted()) {
            throw new InvalidException("Server has started", new Object[0]);
        }
        if (this.bootstrap != null) {
            Sockets.closeBootstrap(this.bootstrap);
        }
        this.bootstrap = Sockets.serverBootstrap(this.config, socketChannel -> {
            ChannelPipeline addLast = socketChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(60, 0, 0)});
            TransportUtil.addFrontendHandler(socketChannel, this.config);
            addLast.addLast(new ChannelHandler[]{RpcClientConfig.DEFAULT_ENCODER, new ObjectDecoder(Constants.MAX_HEAP_BUF_SIZE, RpcClientConfig.DEFAULT_CLASS_RESOLVER), new ClientHandler()});
        }).option(ChannelOption.SO_REUSEADDR, true);
        this.bootstrap.bind(this.config.getListenPort()).addListeners(new GenericFutureListener[]{Sockets.logBind(this.config.getListenPort()), channelFuture -> {
            if (channelFuture.isSuccess()) {
                this.serverChannel = channelFuture.channel();
            }
        }});
    }

    public String dumpClients() {
        StringBuilder stringBuilder = new StringBuilder();
        int i = 1;
        Iterator it = NQuery.of((Iterable) this.clients.values()).orderByDescending(clientHandler -> {
            return Integer.valueOf(clientHandler.getHandshakePacket().getEventVersion());
        }).iterator();
        while (it.hasNext()) {
            stringBuilder.append("\t%s", ((RpcClientMeta) it.next()).getRemoteEndpoint());
            int i2 = i;
            i++;
            if (i2 % 3 == 0) {
                stringBuilder.appendLine();
            }
        }
        return stringBuilder.toString();
    }

    public RpcClientMeta getClient(InetSocketAddress inetSocketAddress) {
        return getHandler(inetSocketAddress, true);
    }

    protected ClientHandler getHandler(InetSocketAddress inetSocketAddress, boolean z) {
        checkNotClosed();
        ClientHandler clientHandler = this.clients.get(inetSocketAddress);
        if (clientHandler == null && z) {
            throw new ClientDisconnectedException(inetSocketAddress);
        }
        return clientHandler;
    }

    public void send(InetSocketAddress inetSocketAddress, Serializable serializable) {
        send(getClient(inetSocketAddress), serializable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void send(@NonNull RpcClientMeta rpcClientMeta, Serializable serializable) {
        ClientHandler handler;
        if (rpcClientMeta == null) {
            throw new NullPointerException("client is marked non-null but is null");
        }
        checkNotClosed();
        RpcServerEventArgs rpcServerEventArgs = new RpcServerEventArgs(rpcClientMeta, serializable);
        raiseEvent(this.onSend, (Delegate<RpcServer, RpcServerEventArgs<Serializable>>) rpcServerEventArgs);
        if (rpcServerEventArgs.isCancel() || (handler = getHandler(rpcClientMeta.getRemoteEndpoint(), false)) == null || !handler.isConnected()) {
            log.warn("The client {} disconnected", rpcClientMeta.getRemoteEndpoint());
        } else {
            handler.channel.writeAndFlush(serializable);
            log.debug("serverWrite {} {}", handler.channel.remoteAddress(), serializable.getClass());
        }
    }

    public void close(@NonNull RpcClientMeta rpcClientMeta) {
        if (rpcClientMeta == null) {
            throw new NullPointerException("client is marked non-null but is null");
        }
        ClientHandler handler = getHandler(rpcClientMeta.getRemoteEndpoint(), false);
        if (handler == null) {
            return;
        }
        Sockets.closeOnFlushed(handler.channel);
    }

    public RpcServer(RpcServerConfig rpcServerConfig) {
        this.config = rpcServerConfig;
    }

    public RpcServerConfig getConfig() {
        return this.config;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1022693298:
                if (implMethodName.equals("lambda$dumpClients$18f9a37c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/rx/util/function/BiFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("invoke") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/rx/net/rpc/RpcServer") && serializedLambda.getImplMethodSignature().equals("(Lorg/rx/net/rpc/RpcServer$ClientHandler;)Ljava/lang/Integer;")) {
                    return clientHandler -> {
                        return Integer.valueOf(clientHandler.getHandshakePacket().getEventVersion());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
