/*
 * Decompiled with CFR 0.152.
 */
package org.rx.net.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import lombok.NonNull;
import org.rx.core.Delegate;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.EventPublisher;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.NEventArgs;
import org.rx.core.RunFlag;
import org.rx.core.StringBuilder;
import org.rx.core.ThreadPool;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.net.Sockets;
import org.rx.net.transport.ClientDisconnectedException;
import org.rx.net.transport.TcpClient;
import org.rx.net.transport.TcpClientConfig;
import org.rx.net.transport.TcpServerConfig;
import org.rx.net.transport.TcpServerEventArgs;
import org.rx.net.transport.protocol.PingPacket;
import org.rx.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based TCP server that keeps one {@link TcpClient} handle per accepted connection and
 * publishes connection, traffic, heartbeat and error notifications through {@link Delegate} events.
 *
 * <p>Each accepted channel gets an {@link IdleStateHandler} (reader-idle heartbeat watchdog), the
 * frontend handlers installed by {@code Sockets.addFrontendHandler}, the shared object encoder, an
 * {@link ObjectDecoder} and a dedicated {@link ClientImpl} handler.
 *
 * <p>SECURITY NOTE(review): {@link ObjectDecoder} performs Java-native deserialization of bytes sent
 * by the peer. Confirm that {@code TcpClientConfig.DEFAULT_CLASS_RESOLVER} restricts the resolvable
 * classes enough before exposing this server to untrusted clients.
 */
public class TcpServer extends Disposable implements EventPublisher<TcpServer> {
    private static final Logger log = LoggerFactory.getLogger(TcpServer.class);
    /** Thread pool returned by {@link #asyncScheduler()} and used for asynchronous event dispatch. */
    static final ThreadPool SCHEDULER = new ThreadPool("RPC");

    /** Raised synchronously after a client has been registered; cancelling the args closes that client. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onConnected = Delegate.create();
    /** Raised asynchronously after a registered client's channel became inactive. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onDisconnected = Delegate.create();
    /** Raised synchronously before a packet is written; cancelling the args suppresses the write. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onSend = Delegate.create();
    /** Raised asynchronously for every received packet that is not a {@link PingPacket}. */
    public final Delegate<TcpServer, TcpServerEventArgs<Serializable>> onReceive = Delegate.create();
    /** Raised asynchronously after a {@link PingPacket} has been echoed back to its sender. */
    public final Delegate<TcpServer, TcpServerEventArgs<PingPacket>> onPing = Delegate.create();
    /** Raised synchronously for exceptions on an active channel; cancelling the args keeps the channel open. */
    public final Delegate<TcpServer, TcpServerEventArgs<Throwable>> onError = Delegate.create();
    /** Raised synchronously at the end of {@link #freeObjects()}. */
    public final Delegate<TcpServer, EventArgs> onClosed = Delegate.create();

    final TcpServerConfig config;
    /** Registered clients keyed by remote address; entries are added/removed by {@link ClientImpl}. */
    final Map<InetSocketAddress, ClientImpl> clients = new ConcurrentHashMap<InetSocketAddress, ClientImpl>();
    ServerBootstrap bootstrap;
    /** Listening channel; assigned by the bind listener once the bind succeeded. */
    volatile Channel serverChannel;

    /**
     * Creates a server that is not yet listening; call {@link #start()} to bind.
     *
     * @param config server settings (listen port, heartbeat timeout, capacity, ...)
     */
    public TcpServer(TcpServerConfig config) {
        this.config = config;
    }

    /**
     * @return the configuration this server was created with
     */
    public TcpServerConfig getConfig() {
        return this.config;
    }

    /**
     * @return the shared {@link #SCHEDULER} used to dispatch asynchronous events
     */
    @Override
    @NonNull
    public ThreadPool asyncScheduler() {
        return SCHEDULER;
    }

    /**
     * @return {@code true} once a bind has succeeded and the listening channel has been recorded
     */
    public boolean isStarted() {
        return this.serverChannel != null;
    }

