package org.rx.net.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import org.rx.core.Constants;
import org.rx.core.Delegate;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.Extends;
import org.rx.core.FluentWait;
import org.rx.core.NEventArgs;
import org.rx.core.NtpClock;
import org.rx.core.Tasks;
import org.rx.core.ThreadPool;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.net.Sockets;
import org.rx.net.support.SocksSupport;
import org.rx.net.transport.protocol.ErrorPacket;
import org.rx.net.transport.protocol.PingPacket;
import org.rx.util.function.BiAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

/* loaded from: input_file:org/rx/net/transport/StatefulTcpClient.class */
public class StatefulTcpClient extends Disposable implements TcpClient {
    private static final Logger log = LoggerFactory.getLogger(StatefulTcpClient.class);
    static final TcpClientConfig NULL_CONF = new TcpClientConfig();
    public final Delegate<TcpClient, EventArgs> onConnected;
    public final Delegate<TcpClient, EventArgs> onDisconnected;
    public final Delegate<TcpClient, NEventArgs<InetSocketAddress>> onReconnecting;
    public final Delegate<TcpClient, NEventArgs<InetSocketAddress>> onReconnected;
    public final Delegate<TcpClient, NEventArgs<Serializable>> onSend;
    public final Delegate<TcpClient, NEventArgs<Serializable>> onReceive;
    public final Delegate<TcpClient, PingPacket> onPong;
    public final Delegate<TcpClient, NEventArgs<Throwable>> onError;
    final TcpClientConfig config;
    long sendWaitConnectMillis;
    InetSocketAddress remoteEndpoint;
    InetSocketAddress localEndpoint;
    Bootstrap bootstrap;
    volatile Channel channel;
    volatile ChannelFuture connectingFuture;
    volatile InetSocketAddress connectingEp;

    /* renamed from: org.rx.net.transport.StatefulTcpClient$2, reason: invalid class name */
    /* loaded from: input_file:org/rx/net/transport/StatefulTcpClient$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$timeout$IdleState = new int[IdleState.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$timeout$IdleState[IdleState.READER_IDLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$timeout$IdleState[IdleState.WRITER_IDLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/rx/net/transport/StatefulTcpClient$ClientHandler.class */
    class ClientHandler extends ChannelInboundHandlerAdapter {
        ClientHandler() {
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            StatefulTcpClient.log.debug("clientActive {}", channelHandlerContext.channel().remoteAddress());
            StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onConnected, (Delegate<TcpClient, EventArgs>) EventArgs.EMPTY);
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            Channel channel = channelHandlerContext.channel();
            StatefulTcpClient.log.debug("clientRead {} {}", channel.remoteAddress(), obj.getClass());
            Serializable serializable = (Serializable) Extends.as(obj, Serializable.class);
            if (serializable == null) {
                StatefulTcpClient.log.warn("clientRead discard {} {}", channel.remoteAddress(), obj.getClass());
            } else {
                if (Extends.tryAs(serializable, ErrorPacket.class, errorPacket -> {
                    exceptionCaught(channelHandlerContext, new InvalidException("Server error: {}", errorPacket.getErrorMessage()));
                }) || Extends.tryAs(serializable, PingPacket.class, pingPacket -> {
                    StatefulTcpClient.log.info("clientHeartbeat pong {} {}ms", channel.remoteAddress(), Long.valueOf(NtpClock.UTC.millis() - pingPacket.getTimestamp()));
                    StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onPong, (Delegate<TcpClient, PingPacket>) pingPacket);
                })) {
                    return;
                }
                StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onReceive, (Delegate<TcpClient, NEventArgs<Serializable>>) new NEventArgs(serializable));
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            StatefulTcpClient.log.info("clientInactive {}", channelHandlerContext.channel().remoteAddress());
            StatefulTcpClient.this.raiseEvent(StatefulTcpClient.this.onDisconnected, (Delegate<TcpClient, EventArgs>) EventArgs.EMPTY);
            StatefulTcpClient.this.reconnectAsync();
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
            Channel channel = channelHandlerContext.channel();
            if (obj instanceof IdleStateEvent) {
                switch (AnonymousClass2.$SwitchMap$io$netty$handler$timeout$IdleState[((IdleStateEvent) obj).state().ordinal()]) {
                    case 1:
                        StatefulTcpClient.log.warn("clientHeartbeat loss {}", channel.remoteAddress());
                        channelHandlerContext.close();
                        return;
                    case 2:
                        StatefulTcpClient.log.debug("clientHeartbeat ping {}", channel.remoteAddress());
                        channelHandlerContext.writeAndFlush(new PingPacket());
                        return;
                    default:
                        return;
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            Channel channel = channelHandlerContext.channel();
            TraceHandler.INSTANCE.log("clientCaught {}", channel.remoteAddress(), th);
            if (channel.isActive()) {
                NEventArgs nEventArgs = new NEventArgs(th);
                Extends.quietly(() -> {
                    StatefulTcpClient.this.raiseEvent(StatefulTcpClient.this.onError, (Delegate<TcpClient, NEventArgs<Throwable>>) nEventArgs);
                });
                if (nEventArgs.isCancel()) {
                    return;
                }
                StatefulTcpClient.this.close();
            }
        }
    }

