/*
 * Decompiled with CFR 0.152.
 */
package org.rx.net.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import org.rx.core.Constants;
import org.rx.core.Delegate;
import org.rx.core.Disposable;
import org.rx.core.EventArgs;
import org.rx.core.Extends;
import org.rx.core.FluentWait;
import org.rx.core.NEventArgs;
import org.rx.core.NtpClock;
import org.rx.core.Tasks;
import org.rx.core.ThreadPool;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.net.SocketConfig;
import org.rx.net.Sockets;
import org.rx.net.transport.ClientDisconnectedException;
import org.rx.net.transport.TcpClient;
import org.rx.net.transport.TcpClientConfig;
import org.rx.net.transport.TcpServer;
import org.rx.net.transport.protocol.ErrorPacket;
import org.rx.net.transport.protocol.PingPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

/**
 * Netty based TCP client that exchanges {@link Serializable} packets with a server,
 * sends heartbeat pings when the write side is idle and, when enabled in the
 * configuration, re-establishes the connection after it drops.
 *
 * <p>State changes are reported through the public {@link Delegate} fields
 * ({@link #onConnected}, {@link #onDisconnected}, {@link #onReceive}, ...).
 *
 * <p>Threading: {@link #connect}, {@link #connectAsync}, {@link #send} and
 * {@code doConnect} are synchronized on this instance; the connect-completion
 * listener and the {@link ClientHandler} callbacks are invoked by Netty.
 */
public class StatefulTcpClient extends Disposable implements TcpClient {
    private static final Logger log = LoggerFactory.getLogger(StatefulTcpClient.class);
    /**
     * Configuration used by the protected no-arg constructor.
     * NOTE(review): this instance is shared by every client created that way and is
     * mutated by {@link #connect}, {@link #connectAsync} and {@link #freeObjects()}.
     */
    static final TcpClientConfig NULL_CONF = new TcpClientConfig();

    /** Raised asynchronously when the channel becomes active. */
    public final Delegate<TcpClient, EventArgs> onConnected = Delegate.create();
    /** Raised when the channel becomes inactive, before a reconnect is scheduled. */
    public final Delegate<TcpClient, EventArgs> onDisconnected = Delegate.create();
    /** Raised before each reconnect attempt; the attempt targets the endpoint left in the event args. */
    public final Delegate<TcpClient, NEventArgs<InetSocketAddress>> onReconnecting = Delegate.create();
    /** Raised after a reconnect attempt succeeded, carrying the endpoint that was reached. */
    public final Delegate<TcpClient, NEventArgs<InetSocketAddress>> onReconnected = Delegate.create();
    /** Raised before a packet is written; cancelling the event args suppresses the write. */
    public final Delegate<TcpClient, NEventArgs<Serializable>> onSend = Delegate.create();
    /** Raised asynchronously for every received packet that is neither an error nor a ping packet. */
    public final Delegate<TcpClient, NEventArgs<Serializable>> onReceive = Delegate.create();
    /** Raised asynchronously when a {@link PingPacket} is received. */
    public final Delegate<TcpClient, NEventArgs<PingPacket>> onPong = Delegate.create();
    /** Raised when an exception is caught on an active channel; cancelling keeps the client open. */
    public final Delegate<TcpClient, NEventArgs<Throwable>> onError = Delegate.create();

    final TcpClientConfig config;
    /** How long {@link #send} polls for a connection before giving up, in milliseconds. */
    long sendWaitConnectMillis = 4000L;
    // Written by the connect listener (Netty thread) and read through the getters from
    // caller threads, hence volatile.
    volatile InetSocketAddress remoteEndpoint;
    volatile InetSocketAddress localEndpoint;
    Bootstrap bootstrap;
    volatile Channel channel;
    volatile ChannelFuture connectingFuture;
    volatile InetSocketAddress connectingEp;

    /**
     * Creates a client bound to the given configuration.
     *
     * @param config client configuration, must not be {@code null}
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public StatefulTcpClient(@NonNull TcpClientConfig config) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        this.config = config;
    }

    /** Creates a client that uses the shared {@link #NULL_CONF} configuration. */
    protected StatefulTcpClient() {
        this.config = NULL_CONF;
    }

