/*
 * Decompiled with CFR 0.152.
 */
package org.rx.net.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.rx.core.Delegate;
import org.rx.core.EventPublisher;
import org.rx.core.Extends;
import org.rx.core.NEventArgs;
import org.rx.core.ResetEventWait;
import org.rx.core.Tasks;
import org.rx.exception.TraceHandler;
import org.rx.io.MemoryStream;
import org.rx.io.Serializer;
import org.rx.net.Sockets;
import org.rx.net.transport.protocol.Ack;
import org.rx.net.transport.protocol.AckSync;
import org.rx.net.transport.protocol.UdpMessage;
import org.rx.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UdpClient
implements EventPublisher<UdpClient> {
    private static final Logger log = LoggerFactory.getLogger(UdpClient.class);
    static final AttributeKey<UdpClient> OWNER = AttributeKey.valueOf((String)"UdpClient");
    static final Handler HANDLER = new Handler();
    static final IdGenerator generator = new IdGenerator();
    static final Map<Integer, Context> queue = new ConcurrentHashMap<Integer, Context>();
    static final Set<Integer> record = ConcurrentHashMap.newKeySet();
    public final Delegate<UdpClient, NEventArgs<UdpMessage>> onReceive = Delegate.create();
    final Bootstrap bootstrap = Sockets.udpBootstrap(null, ch -> ch.pipeline().addLast(new ChannelHandler[]{HANDLER}));
    final Channel channel;
    int waitAckTimeoutMillis = 15000;
    boolean fullSync;
    int maxResend = 2;

    public UdpClient(int bindPort) {
        this.channel = this.bootstrap.bind(bindPort).addListener((GenericFutureListener)Sockets.logBind(bindPort)).channel();
        this.channel.attr(OWNER).set((Object)this);
    }

    DatagramPacket serialize(InetSocketAddress remoteAddress, Object message) {
        MemoryStream stream = new MemoryStream(true);
        Serializer.DEFAULT.serialize(message, stream);
        ByteBuf buf = stream.getBuffer().readerIndex(0);
        if (buf.readableBytes() > 1024) {
            log.warn("Too large packet size 4 udp. {} > 1024", (Object)buf.readableBytes());
        }
        return new DatagramPacket(buf, remoteAddress);
    }

    void sendAck(InetSocketAddress remoteAddress, UdpMessage message) {
        record.add(message.id);
        Tasks.setTimeout(() -> record.remove(message.id), message.alive);
        this.channel.writeAndFlush((Object)this.serialize(remoteAddress, new Ack(message.id)));
    }

    public <T> ChannelFuture sendAsync(InetSocketAddress remoteAddress, T packet) {
        return this.sendAsync(remoteAddress, packet, this.waitAckTimeoutMillis, this.fullSync);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T> ChannelFuture sendAsync(InetSocketAddress remoteAddress, T packet, int waitAckTimeoutMillis, boolean fullSync) throws TimeoutException {
        AckSync as = fullSync ? AckSync.FULL : (waitAckTimeoutMillis > 0 ? AckSync.SEMI : AckSync.NONE);
        UdpMessage message = new UdpMessage(generator.increment(), as, waitAckTimeoutMillis, remoteAddress, packet);
        if (message.ack != AckSync.NONE) {
            Context ctx = new Context(message);
            queue.put(message.id, ctx);
            ctx.future = Tasks.setTimeout(() -> {
                this.channel.writeAndFlush((Object)this.serialize(remoteAddress, message));
                Extends.circuitContinue(++ctx.resend <= this.maxResend);
            }, waitAckTimeoutMillis / this.maxResend);
        }
        ChannelFuture future = this.channel.writeAndFlush((Object)this.serialize(remoteAddress, message));
        if (message.ack != AckSync.NONE) {
            Context ctx = queue.get(message.id);
            if (ctx == null) {
                return future;
            }
            try {
                ctx.syncRoot.waitOne(waitAckTimeoutMillis);
            }
            finally {
                queue.remove(message.id);
            }
        }
        return future;
    }

    public int getWaitAckTimeoutMillis() {
        return this.waitAckTimeoutMillis;
    }

    public void setWaitAckTimeoutMillis(int waitAckTimeoutMillis) {
        this.waitAckTimeoutMillis = waitAckTimeoutMillis;
    }

    public boolean isFullSync() {
        return this.fullSync;
    }

    public void setFullSync(boolean fullSync) {
        this.fullSync = fullSync;
    }

    public int getMaxResend() {
        return this.maxResend;
    }

    public void setMaxResend(int maxResend) {
        this.maxResend = maxResend;
    }

    @ChannelHandler.Sharable
    static class Handler
    extends SimpleChannelInboundHandler<DatagramPacket> {
        Handler() {
        }

        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            UdpClient client = (UdpClient)ctx.channel().attr(OWNER).get();
            Object pack = Serializer.DEFAULT.deserialize(new MemoryStream(((ByteBuf)msg.content()).retain(), false));
            if (Extends.tryAs(pack, Ack.class, ack -> {
                Context sr = queue.remove(ack.id);
                if (sr == null) {
                    return;
                }
                sr.syncRoot.set();
                sr.future.cancel(true);
                log.debug("Receive Ack {}", (Object)ack.id);
            })) {
                return;
            }
            UdpMessage message = (UdpMessage)pack;
            if (record.contains(message.id)) {
                log.debug("Consumed just send Ack {}", (Object)message.id);
                client.sendAck((InetSocketAddress)msg.sender(), message);
                return;
            }
            if (message.ack == AckSync.SEMI) {
                client.sendAck((InetSocketAddress)msg.sender(), message);
            }
            client.raiseEventAsync(client.onReceive, new NEventArgs<UdpMessage>(message)).whenComplete((r, e) -> {
                if (e != null) {
                    return;
                }
                if (message.ack == AckSync.FULL) {
                    client.sendAck((InetSocketAddress)msg.sender(), message);
                }
            });
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            TraceHandler.INSTANCE.log(cause);
        }
    }

    static class Context {
        public final UdpMessage message;
        public final ResetEventWait syncRoot = new ResetEventWait();
        public int resend;
        public Future<?> future;

        public Context(UdpMessage message) {
            this.message = message;
        }
    }
}

