package org.rx.socks.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import javax.net.ssl.SSLException;
import org.apache.commons.collections4.CollectionUtils;
import org.rx.core.Contract;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.EventTarget;
import org.rx.core.InvalidOperationException;
import org.rx.core.NQuery;
import org.rx.core.Reflects;
import org.rx.core.Tasks;
import org.rx.socks.Sockets;
import org.rx.socks.tcp.packet.ErrorPacket;
import org.rx.socks.tcp.packet.HandshakePacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/rx/socks/tcp/TcpServer.class */
public class TcpServer<T extends Serializable> extends Disposable implements EventTarget<TcpServer<T>> {
    private static final Logger log = LoggerFactory.getLogger(TcpServer.class);
    public volatile BiConsumer<TcpServer<T>, PackEventArgs<T>> onConnected;
    public volatile BiConsumer<TcpServer<T>, PackEventArgs<T>> onDisconnected;
    public volatile BiConsumer<TcpServer<T>, PackEventArgs<T>> onSend;
    public volatile BiConsumer<TcpServer<T>, PackEventArgs<T>> onReceive;
    public volatile BiConsumer<TcpServer<T>, ErrorEventArgs<T>> onError;
    public volatile BiConsumer<TcpServer<T>, EventArgs> onClosed;
    private final TcpConfig config;
    private final Class<T> stateType;
    private ServerBootstrap bootstrap;
    private SslContext sslCtx;
    private volatile boolean isStarted;
    private final Map<String, Set<SessionClient<T>>> clients = new ConcurrentHashMap();
    private volatile int capacity = 1000000;

    /* loaded from: input_file:org/rx/socks/tcp/TcpServer$PacketServerHandler.class */
    private class PacketServerHandler extends ChannelInboundHandlerAdapter {
        private SessionClient<T> client;

        private PacketServerHandler() {
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            TcpServer.log.debug("serverActive {}", channelHandlerContext.channel().remoteAddress());
            if (TcpServer.this.getClientSize() > TcpServer.this.getCapacity()) {
                TcpServer.log.warn("Not enough capacity");
                Sockets.closeOnFlushed(channelHandlerContext.channel());
                return;
            }
            this.client = new SessionClient<>(channelHandlerContext, TcpServer.this.getStateType() == null ? null : (Serializable) Reflects.newInstance(TcpServer.this.getStateType()));
            PackEventArgs packEventArgs = new PackEventArgs(this.client, null);
            TcpServer.this.raiseEvent(TcpServer.this.onConnected, (BiConsumer<TcpServer<T>, PackEventArgs<T>>) packEventArgs);
            if (packEventArgs.isCancel()) {
                TcpServer.log.warn("Close client");
                Sockets.closeOnFlushed(channelHandlerContext.channel());
            }
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            TcpServer.log.debug("serverRead {} {}", channelHandlerContext.channel().remoteAddress(), obj.getClass());
            Serializable serializable = (Serializable) Contract.as(obj, Serializable.class);
            if (serializable == null) {
                channelHandlerContext.writeAndFlush(new ErrorPacket("Packet discard"));
                Sockets.closeOnFlushed(channelHandlerContext.channel());
            } else {
                if (Contract.tryAs(serializable, HandshakePacket.class, handshakePacket -> {
                    this.client.setGroupId(handshakePacket.getGroupId());
                    TcpServer.this.addClient(this.client);
                })) {
                    return;
                }
                if (this.client.getGroupId() == null) {
                    TcpServer.log.warn("ServerHandshake fail");
                } else {
                    Tasks.run(() -> {
                        TcpServer.this.raiseEvent(TcpServer.this.onReceive, (BiConsumer<TcpServer<T>, PackEventArgs<T>>) new PackEventArgs(this.client, serializable));
                    });
                }
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            TcpServer.log.debug("serverInactive {}", channelHandlerContext.channel().remoteAddress());
            TcpServer.this.removeClient(this.client);
            TcpServer.this.raiseEvent(TcpServer.this.onDisconnected, (BiConsumer<TcpServer<T>, PackEventArgs<T>>) new PackEventArgs(this.client, null));
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            TcpServer.log.error("serverCaught {}", channelHandlerContext.channel().remoteAddress(), th);
            if (channelHandlerContext.channel().isActive()) {
                ErrorEventArgs errorEventArgs = new ErrorEventArgs(this.client, th);
                try {
                    TcpServer.this.raiseEvent(TcpServer.this.onError, (BiConsumer<TcpServer<T>, ErrorEventArgs<T>>) errorEventArgs);
                } catch (Exception e) {
                    TcpServer.log.error("serverCaught", e);
                }
                if (errorEventArgs.isCancel()) {
                    return;
                }
                Sockets.closeOnFlushed(channelHandlerContext.channel());
            }
        }
    }