    /** @return the scheduler shared with {@link TcpServer} */
    @Override
    @NonNull
    public ThreadPool asyncScheduler() {
        return TcpServer.SCHEDULER;
    }

    /** @return {@code true} when a channel has been assigned and it is active */
    @Override
    public boolean isConnected() {
        // Single volatile read so the null check and isActive() see the same channel.
        Channel ch = this.channel;
        return ch != null && ch.isActive();
    }

    /** @return {@code true} when reconnecting is enabled in the configuration and the client is not connected */
    protected boolean isShouldReconnect() {
        return this.config.isEnableReconnect() && !this.isConnected();
    }

    /** Disables reconnecting and asks {@link Sockets#closeOnFlushed} to close the current channel. */
    @Override
    protected void freeObjects() {
        this.config.setEnableReconnect(false);
        Sockets.closeOnFlushed(this.channel);
    }

    /**
     * Connects to {@code remoteEp} and blocks until the connection is established or
     * the configured connect timeout elapses.
     *
     * <p>A timeout of {@code 0} waits without a time limit, a negative timeout is rejected
     * by {@link Object#wait(long)}; both match the previous single-{@code wait} behaviour.
     *
     * <p>NOTE(review): this method holds this instance's monitor while waiting, and retries
     * scheduled by the connect listener call the synchronized {@code doConnect}; such retries
     * therefore cannot start until this method returns.
     *
     * @param remoteEp server endpoint, must not be {@code null}
     * @throws NullPointerException if {@code remoteEp} is {@code null}
     * @throws InvalidException if the client is already connected
     * @throws TimeoutException if the client is still not connected when the wait ends
     */
    @Override
    public synchronized void connect(@NonNull InetSocketAddress remoteEp) throws TimeoutException {
        if (remoteEp == null) {
            throw new NullPointerException("remoteEp is marked non-null but is null");
        }
        if (this.isConnected()) {
            throw new InvalidException("Client has connected", new Object[0]);
        }
        this.initBootstrap(remoteEp);

        // The configuration object doubles as the monitor the connect listener notifies.
        TcpClientConfig syncRoot = this.config;
        this.doConnect(false, syncRoot);

        long timeoutMillis = this.config.getConnectTimeoutMillis();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (syncRoot) {
            try {
                // Guarded wait: the listener may already have notified before this thread
                // reached the monitor (lost wakeup), and wait() may return spuriously, so
                // the condition is re-checked instead of trusting a single wait().
                while (!this.isConnected()) {
                    long waitMillis;
                    if (timeoutMillis > 0L) {
                        long remainingNanos = deadline - System.nanoTime();
                        if (remainingNanos <= 0L) {
                            break;
                        }
                        // Round sub-millisecond remainders up; wait(0) would wait forever.
                        waitMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                    } else {
                        waitMillis = timeoutMillis;
                    }
                    syncRoot.wait(waitMillis);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw InvalidException.sneaky(e);
            }
        }
        if (!this.isConnected()) {
            throw new TimeoutException(MessageFormatter.format("Client connect {} timeout", this.config.getServerEndpoint()).getMessage());
        }
    }

    /**
     * Starts connecting to {@code remoteEp} without blocking.
     *
     * <p>The returned future is a thin view over this client's state:
     * <ul>
     *   <li>{@code cancel} only disables reconnecting and always returns {@code true};</li>
     *   <li>{@code isCancelled} reports whether reconnecting is disabled in the configuration;</li>
     *   <li>{@code isDone} reports {@link #isConnected()};</li>
     *   <li>{@code get} waits on the connect attempt currently in flight, or returns
     *       immediately when there is none.</li>
     * </ul>
     *
     * @param remoteEp server endpoint, must not be {@code null}
     * @return a future describing the connection state
     * @throws NullPointerException if {@code remoteEp} is {@code null}
     * @throws InvalidException if the client is already connected
     */
    @Override
    public synchronized Future<Void> connectAsync(@NonNull InetSocketAddress remoteEp) {
        if (remoteEp == null) {
            throw new NullPointerException("remoteEp is marked non-null but is null");
        }
        if (this.isConnected()) {
            throw new InvalidException("Client has connected", new Object[0]);
        }
        this.initBootstrap(remoteEp);
        this.doConnect(false, null);
        return new Future<Void>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                StatefulTcpClient.this.config.setEnableReconnect(false);
                return true;
            }

            @Override
            public boolean isCancelled() {
                return !StatefulTcpClient.this.config.isEnableReconnect();
            }

            @Override
            public boolean isDone() {
                return StatefulTcpClient.this.isConnected();
            }

            @Override
            public Void get() throws InterruptedException, ExecutionException {
                ChannelFuture pending = StatefulTcpClient.this.connectingFuture;
                if (pending == null) {
                    return null;
                }
                return (Void) pending.get();
            }

            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                ChannelFuture pending = StatefulTcpClient.this.connectingFuture;
                if (pending == null) {
                    return null;
                }
                return (Void) pending.get(timeout, unit);
            }
        };
    }

    /**
     * Stores {@code remoteEp} as the configured server endpoint and builds the bootstrap
     * whose channels get the idle, backend, codec and {@link ClientHandler} handlers.
     */
    private void initBootstrap(InetSocketAddress remoteEp) {
        this.config.setServerEndpoint(remoteEp);
        this.bootstrap = Sockets.bootstrap("RPC", (SocketConfig) this.config, ch -> {
            // Reader idle = heartbeat timeout, writer idle = half of it, all-idle disabled.
            ChannelPipeline pipeline = ch.pipeline().addLast(new IdleStateHandler(this.config.getHeartbeatTimeout(), this.config.getHeartbeatTimeout() / 2, 0));
            Sockets.addBackendHandler((Channel) ch, this.config, this.config.getServerEndpoint());
            // NOTE(security): ObjectDecoder performs Java-native deserialization of data read
            // from the network (max object size 0x1000000 bytes); confirm that
            // DEFAULT_CLASS_RESOLVER restricts the resolvable classes appropriately.
            pipeline.addLast(TcpClientConfig.DEFAULT_ENCODER, new ObjectDecoder(0x1000000, TcpClientConfig.DEFAULT_CLASS_RESOLVER), new ClientHandler());
        });
    }

    /**
     * Starts one connect attempt and installs the listener that handles its outcome.
     *
     * <p>On success the listener publishes the endpoints and the channel, wakes up threads
     * waiting on {@code syncRoot} and, for reconnects, raises {@link #onReconnected}. On
     * failure it schedules another attempt through the timer while
     * {@link #isShouldReconnect()} holds, otherwise it only logs; waiters on
     * {@code syncRoot} are not notified in that case.
     *
     * @param reconnect {@code true} for a reconnect attempt; it is skipped when
     *                  {@link #isShouldReconnect()} is {@code false}, and
     *                  {@link #onReconnecting} handlers may replace the target endpoint
     * @param syncRoot  monitor to {@code notifyAll} once connected, or {@code null}
     */
    synchronized void doConnect(boolean reconnect, Object syncRoot) {
        InetSocketAddress ep;
        if (reconnect) {
            if (!this.isShouldReconnect()) {
                return;
            }
            NEventArgs<InetSocketAddress> args = new NEventArgs<InetSocketAddress>(Extends.ifNull(this.connectingEp, this.config.getServerEndpoint()));
            this.raiseEvent(this.onReconnecting, args);
            ep = this.connectingEp = args.getValue();
        } else {
            ep = this.config.getServerEndpoint();
        }
        this.connectingFuture = this.bootstrap.connect((SocketAddress) ep).addListeners(new GenericFutureListener[]{Sockets.logConnect(this.config.getServerEndpoint()), f -> {
            // The listener array is raw, so the lambda parameter is an untyped future;
            // it is the ChannelFuture returned by Bootstrap.connect.
            ChannelFuture cf = (ChannelFuture) f;
            Channel ch = cf.channel();
            if (!cf.isSuccess()) {
                this.channel = ch;
                if (this.isShouldReconnect()) {
                    Tasks.timer().setTimeout(() -> {
                        this.doConnect(true, syncRoot);
                        // presumably tells the timer whether to keep re-attempting — confirm against Extends/Tasks
                        Extends.circuitContinue(this.isShouldReconnect());
                    }, d -> {
                        // Next delay: doubled, at least 100ms, capped at 5000ms.
                        long delay = d >= 5000L ? 5000L : Math.max(d * 2L, 100L);
                        log.warn("{} reconnect {} failed will re-attempt in {}ms", new Object[]{this, ep, delay});
                        return delay;
                    }, (Object) this, Constants.TIMER_SINGLE_FLAG);
                } else {
                    log.warn("{} {} fail", reconnect ? "reconnect" : "connect", ep);
                }
                return;
            }
            this.connectingEp = null;
            this.connectingFuture = null;
            this.config.setServerEndpoint(ep);
            this.remoteEndpoint = (InetSocketAddress) ch.remoteAddress();
            this.localEndpoint = (InetSocketAddress) ch.localAddress();
            // Publish the channel last: once isConnected() turns true the endpoints are set.
            this.channel = ch;
            if (syncRoot != null) {
                synchronized (syncRoot) {
                    syncRoot.notifyAll();
                }
            }
            if (reconnect) {
                log.info("reconnect {} ok", ep);
                this.raiseEvent(this.onReconnected, new NEventArgs<InetSocketAddress>(ep));
            }
        }});
    }

    /** Schedules a reconnect attempt in 1000ms, keyed by the bootstrap with the timer's replace flag. */
    void reconnectAsync() {
        Tasks.setTimeout(() -> this.doConnect(true, null), 1000L, this.bootstrap, Constants.TIMER_REPLACE_FLAG);
    }

    /** @return the id of the current channel, or {@code null} when no channel has been assigned */
    ChannelId channelId() {
        Channel ch = this.channel;
        return ch != null ? ch.id() : null;
    }

    /**
     * Writes {@code pack} to the server.
     *
     * <p>When disconnected and reconnecting is enabled, polls up to
     * {@code sendWaitConnectMillis} for the connection to come back; if it does not, a
     * reconnect is scheduled and the call fails. {@link #onSend} handlers may cancel the write.
     *
     * @param pack packet to send, must not be {@code null}
     * @throws NullPointerException if {@code pack} is {@code null}
     * @throws ClientDisconnectedException if the client is not connected
     */
    @Override
    public synchronized void send(@NonNull Serializable pack) {
        if (pack == null) {
            throw new NullPointerException("pack is marked non-null but is null");
        }
        if (!this.isConnected()) {
            if (this.isShouldReconnect() && !FluentWait.polling(this.sendWaitConnectMillis).awaitTrue(w -> this.isConnected())) {
                this.reconnectAsync();
                throw new ClientDisconnectedException(this.channelId());
            }
            if (!this.isConnected()) {
                throw new ClientDisconnectedException(this.channelId());
            }
        }
        NEventArgs<Serializable> args = new NEventArgs<Serializable>(pack);
        this.raiseEvent(this.onSend, args);
        if (args.isCancel()) {
            return;
        }
        this.channel.writeAndFlush((Object) pack);
        log.debug("clientWrite {} {}", this.config.getServerEndpoint(), pack);
    }

    /** @return the delegate raised for received packets */
    @Override
    public Delegate<TcpClient, NEventArgs<Serializable>> onReceive() {
        return this.onReceive;
    }

    /** @return the configuration this client was created with */
    public TcpClientConfig getConfig() {
        return this.config;
    }

    /** @param sendWaitConnectMillis how long {@link #send} polls for a connection, in milliseconds */
    public void setSendWaitConnectMillis(long sendWaitConnectMillis) {
        this.sendWaitConnectMillis = sendWaitConnectMillis;
    }

    /** @return the remote address of the last successfully connected channel, or {@code null} */
    @Override
    public InetSocketAddress getRemoteEndpoint() {
        return this.remoteEndpoint;
    }

    /** @return the local address of the last successfully connected channel, or {@code null} */
    public InetSocketAddress getLocalEndpoint() {
        return this.localEndpoint;
    }

    /** @return the most recently assigned channel, or {@code null} */
    @Override
    public Channel getChannel() {
        return this.channel;
    }

    /**
     * Inbound handler added last to the pipeline; translates channel callbacks into
     * the enclosing client's events.
     */
    class ClientHandler extends ChannelInboundHandlerAdapter {
        ClientHandler() {
        }

        /** Raises {@link StatefulTcpClient#onConnected} asynchronously. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.debug("clientActive {}", channel.remoteAddress());
            StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onConnected, EventArgs.EMPTY);
        }

        /**
         * Dispatches a decoded message: non-{@link Serializable} messages are discarded,
         * {@link ErrorPacket}s are routed to {@link #exceptionCaught}, {@link PingPacket}s raise
         * {@code onPong}, everything else raises {@code onReceive}.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel channel = ctx.channel();
            log.debug("clientRead {} {}", channel.remoteAddress(), msg.getClass());
            Serializable pack = Extends.as(msg, Serializable.class);
            if (pack == null) {
                log.warn("clientRead discard {} {}", channel.remoteAddress(), msg.getClass());
                return;
            }
            if (Extends.tryAs(pack, ErrorPacket.class, p -> this.exceptionCaught(ctx, new InvalidException("Server error: {}", p.getErrorMessage())))) {
                return;
            }
            if (Extends.tryAs(pack, PingPacket.class, p -> {
                log.info("clientHeartbeat pong {} {}ms", channel.remoteAddress(), NtpClock.UTC.millis() - p.getTimestamp());
                StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onPong, new NEventArgs<PingPacket>((PingPacket) p));
            })) {
                return;
            }
            StatefulTcpClient.this.raiseEventAsync(StatefulTcpClient.this.onReceive, new NEventArgs<Serializable>(pack));
        }

        /** Raises {@link StatefulTcpClient#onDisconnected} and schedules a reconnect attempt. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            log.info("clientInactive {}", channel.remoteAddress());
            StatefulTcpClient.this.raiseEvent(StatefulTcpClient.this.onDisconnected, EventArgs.EMPTY);
            StatefulTcpClient.this.reconnectAsync();
        }

        /**
         * Handles idle events: closes the channel when nothing was read in time and sends a
         * {@link PingPacket} when nothing was written in time. Other events are ignored.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (!(evt instanceof IdleStateEvent)) {
                return;
            }
            Channel channel = ctx.channel();
            IdleStateEvent idle = (IdleStateEvent) evt;
            switch (idle.state()) {
                case READER_IDLE: {
                    log.warn("clientHeartbeat loss {}", channel.remoteAddress());
                    ctx.close();
                    break;
                }
                case WRITER_IDLE: {
                    log.debug("clientHeartbeat ping {}", channel.remoteAddress());
                    ctx.writeAndFlush((Object) new PingPacket());
                    break;
                }
                default: {
                    break;
                }
            }
        }

        /**
         * Logs the failure and, when the channel is still active, raises
         * {@link StatefulTcpClient#onError}; unless a handler cancels the event the client is closed.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            Channel channel = ctx.channel();
            TraceHandler.INSTANCE.log("clientCaught {}", channel.remoteAddress(), cause);
            if (!channel.isActive()) {
                return;
            }
            NEventArgs<Throwable> args = new NEventArgs<Throwable>(cause);
            // Failures inside onError handlers must not prevent the close below.
            Extends.quietly(() -> StatefulTcpClient.this.raiseEvent(StatefulTcpClient.this.onError, args));
            if (args.isCancel()) {
                return;
            }
            StatefulTcpClient.this.close();
        }
    }
}

