/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.handler.pubsub.PubSubMessage;
import org.drasyl.handler.pubsub.PubSubPublish;
import org.drasyl.handler.pubsub.PubSubPublished;
import org.drasyl.handler.pubsub.PubSubSubscribe;
import org.drasyl.handler.pubsub.PubSubSubscribed;
import org.drasyl.handler.pubsub.PubSubUnsubscribe;
import org.drasyl.handler.pubsub.PubSubUnsubscribed;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.HashSetMultimap;
import org.drasyl.util.Multimap;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

public class PubSubBrokerHandler
extends SimpleChannelInboundHandler<OverlayAddressedMessage<PubSubMessage>> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubBrokerHandler.class);
    private final Multimap<String, DrasylAddress> subscriptions;

    PubSubBrokerHandler(Multimap<String, DrasylAddress> subscriptions) {
        this.subscriptions = Objects.requireNonNull(subscriptions);
    }

    public PubSubBrokerHandler() {
        this((Multimap<String, DrasylAddress>)new HashSetMultimap());
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        for (String topic : this.subscriptions.keySet()) {
            Collection subscribers = this.subscriptions.get((Object)topic);
            for (DrasylAddress subscriber : subscribers) {
                ctx.writeAndFlush((Object)new OverlayAddressedMessage((Object)PubSubUnsubscribe.of(topic), subscriber));
            }
        }
        if (!this.subscriptions.isEmpty()) {
            ctx.flush();
            this.subscriptions.clear();
        }
        ctx.fireChannelInactive();
    }

    public boolean acceptInboundMessage(Object msg) throws Exception {
        return msg instanceof OverlayAddressedMessage && (((OverlayAddressedMessage)msg).content() instanceof PubSubPublish || ((OverlayAddressedMessage)msg).content() instanceof PubSubSubscribe || ((OverlayAddressedMessage)msg).content() instanceof PubSubUnsubscribe);
    }

    protected void channelRead0(ChannelHandlerContext ctx, OverlayAddressedMessage<PubSubMessage> msg) throws Exception {
        LOG.trace("Got `{}` from `{}`.", msg.content(), (Object)msg.sender());
        if (msg.content() instanceof PubSubPublish) {
            this.handlePublish(ctx, (PubSubPublish)msg.content(), (DrasylAddress)msg.sender());
        } else if (msg.content() instanceof PubSubSubscribe) {
            this.handleSubscribe(ctx, (PubSubSubscribe)msg.content(), (DrasylAddress)msg.sender());
        } else if (msg.content() instanceof PubSubUnsubscribe) {
            this.handleUnsubscribe(ctx, (PubSubUnsubscribe)msg.content(), (DrasylAddress)msg.sender());
        }
    }

    private void handlePublish(ChannelHandlerContext ctx, PubSubPublish msg, DrasylAddress sender) {
        Collection subscribers = this.subscriptions.get((Object)msg.getTopic());
        if (subscribers.isEmpty()) {
            LOG.debug("Topic `{}` got new publication from `{}`. But as there are no subscribers, publication is dropped.", msg::getTopic, () -> sender);
        } else {
            Supplier[] supplierArray = new Supplier[3];
            supplierArray[0] = msg::getTopic;
            supplierArray[1] = () -> sender;
            supplierArray[2] = subscribers::size;
            LOG.debug("Topic `{}` got new publication from `{}`. Forward to {} subscriber(s).", supplierArray);
            for (DrasylAddress subscriber : subscribers) {
                ctx.writeAndFlush((Object)new OverlayAddressedMessage((Object)msg.retain(), subscriber));
            }
        }
        ctx.writeAndFlush((Object)new OverlayAddressedMessage((Object)PubSubPublished.of(msg.getId()), sender));
    }

    private void handleSubscribe(ChannelHandlerContext ctx, PubSubSubscribe msg, DrasylAddress sender) {
        if (this.subscriptions.put((Object)msg.getTopic(), (Object)sender)) {
            LOG.debug("Topic `{}` got new subscriber: `{}`", (Object)msg.getTopic(), (Object)sender);
        }
        ctx.writeAndFlush((Object)new OverlayAddressedMessage((Object)PubSubSubscribed.of(msg.getId()), sender));
    }

    private void handleUnsubscribe(ChannelHandlerContext ctx, PubSubUnsubscribe msg, DrasylAddress sender) {
        if (this.subscriptions.remove((Object)msg.getTopic(), (Object)sender)) {
            LOG.debug("Topic `{}` lost subscriber: `{}`", (Object)msg.getTopic(), (Object)sender);
        }
        ctx.writeAndFlush((Object)new OverlayAddressedMessage((Object)PubSubUnsubscribed.of(msg.getId()), sender));
    }
}