    public int getClientSize() {
        return this.clients.size();
    }

    @Override // org.rx.core.Disposable
    protected void freeObjects() {
        Sockets.closeBootstrap(this.bootstrap);
        this.isStarted = false;
        raiseEvent(this.onClosed, (BiConsumer<TcpServer<T>, EventArgs>) EventArgs.Empty);
    }

    public void start() {
        start(false);
    }

    public void start(boolean z) {
        if (this.isStarted) {
            throw new InvalidOperationException("Server has started", new Object[0]);
        }
        if (this.config.isEnableSsl()) {
            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
            this.sslCtx = SslContextBuilder.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()).build();
        }
        this.bootstrap = Sockets.serverBootstrap(1, this.config.getWorkThread(), this.config.getMemoryMode(), socketChannel -> {
            ChannelPipeline pipeline = socketChannel.pipeline();
            if (this.sslCtx != null) {
                pipeline.addLast(new ChannelHandler[]{this.sslCtx.newHandler(socketChannel.alloc())});
            }
            if (this.config.isEnableCompress()) {
                pipeline.addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)});
                pipeline.addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)});
            }
            pipeline.addLast(new ChannelHandler[]{new ObjectEncoder(), new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(TcpConfig.class.getClassLoader())), new PacketServerHandler()});
        }).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.config.getConnectTimeout()));
        ChannelFuture addListeners = this.bootstrap.bind(this.config.getEndpoint()).addListeners(new GenericFutureListener[]{Sockets.FireExceptionThenCloseOnFailure, future -> {
            if (future.isSuccess()) {
                this.isStarted = true;
                log.debug("Listened on port {}..", this.config.getEndpoint());
            }
        }});
        if (z) {
            addListeners.channel().closeFuture().sync();
        }
    }

    protected void addClient(SessionClient<T> sessionClient) {
        Contract.require(sessionClient.getGroupId());
        this.clients.computeIfAbsent(sessionClient.getGroupId(), str -> {
            return Collections.synchronizedSet(new HashSet());
        }).add(sessionClient);
    }

    protected void removeClient(SessionClient<T> sessionClient) {
        Set<SessionClient<T>> clients = getClients(sessionClient.getGroupId());
        if (clients.removeIf(sessionClient2 -> {
            return sessionClient2 == sessionClient;
        }) && clients.isEmpty()) {
            this.clients.remove(sessionClient.getGroupId());
        }
    }

    public Set<SessionClient<T>> getClients(String str) {
        Contract.require(str);
        Set<SessionClient<T>> set = this.clients.get(str);
        if (CollectionUtils.isEmpty(set)) {
            throw new InvalidOperationException(String.format("GroupId %s not found", str), new Object[0]);
        }
        return set;
    }

    public SessionClient<T> getClient(String str, ChannelId channelId) {
        SessionClient<T> sessionClient = (SessionClient) NQuery.of((Collection) getClients(str)).where(sessionClient2 -> {
            return sessionClient2.getId().equals(channelId);
        }).singleOrDefault();
        if (sessionClient == null) {
            throw new InvalidOperationException(String.format("GroupId %s with ClientId %s not found", str, channelId), new Object[0]);
        }
        return sessionClient;
    }

    public void send(SessionClient<T> sessionClient, Serializable serializable) {
        checkNotClosed();
        Contract.require(sessionClient, serializable);
        PackEventArgs packEventArgs = new PackEventArgs(sessionClient, serializable);
        raiseEvent(this.onSend, (BiConsumer<TcpServer<T>, PackEventArgs<T>>) packEventArgs);
        if (packEventArgs.isCancel()) {
            return;
        }
        sessionClient.ctx.writeAndFlush(serializable).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    public TcpServer(TcpConfig tcpConfig, Class<T> cls) {
        this.config = tcpConfig;
        this.stateType = cls;
    }

    public TcpConfig getConfig() {
        return this.config;
    }

    public Class<T> getStateType() {
        return this.stateType;
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public int getCapacity() {
        return this.capacity;
    }

    public void setCapacity(int i) {
        this.capacity = i;
    }
}
