/*
 * Decompiled with CFR 0.152.
 */
package org.rx.net;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.rx.bean.WeakIdentityMap;
import org.rx.core.Disposable;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.ResetEventWait;
import org.rx.core.Strings;
import org.rx.core.Tasks;
import org.rx.core.WaitHandle;
import org.rx.exception.TraceHandler;
import org.rx.io.Bytes;
import org.rx.net.MemoryMode;
import org.rx.net.Sockets;
import org.rx.net.http.HttpClient;
import org.rx.util.function.PredicateFunc;
import org.rx.util.function.TripleFunc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NetEventWait
extends Disposable
implements WaitHandle {
    private static final Logger log = LoggerFactory.getLogger(NetEventWait.class);
    public static final String DEFAULT_URL_SUFFIX = "/mx/httpSignal";
    public static TripleFunc<InetSocketAddress, String, Set<String>> HTTP_SIGNAL_HANDLER;
    static final AttributeKey<Set<NetEventWait>> REF;
    static final Map<InetSocketAddress, NioDatagramChannel> channels;
    final String group;
    final String idString;
    final InetSocketAddress multicastEndpoint;
    final NioDatagramChannel channel;
    final ResetEventWait wait = new ResetEventWait();
    int multicastCount = 1;

    public static void appendDefaultUrlSuffix(Set<String> set) {
        List<String> list = Linq.from(set).select(u -> u + DEFAULT_URL_SUFFIX).toList();
        set.clear();
        set.addAll(list);
    }

    public static void multicastLocal(InetSocketAddress multicastEndpoint, String group, int mcId) {
        NioDatagramChannel channel = channels.get(multicastEndpoint);
        if (channel == null) {
            return;
        }
        NetEventWait.multicastLocal((Channel)channel, group, mcId);
    }

    static void multicastLocal(Channel channel, String group, int mcId) {
        for (NetEventWait w : (Set)channel.attr(REF).get()) {
            if (!Strings.hashEquals(w.group, group)) {
                log.info("multicast skip {} ~ {}@{}", new Object[]{w.idString, group, Integer.toHexString(mcId)});
                continue;
            }
            log.info("multicast signal {} <- {}", (Object)w.idString, (Object)Integer.toHexString(mcId));
            w.wait.set();
        }
    }

    public NetEventWait(String group) {
        this(group, new InetSocketAddress("239.0.0.2", 80));
    }

    public NetEventWait(@NonNull String group, @NonNull InetSocketAddress multicastEndpoint) {
        if (group == null) {
            throw new NullPointerException("group is marked non-null but is null");
        }
        if (multicastEndpoint == null) {
            throw new NullPointerException("multicastEndpoint is marked non-null but is null");
        }
        this.group = group;
        this.multicastEndpoint = multicastEndpoint;
        this.idString = group + "@" + Integer.toHexString(this.hashCode());
        this.channel = channels.computeIfAbsent(multicastEndpoint, k -> (NioDatagramChannel)Sockets.udpBootstrap(MemoryMode.LOW, true, c -> {
            c.attr(REF).set(Collections.newSetFromMap(new WeakIdentityMap()));
            c.pipeline().addLast(new ChannelHandler[]{Handler.DEFAULT});
        }).bind(multicastEndpoint.getPort()).addListener((GenericFutureListener)((ChannelFutureListener)f -> {
            if (!f.isSuccess()) {
                TraceHandler.INSTANCE.log("multicast bind error {}", this.idString, f.cause());
                return;
            }
            NioDatagramChannel c = (NioDatagramChannel)f.channel();
            c.joinGroup(multicastEndpoint.getAddress());
            log.info("multicast join {} -> {}", (Object)this.idString, (Object)multicastEndpoint);
        })).syncUninterruptibly().channel());
        Set refs = (Set)this.channel.attr(REF).get();
        if (refs != null) {
            log.info("multicast ref {}", (Object)this.idString);
            refs.add(this);
        }
    }

    @Override
    protected void freeObjects() {
        ((Set)this.channel.attr(REF).get()).remove(this);
    }

    public boolean await() {
        return this.await(-1L);
    }

    @Override
    public boolean await(long timeoutMillis) {
        return this.await(timeoutMillis, -1L, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean await(long timeoutMillis, long intervalMillis, PredicateFunc<Integer> isTrue) {
        if (intervalMillis >= 0L && isTrue != null) {
            long deadline = System.nanoTime() + timeoutMillis * 1000000L;
            ResetEventWait resetEventWait = this.wait;
            synchronized (resetEventWait) {
                this.wait.reset();
                int i = 1;
                do {
                    if (this.wait.waitOne(intervalMillis) || isTrue.test(i)) {
                        return true;
                    }
                    ++i;
                } while (System.nanoTime() < deadline);
                return false;
            }
        }
        ResetEventWait resetEventWait = this.wait;
        synchronized (resetEventWait) {
            this.wait.reset();
            return this.wait.waitOne(timeoutMillis);
        }
    }

    @Override
    public void signalAll() {
        this.signalAll(HTTP_SIGNAL_HANDLER != null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void signalAll(boolean httpSignal) {
        ResetEventWait resetEventWait = this.wait;
        synchronized (resetEventWait) {
            int mcId = this.hashCode();
            NetEventWait.multicastLocal((Channel)this.channel, this.group, mcId);
            ByteBuf buf = Bytes.directBuffer();
            buf.writeInt(mcId);
            buf.writeCharSequence((CharSequence)this.group, StandardCharsets.UTF_8);
            DatagramPacket packet = new DatagramPacket(buf, this.multicastEndpoint);
            int last = this.multicastCount - 1;
            for (int i = 0; i < this.multicastCount; ++i) {
                if (i < last) {
                    packet.retain();
                }
                this.channel.writeAndFlush((Object)packet);
            }
            if (httpSignal && HTTP_SIGNAL_HANDLER != null) {
                Set<String> urls = HTTP_SIGNAL_HANDLER.apply(this.multicastEndpoint, this.group);
                if (CollectionUtils.isEmpty(urls)) {
                    return;
                }
                Tasks.run(() -> {
                    HttpClient client = new HttpClient();
                    HashMap<String, Object> params = new HashMap<String, Object>();
                    params.put("multicast", Sockets.toString(this.multicastEndpoint));
                    params.put("group", this.group);
                    params.put("mcId", mcId);
                    Extends.eachQuietly(urls, u -> client.get(HttpClient.buildUrl(u, params)));
                });
            }
        }
    }

    public void setMulticastCount(int multicastCount) {
        this.multicastCount = multicastCount;
    }

    static {
        REF = AttributeKey.valueOf((String)"Ref");
        channels = new ConcurrentHashMap<InetSocketAddress, NioDatagramChannel>(8);
    }

    @ChannelHandler.Sharable
    static class Handler
    extends SimpleChannelInboundHandler<DatagramPacket> {
        static final Handler DEFAULT = new Handler();

        Handler() {
        }

        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
            ByteBuf buf = (ByteBuf)packet.content();
            int mcId = buf.readInt();
            String group = buf.toString(StandardCharsets.UTF_8);
            NetEventWait.multicastLocal(ctx.channel(), group, mcId);
        }
    }
}