    @Override // org.rx.core.EventPublisher
    @NonNull
    public ThreadPool asyncScheduler() {
        return TcpServer.SCHEDULER;
    }

    @Override // org.rx.net.transport.TcpClient
    public boolean isConnected() {
        return this.channel != null && this.channel.isActive();
    }

    protected boolean isShouldReconnect() {
        return this.config.isEnableReconnect() && !isConnected();
    }

    public StatefulTcpClient(@NonNull TcpClientConfig tcpClientConfig) {
        this.onConnected = Delegate.create();
        this.onDisconnected = Delegate.create();
        this.onReconnecting = Delegate.create();
        this.onReconnected = Delegate.create();
        this.onSend = Delegate.create();
        this.onReceive = Delegate.create();
        this.onPong = Delegate.create();
        this.onError = Delegate.create();
        this.sendWaitConnectMillis = SocksSupport.ASYNC_TIMEOUT;
        if (tcpClientConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        this.config = tcpClientConfig;
    }

    protected StatefulTcpClient() {
        this.onConnected = Delegate.create();
        this.onDisconnected = Delegate.create();
        this.onReconnecting = Delegate.create();
        this.onReconnected = Delegate.create();
        this.onSend = Delegate.create();
        this.onReceive = Delegate.create();
        this.onPong = Delegate.create();
        this.onError = Delegate.create();
        this.sendWaitConnectMillis = SocksSupport.ASYNC_TIMEOUT;
        this.config = NULL_CONF;
    }

    @Override // org.rx.core.Disposable
    protected void freeObjects() {
        this.config.setEnableReconnect(false);
        Sockets.closeOnFlushed(this.channel);
    }

    @Override // org.rx.net.transport.TcpClient
    public synchronized void connect(@NonNull InetSocketAddress inetSocketAddress) throws TimeoutException {
        if (inetSocketAddress == null) {
            throw new NullPointerException("remoteEp is marked non-null but is null");
        }
        if (isConnected()) {
            throw new InvalidException("Client has connected", new Object[0]);
        }
        this.config.setServerEndpoint(inetSocketAddress);
        this.bootstrap = Sockets.bootstrap(Sockets.ReactorNames.RPC, this.config, (BiAction<SocketChannel>) socketChannel -> {
            ChannelPipeline addLast = socketChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(this.config.getHeartbeatTimeout(), this.config.getHeartbeatTimeout() / 2, 0)});
            Sockets.addBackendHandler(socketChannel, this.config, this.config.getServerEndpoint());
            addLast.addLast(new ChannelHandler[]{TcpClientConfig.DEFAULT_ENCODER, new ObjectDecoder(Constants.MAX_HEAP_BUF_SIZE, TcpClientConfig.DEFAULT_CLASS_RESOLVER), new ClientHandler()});
        });
        TcpClientConfig tcpClientConfig = this.config;
        doConnect(false, tcpClientConfig);
        synchronized (tcpClientConfig) {
            try {
                tcpClientConfig.wait(this.config.getConnectTimeoutMillis());
            } catch (InterruptedException e) {
                throw InvalidException.sneaky(e);
            }
        }
        if (!isConnected()) {
            throw new TimeoutException(MessageFormatter.format("Client connect {} timeout", this.config.getServerEndpoint()).getMessage());
        }
    }

    @Override // org.rx.net.transport.TcpClient
    public synchronized Future<Void> connectAsync(@NonNull InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            throw new NullPointerException("remoteEp is marked non-null but is null");
        }
        if (isConnected()) {
            throw new InvalidException("Client has connected", new Object[0]);
        }
        this.config.setServerEndpoint(inetSocketAddress);
        this.bootstrap = Sockets.bootstrap(Sockets.ReactorNames.RPC, this.config, (BiAction<SocketChannel>) socketChannel -> {
            ChannelPipeline addLast = socketChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(this.config.getHeartbeatTimeout(), this.config.getHeartbeatTimeout() / 2, 0)});
            Sockets.addBackendHandler(socketChannel, this.config, this.config.getServerEndpoint());
            addLast.addLast(new ChannelHandler[]{TcpClientConfig.DEFAULT_ENCODER, new ObjectDecoder(Constants.MAX_HEAP_BUF_SIZE, TcpClientConfig.DEFAULT_CLASS_RESOLVER), new ClientHandler()});
        });
        doConnect(false, null);
        return new Future<Void>() { // from class: org.rx.net.transport.StatefulTcpClient.1
            @Override // java.util.concurrent.Future
            public boolean cancel(boolean z) {
                StatefulTcpClient.this.config.setEnableReconnect(false);
                return true;
            }

            @Override // java.util.concurrent.Future
            public boolean isCancelled() {
                return !StatefulTcpClient.this.config.isEnableReconnect();
            }

            @Override // java.util.concurrent.Future
            public boolean isDone() {
                return StatefulTcpClient.this.isConnected();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public Void get() throws InterruptedException, ExecutionException {
                ChannelFuture channelFuture = StatefulTcpClient.this.connectingFuture;
                if (channelFuture == null) {
                    return null;
                }
                return (Void) channelFuture.get();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public Void get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                ChannelFuture channelFuture = StatefulTcpClient.this.connectingFuture;
                if (channelFuture == null) {
                    return null;
                }
                return (Void) channelFuture.get(j, timeUnit);
            }
        };
    }

    synchronized void doConnect(boolean z, Object obj) {
        InetSocketAddress serverEndpoint;
        if (!z) {
            serverEndpoint = this.config.getServerEndpoint();
        } else {
            if (!isShouldReconnect()) {
                return;
            }
            NEventArgs nEventArgs = new NEventArgs((InetSocketAddress) Extends.ifNull(this.connectingEp, this.config.getServerEndpoint()));
            raiseEvent(this.onReconnecting, (Delegate<TcpClient, NEventArgs<InetSocketAddress>>) nEventArgs);
            InetSocketAddress inetSocketAddress = (InetSocketAddress) nEventArgs.getValue();
            this.connectingEp = inetSocketAddress;
            serverEndpoint = inetSocketAddress;
        }
        InetSocketAddress inetSocketAddress2 = serverEndpoint;
        this.connectingFuture = this.bootstrap.connect(serverEndpoint).addListeners(new GenericFutureListener[]{Sockets.logConnect(this.config.getServerEndpoint()), channelFuture -> {
            this.channel = channelFuture.channel();
            if (!channelFuture.isSuccess()) {
                if (isShouldReconnect()) {
                    Tasks.timer().setTimeout(() -> {
                        doConnect(true, obj);
                        Extends.circuitContinue(isShouldReconnect());
                    }, j -> {
                        long max = j >= 5000 ? 5000L : Math.max(j * 2, 100L);
                        log.warn("{} reconnect {} failed will re-attempt in {}ms", new Object[]{this, inetSocketAddress2, Long.valueOf(max)});
                        return max;
                    }, this, Constants.TIMER_SINGLE_FLAG);
                    return;
                } else {
                    log.warn("{} {} fail", z ? "reconnect" : "connect", inetSocketAddress2);
                    return;
                }
            }
            this.connectingEp = null;
            this.connectingFuture = null;
            this.config.setServerEndpoint(inetSocketAddress2);
            this.remoteEndpoint = (InetSocketAddress) this.channel.remoteAddress();
            this.localEndpoint = (InetSocketAddress) this.channel.localAddress();
            if (obj != null) {
                synchronized (obj) {
                    obj.notifyAll();
                }
            }
            if (z) {
                log.info("reconnect {} ok", inetSocketAddress2);
                raiseEvent(this.onReconnected, (Delegate<TcpClient, NEventArgs<InetSocketAddress>>) new NEventArgs(inetSocketAddress2));
            }
        }});
    }

    void reconnectAsync() {
        Tasks.setTimeout(() -> {
            doConnect(true, null);
        }, 1000L, this.bootstrap, Constants.TIMER_REPLACE_FLAG);
    }

    ChannelId channelId() {
        if (this.channel != null) {
            return this.channel.id();
        }
        return null;
    }

    @Override // org.rx.net.transport.TcpClient
    public synchronized void send(@NonNull Serializable serializable) {
        if (serializable == null) {
            throw new NullPointerException("pack is marked non-null but is null");
        }
        if (!isConnected()) {
            if (isShouldReconnect() && !FluentWait.polling(this.sendWaitConnectMillis).awaitTrue(fluentWait -> {
                return isConnected();
            })) {
                reconnectAsync();
                throw new ClientDisconnectedException(channelId());
            }
            if (!isConnected()) {
                throw new ClientDisconnectedException(channelId());
            }
        }
        NEventArgs nEventArgs = new NEventArgs(serializable);
        raiseEvent(this.onSend, (Delegate<TcpClient, NEventArgs<Serializable>>) nEventArgs);
        if (nEventArgs.isCancel()) {
            return;
        }
        this.channel.writeAndFlush(serializable);
        log.debug("clientWrite {} {}", this.config.getServerEndpoint(), serializable);
    }

    @Override // org.rx.net.transport.TcpClient
    public Delegate<TcpClient, NEventArgs<Serializable>> onReceive() {
        return this.onReceive;
    }

    public TcpClientConfig getConfig() {
        return this.config;
    }

    public void setSendWaitConnectMillis(long j) {
        this.sendWaitConnectMillis = j;
    }

    @Override // org.rx.net.transport.TcpClient
    public InetSocketAddress getRemoteEndpoint() {
        return this.remoteEndpoint;
    }

    public InetSocketAddress getLocalEndpoint() {
        return this.localEndpoint;
    }

    @Override // org.rx.net.transport.TcpClient
    public Channel getChannel() {
        return this.channel;
    }
}