    /**
     * Raises {@code event} on the {@link #asyncScheduler()} with priority flags, using a task id of
     * the form {@code ServerEvent<n>}.
     *
     * @param event delegate to invoke
     * @param args  arguments handed to the delegate
     * @return future of the scheduled dispatch
     */
    @Override
    public <TArgs extends EventArgs> CompletableFuture<Void> raiseEventAsync(Delegate<TcpServer, TArgs> event, TArgs args) {
        ThreadPool scheduler = this.asyncScheduler();
        return scheduler.runAsync(() -> this.raiseEvent(event, args), (Object) String.format("ServerEvent%s", IdGenerator.DEFAULT.increment()), RunFlag.PRIORITY.flags());
    }

    /**
     * @return a read-only view of the currently registered clients, keyed by remote address
     */
    public Map<InetSocketAddress, TcpClient> getClients() {
        return Collections.unmodifiableMap(this.clients);
    }

    /**
     * Closes the listening channel (when started) and the bootstrap, then raises {@link #onClosed}.
     * NOTE(review): presumably invoked by {@link Disposable} when the server is closed — confirm.
     */
    @Override
    protected void freeObjects() {
        if (this.isStarted()) {
            Sockets.closeOnFlushed(this.serverChannel);
        }
        Sockets.closeBootstrap(this.bootstrap);
        this.raiseEvent(this.onClosed, EventArgs.EMPTY);
    }

