package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Supplier;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.HashSetMultimap;
import org.drasyl.util.Multimap;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/* loaded from: input_file:org/drasyl/handler/pubsub/PubSubBrokerHandler.class */
public class PubSubBrokerHandler extends SimpleChannelInboundHandler<OverlayAddressedMessage<PubSubMessage>> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubBrokerHandler.class);
    private final Multimap<String, DrasylAddress> subscriptions;

    PubSubBrokerHandler(Multimap<String, DrasylAddress> multimap) {
        this.subscriptions = (Multimap) Objects.requireNonNull(multimap);
    }

    public PubSubBrokerHandler() {
        this(new HashSetMultimap());
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        for (String str : this.subscriptions.keySet()) {
            Iterator it = this.subscriptions.get(str).iterator();
            while (it.hasNext()) {
                channelHandlerContext.writeAndFlush(new OverlayAddressedMessage(PubSubUnsubscribe.of(str), (DrasylAddress) it.next()));
            }
        }
        if (!this.subscriptions.isEmpty()) {
            channelHandlerContext.flush();
            this.subscriptions.clear();
        }
        channelHandlerContext.fireChannelInactive();
    }

    public boolean acceptInboundMessage(Object obj) throws Exception {
        return (obj instanceof OverlayAddressedMessage) && ((((OverlayAddressedMessage) obj).content() instanceof PubSubPublish) || (((OverlayAddressedMessage) obj).content() instanceof PubSubSubscribe) || (((OverlayAddressedMessage) obj).content() instanceof PubSubUnsubscribe));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, OverlayAddressedMessage<PubSubMessage> overlayAddressedMessage) throws Exception {
        LOG.trace("Got `{}` from `{}`.", overlayAddressedMessage.content(), overlayAddressedMessage.sender());
        if (overlayAddressedMessage.content() instanceof PubSubPublish) {
            handlePublish(channelHandlerContext, (PubSubPublish) overlayAddressedMessage.content(), (DrasylAddress) overlayAddressedMessage.sender());
        } else if (overlayAddressedMessage.content() instanceof PubSubSubscribe) {
            handleSubscribe(channelHandlerContext, (PubSubSubscribe) overlayAddressedMessage.content(), (DrasylAddress) overlayAddressedMessage.sender());
        } else if (overlayAddressedMessage.content() instanceof PubSubUnsubscribe) {
            handleUnsubscribe(channelHandlerContext, (PubSubUnsubscribe) overlayAddressedMessage.content(), (DrasylAddress) overlayAddressedMessage.sender());
        }
    }

    private void handlePublish(ChannelHandlerContext channelHandlerContext, PubSubPublish pubSubPublish, DrasylAddress drasylAddress) {
        Collection collection = this.subscriptions.get(pubSubPublish.getTopic());
        if (collection.isEmpty()) {
            Logger logger = LOG;
            Objects.requireNonNull(pubSubPublish);
            logger.debug("Topic `{}` got new publication from `{}`. But as there are no subscribers, publication is dropped.", pubSubPublish::getTopic, () -> {
                return drasylAddress;
            });
        } else {
            Logger logger2 = LOG;
            Objects.requireNonNull(pubSubPublish);
            Objects.requireNonNull(collection);
            logger2.debug("Topic `{}` got new publication from `{}`. Forward to {} subscriber(s).", new Supplier[]{pubSubPublish::getTopic, () -> {
                return drasylAddress;
            }, collection::size});
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                channelHandlerContext.writeAndFlush(new OverlayAddressedMessage(pubSubPublish.retain(), (DrasylAddress) it.next()));
            }
        }
        channelHandlerContext.writeAndFlush(new OverlayAddressedMessage(PubSubPublished.of(pubSubPublish.getId()), drasylAddress));
    }

    private void handleSubscribe(ChannelHandlerContext channelHandlerContext, PubSubSubscribe pubSubSubscribe, DrasylAddress drasylAddress) {
        if (this.subscriptions.put(pubSubSubscribe.getTopic(), drasylAddress)) {
            LOG.debug("Topic `{}` got new subscriber: `{}`", pubSubSubscribe.getTopic(), drasylAddress);
        }
        channelHandlerContext.writeAndFlush(new OverlayAddressedMessage(PubSubSubscribed.of(pubSubSubscribe.getId()), drasylAddress));
    }

    private void handleUnsubscribe(ChannelHandlerContext channelHandlerContext, PubSubUnsubscribe pubSubUnsubscribe, DrasylAddress drasylAddress) {
        if (this.subscriptions.remove(pubSubUnsubscribe.getTopic(), drasylAddress)) {
            LOG.debug("Topic `{}` lost subscriber: `{}`", pubSubUnsubscribe.getTopic(), drasylAddress);
        }
        channelHandlerContext.writeAndFlush(new OverlayAddressedMessage(PubSubUnsubscribed.of(pubSubUnsubscribe.getId()), drasylAddress));
    }
}
