package org.rx.net.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import lombok.NonNull;
import org.rx.core.Constants;
import org.rx.core.Delegate;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.EventPublisher;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.NEventArgs;
import org.rx.core.RunFlag;
import org.rx.core.StringBuilder;
import org.rx.core.ThreadPool;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.net.Sockets;
import org.rx.net.transport.protocol.PingPacket;
import org.rx.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/rx/net/transport/TcpServer.class */
public class TcpServer extends Disposable implements EventPublisher<TcpServer> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TcpServer.class);
    /** Thread pool returned by {@link #asyncScheduler()}; asynchronous events are dispatched on it. */
    static final ThreadPool SCHEDULER = new ThreadPool(Sockets.ReactorNames.RPC);
    /** Configuration supplied at construction time (listen port, capacity, heartbeat timeout, ...). */
    final TcpServerConfig config;
    /** Bootstrap created by {@link #start()}; {@code null} until the first start. */
    ServerBootstrap bootstrap;
    /** Listening channel; assigned by the bind listener once the bind succeeds, {@code null} before that. */
    volatile Channel serverChannel;
    /** Raised synchronously after a client has been registered; cancelling the args force-closes the client. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onConnected = Delegate.create();
    /** Raised asynchronously after a registered client's channel became inactive. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onDisconnected = Delegate.create();
    /** Raised synchronously before a message is written to a client; cancelling the args drops the message. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onSend = Delegate.create();
    /** Raised asynchronously for every received message that is not a {@link PingPacket}. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onReceive = Delegate.create();
    /** Raised asynchronously after a received {@link PingPacket} has been echoed back to the client. */
    public final Delegate<TcpServer, TcpServerEventArgs<PingPacket>> onPing = Delegate.create();
    /** Raised synchronously when an exception is caught on a still-active client channel. */
    public final Delegate<TcpServer, TcpServerEventArgs<Throwable>> onError = Delegate.create();
    /** Raised from {@link #freeObjects()} once the server resources have been released. */
    public final Delegate<TcpServer, EventArgs> onClosed = Delegate.create();
    /** Registered clients keyed by their remote address. */
    final Map<InetSocketAddress, ClientImpl> clients = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/rx/net/transport/TcpServer$ClientImpl.class */
    public class ClientImpl extends ChannelInboundHandlerAdapter implements TcpClient {
        final Delegate<TcpClient, NEventArgs<Serializable>> onReceive = Delegate.create();
        Channel channel;
        InetSocketAddress remoteEndpoint;

        ClientImpl() {
        }

        @Override // org.rx.net.transport.TcpClient
        public boolean isConnected() {
            return this.channel != null && this.channel.isActive();
        }

        @Override // org.rx.net.transport.TcpClient
        public void send(Serializable serializable) {
            TcpServer.this.checkNotClosed();
            TcpServerEventArgs tcpServerEventArgs = new TcpServerEventArgs(this, serializable);
            TcpServer.this.raiseEvent(TcpServer.this.onSend, (Delegate<TcpServer, TcpServerEventArgs<Serializable>>) tcpServerEventArgs);
            if (tcpServerEventArgs.isCancel() || !isConnected()) {
                TcpServer.log.warn("Send cancelled or client {} disconnected", this.remoteEndpoint);
            } else {
                this.channel.writeAndFlush(serializable);
                TcpServer.log.debug("serverWrite {} {}", this.channel.remoteAddress(), serializable.getClass());
            }
        }

        @Override // org.rx.net.transport.TcpClient
        public Delegate<TcpClient, NEventArgs<Serializable>> onReceive() {
            return this.onReceive;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            Sockets.closeOnFlushed(this.channel);
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            this.channel = channelHandlerContext.channel();
            TcpServer.log.debug("serverActive {}", this.channel.remoteAddress());
            if (TcpServer.this.clients.size() > TcpServer.this.config.getCapacity()) {
                TcpServer.log.warn("Force close client, Not enough capacity {}/{}.", Integer.valueOf(TcpServer.this.clients.size()), Integer.valueOf(TcpServer.this.config.getCapacity()));
                Sockets.closeOnFlushed(this.channel);
                return;
            }
            Map<InetSocketAddress, ClientImpl> map = TcpServer.this.clients;
            InetSocketAddress inetSocketAddress = (InetSocketAddress) this.channel.remoteAddress();
            this.remoteEndpoint = inetSocketAddress;
            map.put(inetSocketAddress, this);
            TcpServerEventArgs tcpServerEventArgs = new TcpServerEventArgs(this, null);
            TcpServer.this.raiseEvent(TcpServer.this.onConnected, (Delegate<TcpServer, TcpServerEventArgs<Serializable>>) tcpServerEventArgs);
            if (tcpServerEventArgs.isCancel()) {
                TcpServer.log.warn("Force close client");
                Sockets.closeOnFlushed(this.channel);
            }
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            Channel channel = channelHandlerContext.channel();
            TcpServer.log.debug("serverRead {} {}", channel.remoteAddress(), obj.getClass());
            Serializable serializable = (Serializable) Extends.as(obj, Serializable.class);
            if (serializable == null) {
                TcpServer.log.warn("serverRead discard");
                Sockets.closeOnFlushed(channel);
            } else {
                if (Extends.tryAs(serializable, PingPacket.class, pingPacket -> {
                    channelHandlerContext.writeAndFlush(pingPacket);
                    TcpServer.this.raiseEventAsync((Delegate<TcpServer, Delegate<TcpServer, TcpServerEventArgs<PingPacket>>>) TcpServer.this.onPing, (Delegate<TcpServer, TcpServerEventArgs<PingPacket>>) new TcpServerEventArgs(this, pingPacket));
                    TcpServer.log.debug("serverHeartbeat pong {}", channel.remoteAddress());
                })) {
                    return;
                }
                TcpServerEventArgs tcpServerEventArgs = new TcpServerEventArgs(this, serializable);
                raiseEvent(this.onReceive, (Delegate<TcpClient, NEventArgs<Serializable>>) tcpServerEventArgs);
                TcpServer.this.raiseEventAsync((Delegate<TcpServer, Delegate<TcpServer, TcpServerEventArgs<Serializable>>>) TcpServer.this.onReceive, (Delegate<TcpServer, TcpServerEventArgs<Serializable>>) tcpServerEventArgs);
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            TcpServer.log.debug("serverInactive {}", channelHandlerContext.channel().remoteAddress());
            TcpServer.this.clients.remove(getRemoteEndpoint());
            TcpServer.this.raiseEventAsync((Delegate<TcpServer, Delegate<TcpServer, TcpServerEventArgs<Serializable>>>) TcpServer.this.onDisconnected, (Delegate<TcpServer, TcpServerEventArgs<Serializable>>) new TcpServerEventArgs(this, null));
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
            if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state() == IdleState.READER_IDLE) {
                TcpServer.log.warn("serverHeartbeat loss {}", channelHandlerContext.channel().remoteAddress());
                channelHandlerContext.close();
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            Channel channel = channelHandlerContext.channel();
            TraceHandler.INSTANCE.log("serverCaught {}", channel.remoteAddress(), th);
            if (channel.isActive()) {
                TcpServerEventArgs tcpServerEventArgs = new TcpServerEventArgs(this, th);
                Extends.quietly(() -> {
                    TcpServer.this.raiseEvent(TcpServer.this.onError, (Delegate<TcpServer, TcpServerEventArgs<Throwable>>) tcpServerEventArgs);
                });
                if (tcpServerEventArgs.isCancel()) {
                    return;
                }
                Sockets.closeOnFlushed(channel);
            }
        }

        @Override // org.rx.net.transport.TcpClient
        public void connect(InetSocketAddress inetSocketAddress) {
            TcpServer.this.checkNotClosed();
            if (!isConnected()) {
                throw new UnsupportedOperationException();
            }
        }

        @Override // org.rx.net.transport.TcpClient
        public Future<Void> connectAsync(InetSocketAddress inetSocketAddress) {
            TcpServer.this.checkNotClosed();
            if (isConnected()) {
                return CompletableFuture.completedFuture(null);
            }
            throw new UnsupportedOperationException();
        }

        @Override // org.rx.net.transport.TcpClient
        public Channel getChannel() {
            return this.channel;
        }

        @Override // org.rx.net.transport.TcpClient
        public InetSocketAddress getRemoteEndpoint() {
            return this.remoteEndpoint;
        }
    }

    /**
     * Supplies the thread pool on which this server dispatches asynchronous events.
     *
     * @return the shared {@link #SCHEDULER} instance
     */
    @Override // org.rx.core.EventPublisher
    @NonNull
    public ThreadPool asyncScheduler() {
        return TcpServer.SCHEDULER;
    }

    /**
     * Tells whether a listening channel has been recorded.
     *
     * @return {@code true} once {@link #serverChannel} has been assigned
     */
    public boolean isStarted() {
        Channel listening = this.serverChannel;
        return listening != null;
    }

    /**
     * Raises an event on the {@link #asyncScheduler()} thread pool instead of the calling thread.
     * <p>
     * Each dispatch is submitted under a task name of the form {@code ServerEvent<id>} with the
     * {@link RunFlag#PRIORITY} flag.
     *
     * @param delegate the event to raise
     * @param targs    the arguments passed to the event handlers
     * @return the future returned by the scheduler for the submitted dispatch
     */
    @Override // org.rx.core.EventPublisher
    public <TArgs extends EventArgs> CompletableFuture<Void> raiseEventAsync(Delegate<TcpServer, TArgs> delegate, TArgs targs) {
        return asyncScheduler().runAsync(() -> {
            raiseEvent(delegate, targs);
        }, String.format("ServerEvent%s", IdGenerator.DEFAULT.increment()), RunFlag.PRIORITY.flags());
    }

    /**
     * Exposes the registered clients keyed by remote address.
     *
     * @return an unmodifiable view backed by the internal client map
     */
    public Map<InetSocketAddress, TcpClient> getClients() {
        Map<InetSocketAddress, TcpClient> readOnlyView = Collections.unmodifiableMap(this.clients);
        return readOnlyView;
    }

    /**
     * Releases the server resources: closes the listening channel if one was recorded, shuts down the bootstrap
     * if one was created, and finally raises {@link #onClosed} synchronously.
     */
    @Override // org.rx.core.Disposable
    protected void freeObjects() {
        if (isStarted()) {
            Sockets.closeOnFlushed(this.serverChannel);
        }
        // bootstrap is only assigned by start(); guard the never-started case the same way start() does.
        if (this.bootstrap != null) {
            Sockets.closeBootstrap(this.bootstrap);
        }
        raiseEvent(this.onClosed, EventArgs.EMPTY);
    }

    /**
     * Creates the server bootstrap and binds it to the configured listen port.
     * <p>
     * Pipeline order for each channel: {@link IdleStateHandler} (reader idle = heartbeat timeout), the frontend
     * handlers added by {@link Sockets#addFrontendHandler}, the default encoder, an {@link ObjectDecoder}, and a new
     * {@link ClientImpl}.
     * <p>
     * The bind is asynchronous: {@link #serverChannel} is assigned by a listener only after the bind succeeded, so
     * {@link #isStarted()} can still return {@code false} right after this method returns.
     *
     * @throws InvalidException if the server has already been started
     */
    public synchronized void start() {
        if (isStarted()) {
            throw new InvalidException("Server has started");
        }
        // A bootstrap left over from an earlier start whose bind never succeeded is shut down before replacing it.
        if (this.bootstrap != null) {
            Sockets.closeBootstrap(this.bootstrap);
        }

        this.bootstrap = Sockets.serverBootstrap(this.config, socketChannel -> {
            ChannelPipeline pipeline = socketChannel.pipeline()
                    .addLast(new IdleStateHandler(this.config.getHeartbeatTimeout(), 0, 0));
            Sockets.addFrontendHandler(socketChannel, this.config);
            // NOTE(review): ObjectDecoder performs Java-native deserialization of bytes received from peers;
            // confirm that DEFAULT_CLASS_RESOLVER restricts the classes that may be instantiated.
            pipeline.addLast(TcpClientConfig.DEFAULT_ENCODER,
                    new ObjectDecoder(Constants.MAX_HEAP_BUF_SIZE, TcpClientConfig.DEFAULT_CLASS_RESOLVER),
                    new ClientImpl());
        }).option(ChannelOption.SO_REUSEADDR, true);

        this.bootstrap.bind(this.config.getListenPort()).addListeners(
                Sockets.logBind(this.config.getListenPort()),
                (ChannelFutureListener) bindFuture -> {
                    if (bindFuture.isSuccess()) {
                        this.serverChannel = bindFuture.channel();
                    }
                });
    }

    /**
     * Renders the remote endpoints of all registered clients as tab-prefixed text, three endpoints per line.
     * <p>
     * NOTE(review): the clients are ordered by {@code remoteEndpoint}, but {@link InetSocketAddress} does not
     * implement {@link Comparable}; whether this yields a meaningful order depends on how {@code Linq.orderBy}
     * compares keys - confirm.
     *
     * @return the formatted endpoint list; empty when no client is registered
     */
    public String dumpClients() {
        StringBuilder buf = new StringBuilder();
        Iterable<ClientImpl> registered = this.clients.values();
        Iterator<ClientImpl> ordered = Linq.from(registered).orderBy(client -> client.remoteEndpoint).iterator();
        int position = 1;
        while (ordered.hasNext()) {
            buf.appendFormat("\t%s", ordered.next().getRemoteEndpoint());
            // Break the line after every third endpoint.
            if (position++ % 3 == 0) {
                buf.appendLine();
            }
        }
        return buf.toString();
    }

    /**
     * Looks up the client registered for the given remote address, failing when there is none.
     *
     * @param inetSocketAddress remote address of the client
     * @return the registered client
     * @throws ClientDisconnectedException if no client is registered for the address
     */
    public TcpClient getClient(InetSocketAddress inetSocketAddress) {
        final boolean failWhenMissing = true;
        return getClient(inetSocketAddress, failWhenMissing);
    }

    /**
     * Looks up the client registered for the given remote address.
     *
     * @param inetSocketAddress remote address of the client
     * @param z                 when {@code true}, a missing client raises an exception instead of yielding
     *                          {@code null}
     * @return the registered client, or {@code null} when none is registered and {@code z} is {@code false}
     * @throws ClientDisconnectedException if no client is registered and {@code z} is {@code true}
     */
    public TcpClient getClient(InetSocketAddress inetSocketAddress, boolean z) {
        checkNotClosed();
        ClientImpl found = this.clients.get(inetSocketAddress);
        if (found != null || !z) {
            return found;
        }
        throw new ClientDisconnectedException(inetSocketAddress);
    }

    /**
     * Sends a message to the client registered for the given remote address.
     *
     * @param inetSocketAddress remote address of the target client
     * @param serializable      the message to send
     * @throws ClientDisconnectedException if no client is registered for the address
     */
    public void send(InetSocketAddress inetSocketAddress, Serializable serializable) {
        TcpClient target = getClient(inetSocketAddress);
        target.send(serializable);
    }

    /**
     * Creates a server that is not yet listening; call {@link #start()} to bind it.
     *
     * @param tcpServerConfig the configuration stored for later use by the server
     */
    public TcpServer(TcpServerConfig tcpServerConfig) {
        config = tcpServerConfig;
    }

    /**
     * @return the configuration this server was constructed with
     */
    public TcpServerConfig getConfig() {
        return config;
    }

    /**
     * Compiler-synthesized deserialization hook for serializable lambdas, reproduced here by the decompiler.
     * <p>
     * It recognizes exactly one lambda, the key selector used in {@link #dumpClients()}, and recreates it after
     * checking that the serialized form matches the expected functional interface and implementation method.
     * NOTE(review): the {@code boolean z = -1} / {@code switch (z)} construct is decompiler output and is not valid
     * Java source; javac normally generates this method itself, so it presumably should not be kept in
     * hand-maintained source - confirm before editing.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to a case index (hash switch plus equals check).
        switch (implMethodName.hashCode()) {
            case 1022693298:
                if (implMethodName.equals("lambda$dumpClients$18f9a37c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full lambda shape before handing out a new instance.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/rx/util/function/BiFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("invoke") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/rx/net/transport/TcpServer") && serializedLambda.getImplMethodSignature().equals("(Lorg/rx/net/transport/TcpServer$ClientImpl;)Ljava/net/InetSocketAddress;")) {
                    return clientImpl -> {
                        return clientImpl.remoteEndpoint;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