    /**
     * Builds the server bootstrap and binds it to the configured listen port. Binding is
     * asynchronous: {@link #isStarted()} turns {@code true} only after the bind succeeded.
     *
     * @throws InvalidException if the server is already started
     */
    public synchronized void start() {
        if (this.isStarted()) {
            throw new InvalidException("Server has started", new Object[0]);
        }
        // A bootstrap left over from an earlier start() whose bind has not succeeded is released first.
        if (this.bootstrap != null) {
            Sockets.closeBootstrap(this.bootstrap);
        }
        this.bootstrap = (ServerBootstrap) Sockets.serverBootstrap(this.config, channel -> {
            // Reader-idle watchdog; IdleStateHandler's int constructor takes its timeouts in seconds.
            ChannelPipeline pipeline = channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(this.config.getHeartbeatTimeout(), 0, 0)});
            Sockets.addFrontendHandler((Channel) channel, this.config);
            // 0x1000000 caps a single serialized object at 16 MiB; one ClientImpl per connection.
            pipeline.addLast(new ChannelHandler[]{TcpClientConfig.DEFAULT_ENCODER, new ObjectDecoder(0x1000000, TcpClientConfig.DEFAULT_CLASS_RESOLVER), new ClientImpl()});
        }).option(ChannelOption.SO_REUSEADDR, true);
        // The lambda is typed as ChannelFutureListener so that f.channel() is available.
        this.bootstrap.bind(this.config.getListenPort()).addListeners(new GenericFutureListener[]{Sockets.logBind(this.config.getListenPort()), (ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                this.serverChannel = f.channel();
            }
        }});
    }

    /**
     * Renders the remote endpoints of all registered clients, ordered by endpoint, appending each
     * with the {@code "\t%s"} format and starting a new line after every third entry.
     *
     * @return the rendered text; empty when no client is registered
     */
    public String dumpClients() {
        StringBuilder buf = new StringBuilder();
        int i = 1;
        for (TcpClient tcpClient : Linq.from(this.clients.values()).orderBy(p -> p.remoteEndpoint)) {
            buf.append("\t%s", tcpClient.getRemoteEndpoint());
            if (i++ % 3 == 0) {
                buf.appendLine();
            }
        }
        return buf.toString();
    }

    /**
     * Looks up a registered client, failing when it is absent.
     *
     * @param remoteEndpoint remote address of the client
     * @return the registered client
     * @throws ClientDisconnectedException if no client is registered for {@code remoteEndpoint}
     */
    public TcpClient getClient(InetSocketAddress remoteEndpoint) {
        return this.getClient(remoteEndpoint, true);
    }

    /**
     * Looks up a registered client.
     *
     * @param remoteEp     remote address of the client
     * @param throwOnEmpty whether a missing client is an error
     * @return the registered client, or {@code null} when absent and {@code throwOnEmpty} is {@code false}
     * @throws ClientDisconnectedException if the client is absent and {@code throwOnEmpty} is {@code true}
     */
    public TcpClient getClient(InetSocketAddress remoteEp, boolean throwOnEmpty) {
        this.checkNotClosed();
        ClientImpl handler = this.clients.get(remoteEp);
        if (handler == null && throwOnEmpty) {
            throw new ClientDisconnectedException(remoteEp);
        }
        return handler;
    }

    /**
     * Sends {@code pack} to the client registered for {@code remoteEndpoint}.
     *
     * @param remoteEndpoint remote address of the target client
     * @param pack           packet to send
     * @throws ClientDisconnectedException if no client is registered for {@code remoteEndpoint}
     */
    public void send(InetSocketAddress remoteEndpoint, Serializable pack) {
        this.getClient(remoteEndpoint).send(pack);
    }

    /**
     * Per-connection channel handler that doubles as the server-side {@link TcpClient} handle.
     * One instance is added to the pipeline of every accepted channel.
     */
    class ClientImpl extends ChannelInboundHandlerAdapter implements TcpClient {
        final Delegate<TcpClient, NEventArgs<Serializable>> onReceive = Delegate.create();
        /** Channel of this connection; assigned in {@link #channelActive}. */
        Channel channel;
        /** Remote address; stays {@code null} when the connection was rejected before registration. */
        InetSocketAddress remoteEndpoint;

        ClientImpl() {
        }

        /**
         * @return {@code true} when the channel has been assigned and is active
         */
        @Override
        public boolean isConnected() {
            return this.channel != null && this.channel.isActive();
        }

        /**
         * Raises the server's {@code onSend} event and, unless it was cancelled or the client is
         * disconnected, writes and flushes {@code pack} to the channel.
         *
         * @param pack packet to send
         */
        @Override
        public void send(Serializable pack) {
            TcpServer.this.checkNotClosed();
            TcpServerEventArgs<Serializable> args = new TcpServerEventArgs<Serializable>(this, pack);
            TcpServer.this.raiseEvent(TcpServer.this.onSend, args);
            if (args.isCancel() || !this.isConnected()) {
                log.warn("Send cancelled or client {} disconnected", this.remoteEndpoint);
                return;
            }
            this.channel.writeAndFlush(pack);
            log.debug("serverWrite {} {}", this.channel.remoteAddress(), pack.getClass());
        }

        /**
         * @return the per-client receive event, raised synchronously for every non-ping packet
         */
        @Override
        public Delegate<TcpClient, NEventArgs<Serializable>> onReceive() {
            return this.onReceive;
        }

        /** Closes this client's channel through {@code Sockets.closeOnFlushed}. */
        @Override
        public void close() {
            Sockets.closeOnFlushed(this.channel);
        }

        /**
         * Registers the new connection and raises {@code onConnected}. The connection is closed
         * instead when the capacity check fails or a subscriber cancels the event.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            this.channel = ctx.channel();
            log.debug("serverActive {}", this.channel.remoteAddress());
            // NOTE(review): with '>' the map can hold capacity + 1 clients before rejection starts —
            // confirm whether '>=' is intended.
            if (TcpServer.this.clients.size() > TcpServer.this.config.getCapacity()) {
                log.warn("Force close client, Not enough capacity {}/{}.", TcpServer.this.clients.size(), TcpServer.this.config.getCapacity());
                // remoteEndpoint deliberately stays null: this connection is never registered.
                Sockets.closeOnFlushed(this.channel);
                return;
            }
            this.remoteEndpoint = (InetSocketAddress) this.channel.remoteAddress();
            TcpServer.this.clients.put(this.remoteEndpoint, this);
            TcpServerEventArgs<Serializable> args = new TcpServerEventArgs<Serializable>(this, null);
            TcpServer.this.raiseEvent(TcpServer.this.onConnected, args);
            if (args.isCancel()) {
                log.warn("Force close client");
                Sockets.closeOnFlushed(this.channel);
            }
        }

        /**
         * Handles an inbound message: non-{@link Serializable} messages close the channel, a
         * {@link PingPacket} is echoed back, anything else is published through the client-level
         * (synchronous) and server-level (asynchronous) receive events.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel channel = ctx.channel();
            log.debug("serverRead {} {}", channel.remoteAddress(), msg.getClass());
            Serializable pack = Extends.as(msg, Serializable.class);
            if (pack == null) {
                log.warn("serverRead discard");
                Sockets.closeOnFlushed(channel);
                return;
            }
            // Heartbeat: answer the ping with the same packet and notify onPing asynchronously.
            if (Extends.tryAs(pack, PingPacket.class, p -> {
                ctx.writeAndFlush(p);
                TcpServer.this.raiseEventAsync(TcpServer.this.onPing, new TcpServerEventArgs<PingPacket>(this, (PingPacket) p));
                log.debug("serverHeartbeat pong {}", channel.remoteAddress());
            })) {
                return;
            }
            TcpServerEventArgs<Serializable> args = new TcpServerEventArgs<Serializable>(this, pack);
            this.raiseEvent(this.onReceive, args);
            TcpServer.this.raiseEventAsync(TcpServer.this.onReceive, args);
        }

        /**
         * Unregisters the client and raises {@code onDisconnected} asynchronously. Connections that
         * were rejected in {@link #channelActive} before registration are ignored.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            log.debug("serverInactive {}", ctx.channel().remoteAddress());
            InetSocketAddress endpoint = this.getRemoteEndpoint();
            if (endpoint == null) {
                // Never registered (capacity rejection): ConcurrentHashMap.remove(null) would throw
                // NullPointerException, and no onConnected was raised for this connection either.
                return;
            }
            TcpServer.this.clients.remove(endpoint);
            TcpServer.this.raiseEventAsync(TcpServer.this.onDisconnected, new TcpServerEventArgs<Serializable>(this, null));
        }

        /** Closes the connection when the reader-idle timeout fires; other events are ignored. */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                log.warn("serverHeartbeat loss {}", ctx.channel().remoteAddress());
                ctx.close();
            }
        }

        /**
         * Logs the failure and, for a still-active channel, raises {@code onError}; the channel is
         * closed unless a subscriber cancels the event.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            Channel channel = ctx.channel();
            TraceHandler.INSTANCE.log("serverCaught {}", channel.remoteAddress(), cause);
            if (!channel.isActive()) {
                return;
            }
            TcpServerEventArgs<Throwable> args = new TcpServerEventArgs<Throwable>(this, cause);
            // Subscriber failures must not escape from the error path itself.
            Extends.quietly(() -> TcpServer.this.raiseEvent(TcpServer.this.onError, args));
            if (args.isCancel()) {
                return;
            }
            Sockets.closeOnFlushed(channel);
        }

        /**
         * Server-side clients cannot initiate connections: this is a no-op when already connected.
         *
         * @throws UnsupportedOperationException if the client is not connected
         */
        @Override
        public void connect(InetSocketAddress remoteEp) {
            TcpServer.this.checkNotClosed();
            if (this.isConnected()) {
                return;
            }
            throw new UnsupportedOperationException();
        }

        /**
         * Asynchronous counterpart of {@link #connect(InetSocketAddress)}.
         *
         * @return an already completed future when the client is connected
         * @throws UnsupportedOperationException if the client is not connected
         */
        @Override
        public Future<Void> connectAsync(InetSocketAddress remoteEp) {
            TcpServer.this.checkNotClosed();
            if (this.isConnected()) {
                return CompletableFuture.completedFuture(null);
            }
            throw new UnsupportedOperationException();
        }

        /**
         * @return the channel of this connection, or {@code null} before it became active
         */
        @Override
        public Channel getChannel() {
            return this.channel;
        }

        /**
         * @return the remote address, or {@code null} when the connection was never registered
         */
        @Override
        public InetSocketAddress getRemoteEndpoint() {
            return this.remoteEndpoint;
        }
    }
}

