/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.atomix.cluster.impl.AddressSerializer;
import io.atomix.cluster.messaging.ManagedUnicastService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultMaxBytesRecvByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ManagedUnicastService} implementation that sends and receives unicast messages as
 * datagrams over a Netty {@link NioDatagramChannel}.
 *
 * <p>Every datagram written by {@link #unicast(Address, String, byte[])} has the layout
 * {@code [int preamble][int length][length bytes of serialized Message]}. The preamble is the
 * {@code hashCode()} of the cluster id passed to the constructor; received datagrams whose
 * preamble differs are logged and dropped.
 */
public class NettyUnicastService implements ManagedUnicastService {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyUnicastService.class);
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using((Namespace)new Namespace.Builder().register(Namespaces.BASIC).nextId(500).register(new Class[]{Message.class}).register((Serializer)new AddressSerializer(), new Class[]{Address.class}).build());

    /** Size in bytes of the datagram header: one int for the preamble plus one int for the length. */
    private static final int HEADER_LENGTH = 2 * Integer.BYTES;

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Address address;
    private final MessagingConfig config;
    /** Listeners keyed by subject; each listener is invoked on the executor it was registered with. */
    private final Map<String, Map<BiConsumer<Address, byte[]>, Executor>> listeners = Maps.newConcurrentMap();
    private final AtomicBoolean started = new AtomicBoolean();
    // Both fields are written by start()/stop() and the bind callback, and read by unicast(),
    // which may run on a different thread; volatile keeps those reads up to date.
    private volatile EventLoopGroup group;
    private volatile DatagramChannel channel;
    private final int preamble;

    /**
     * Creates a new, not yet started, unicast service.
     *
     * @param clusterId cluster identifier; its {@code hashCode()} is used as the datagram preamble
     * @param address address recorded as the source of every sent message, and whose port is the
     *     bind port when the configuration does not specify one
     * @param config messaging configuration supplying the optional bind port and interfaces
     */
    public NettyUnicastService(String clusterId, Address address, MessagingConfig config) {
        this.address = address;
        this.config = config;
        this.preamble = clusterId.hashCode();
    }

    /**
     * Sends {@code payload} under {@code subject} to {@code address} as a single datagram.
     *
     * <p>The message is dropped, with a debug log entry, when the service is not started or the
     * destination address cannot be resolved.
     *
     * @param address destination address
     * @param subject subject the receiver uses to select listeners
     * @param payload message bytes
     */
    @Override
    public void unicast(Address address, String subject, byte[] payload) {
        if (!this.started.get()) {
            LOGGER.debug("Failed sending unicast message, unicast service was not started.");
            return;
        }
        // Snapshot the channel: stop() may null the field between the check above and the write.
        DatagramChannel activeChannel = this.channel;
        if (activeChannel == null) {
            LOGGER.debug("Failed sending unicast message, unicast service was stopped.");
            return;
        }
        InetAddress resolvedAddress = address.address();
        if (resolvedAddress == null) {
            LOGGER.debug("Failed sending unicast message (destination address {} cannot be resolved)", address);
            return;
        }
        Message message = new Message(this.address, subject, payload);
        byte[] bytes = SERIALIZER.encode(message);
        ByteBuf buf = activeChannel.alloc().buffer(HEADER_LENGTH + bytes.length);
        buf.writeInt(this.preamble);
        buf.writeInt(bytes.length).writeBytes(bytes);
        activeChannel.writeAndFlush(new DatagramPacket(buf, new InetSocketAddress(resolvedAddress, address.port())));
    }

    /**
     * Registers {@code listener} for messages received under {@code subject}.
     *
     * @param subject subject to listen on
     * @param listener callback receiving the source address and payload of each message
     * @param executor executor on which the callback is run
     */
    @Override
    public synchronized void addListener(String subject, BiConsumer<Address, byte[]> listener, Executor executor) {
        this.listeners.computeIfAbsent(subject, s -> Maps.newConcurrentMap()).put(listener, executor);
    }

    /**
     * Unregisters {@code listener} from {@code subject}; the subject entry is removed once its
     * last listener is gone.
     *
     * @param subject subject the listener was registered on
     * @param listener listener to remove
     */
    @Override
    public synchronized void removeListener(String subject, BiConsumer<Address, byte[]> listener) {
        Map<BiConsumer<Address, byte[]>, Executor> subjectListeners = this.listeners.get(subject);
        if (subjectListeners == null) {
            return;
        }
        subjectListeners.remove(listener);
        if (subjectListeners.isEmpty()) {
            this.listeners.remove(subject);
        }
    }

    /**
     * Configures the datagram bootstrap and binds it.
     *
     * @return future completed once all configured interfaces are bound, or completed
     *     exceptionally with the cause of the first failed bind
     */
    private CompletableFuture<Void> bootstrap() {
        Bootstrap serverBootstrap = new Bootstrap()
            .group(this.group)
            .channel(NioDatagramChannel.class)
            .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                @Override
                protected void channelRead0(ChannelHandlerContext context, DatagramPacket packet) throws Exception {
                    NettyUnicastService.this.handleReceivedPacket(packet);
                }
            })
            .option(ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator())
            .option(ChannelOption.SO_BROADCAST, true)
            .option(ChannelOption.SO_REUSEADDR, true);
        return this.bind(serverBootstrap);
    }

    /**
     * Validates and decodes a received datagram and dispatches it to the listeners registered
     * for its subject.
     *
     * <p>Datagrams arrive from the network and are therefore untrusted: anything too short for
     * the header, carrying a foreign preamble, or declaring a length that is negative or larger
     * than the bytes actually present is logged and dropped before any buffer is allocated.
     *
     * @param packet received datagram
     */
    private void handleReceivedPacket(DatagramPacket packet) {
        ByteBuf content = packet.content();
        if (content.readableBytes() < HEADER_LENGTH) {
            this.log.warn("Received malformed unicast message from {} ({} bytes is too short for the header). Ignoring the message.", packet.sender(), content.readableBytes());
            return;
        }
        int preambleReceived = content.readInt();
        if (preambleReceived != this.preamble) {
            this.log.warn("Received unicast message from {} which is outside of the cluster. Ignoring the message.", packet.sender());
            return;
        }
        int length = content.readInt();
        // Never size an array from an unchecked, sender-controlled length field.
        if (length < 0 || length > content.readableBytes()) {
            this.log.warn("Received malformed unicast message from {} (declared length {} but {} bytes available). Ignoring the message.", new Object[]{packet.sender(), length, content.readableBytes()});
            return;
        }
        byte[] payload = new byte[length];
        content.readBytes(payload);
        Message message = SERIALIZER.decode(payload);
        if (message == null || message.subject() == null) {
            // A null key would make the concurrent listener map lookup throw.
            this.log.warn("Received unicast message from {} without a subject. Ignoring the message.", packet.sender());
            return;
        }
        Map<BiConsumer<Address, byte[]>, Executor> subjectListeners = this.listeners.get(message.subject());
        if (subjectListeners != null) {
            subjectListeners.forEach((consumer, executor) -> executor.execute(() -> consumer.accept(message.source(), message.payload())));
        }
    }

    /**
     * Binds the bootstrap to every configured interface, or to {@code 0.0.0.0} when none is
     * configured. The port is the configured port if set, otherwise the port of this service's
     * address.
     *
     * @param bootstrap configured bootstrap to bind
     * @return future completed when all binds succeed, or exceptionally on the first failure
     */
    private CompletableFuture<Void> bind(Bootstrap bootstrap) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        int port = this.config.getPort() != null ? this.config.getPort().intValue() : this.address.port();
        Iterator<String> ifaces = this.config.getInterfaces().isEmpty()
            ? Lists.newArrayList("0.0.0.0").iterator()
            : this.config.getInterfaces().iterator();
        this.bind(bootstrap, ifaces, port, future);
        return future;
    }

    /**
     * Binds to the next interface of {@code ifaces} and, on success, continues with the
     * remaining ones from the bind callback. The most recently bound channel becomes the channel
     * used for sending.
     *
     * @param bootstrap configured bootstrap to bind
     * @param ifaces remaining interfaces to bind to
     * @param port port to bind on each interface
     * @param future completed when {@code ifaces} is exhausted, or exceptionally on a failed bind
     */
    private void bind(Bootstrap bootstrap, Iterator<String> ifaces, int port, CompletableFuture<Void> future) {
        if (!ifaces.hasNext()) {
            future.complete(null);
            return;
        }
        String iface = ifaces.next();
        bootstrap.bind(iface, port).addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                this.log.info("UDP server listening for connections on {}:{}", iface, port);
                this.channel = (DatagramChannel) f.channel();
                this.bind(bootstrap, ifaces, port, future);
            } else {
                this.log.warn("Failed to bind UDP server to port {}:{} due to {}", new Object[]{iface, port, f.cause()});
                future.completeExceptionally(f.cause());
            }
        });
    }

    /**
     * Starts the service: creates the event loop group and binds the datagram channel(s).
     *
     * <p>Calling this while the service is already running is a no-op. If binding fails, the
     * event loop group created for this attempt is shut down so its threads are not leaked.
     *
     * @return future completed with this service once it is bound, or completed exceptionally
     *     when binding fails
     */
    public CompletableFuture<UnicastService> start() {
        if (this.started.get()) {
            return CompletableFuture.completedFuture(this);
        }
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, Threads.namedThreads("netty-unicast-event-nio-client-%d", this.log));
        this.group = eventLoopGroup;
        return this.bootstrap()
            .whenComplete((ignored, error) -> {
                if (error == null) {
                    this.started.set(true);
                } else {
                    // Release the event loop threads of the failed attempt.
                    this.channel = null;
                    eventLoopGroup.shutdownGracefully();
                }
            })
            .thenApply(v -> this);
    }

    /**
     * Returns whether the service has been started and not yet stopped.
     *
     * @return {@code true} while the service is running
     */
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Stops the service: closes the channel and then shuts down the event loop group.
     *
     * <p>Calling this while the service is not running is a no-op.
     *
     * @return future completed once the event loop group has shut down
     */
    public CompletableFuture<Void> stop() {
        if (!this.started.compareAndSet(true, false)) {
            return CompletableFuture.completedFuture(null);
        }
        DatagramChannel activeChannel = this.channel;
        EventLoopGroup activeGroup = this.group;
        this.channel = null;
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (activeChannel != null) {
            activeChannel.close().addListener(f -> NettyUnicastService.shutdownGroup(activeGroup, future));
        } else {
            // Without a channel there is nothing to close, but the threads must still be released.
            NettyUnicastService.shutdownGroup(activeGroup, future);
        }
        return future;
    }

    /** Shuts down {@code group}, if any, and completes {@code future} once that has finished. */
    private static void shutdownGroup(EventLoopGroup group, CompletableFuture<Void> future) {
        if (group == null) {
            future.complete(null);
            return;
        }
        group.shutdownGracefully().addListener(f -> future.complete(null));
    }

    /**
     * Unicast message as carried in the body of a datagram: the sender's address, the subject
     * used to select listeners, and the raw payload.
     */
    static final class Message {
        private final Address source;
        private final String subject;
        private final byte[] payload;

        Message(Address source, String subject, byte[] payload) {
            this.source = source;
            this.subject = subject;
            this.payload = payload;
        }

        /** Returns the address recorded by the sender as the message source. */
        Address source() {
            return this.source;
        }

        /** Returns the subject the message was sent under. */
        String subject() {
            return this.subject;
        }

        /** Returns the message payload; the internal array is returned without copying. */
        byte[] payload() {
            return this.payload;
        }
    }
}

