package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/* loaded from: input_file:org/drasyl/handler/pubsub/PubSubPublishHandler.class */
public class PubSubPublishHandler extends ChannelDuplexHandler {
    public static final Duration DEFAULT_PUBLISH_TIMEOUT = Duration.ofMillis(5000);
    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublishHandler.class);
    private final Duration publishTimeout;
    private final Map<UUID, Promise<Void>> requests;
    private final DrasylAddress broker;

    PubSubPublishHandler(Duration duration, Map<UUID, Promise<Void>> map, DrasylAddress drasylAddress) {
        this.publishTimeout = Preconditions.requireNonNegative(duration);
        this.requests = (Map) Objects.requireNonNull(map);
        this.broker = (DrasylAddress) Objects.requireNonNull(drasylAddress);
    }

    public PubSubPublishHandler(Duration duration, DrasylAddress drasylAddress) {
        this(duration, new HashMap(), drasylAddress);
    }

    public PubSubPublishHandler(DrasylAddress drasylAddress) {
        this(DEFAULT_PUBLISH_TIMEOUT, drasylAddress);
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        if (obj instanceof PubSubPublish) {
            doPublish(channelHandlerContext, (PubSubPublish) obj, channelPromise);
        } else {
            channelHandlerContext.write(obj, channelPromise);
        }
    }

    private void doPublish(ChannelHandlerContext channelHandlerContext, PubSubPublish pubSubPublish, ChannelPromise channelPromise) {
        LOG.trace("Send `{}` to broker `{}`.", pubSubPublish, this.broker);
        channelHandlerContext.write(new OverlayAddressedMessage(pubSubPublish, this.broker)).addListener(future -> {
            if (this.publishTimeout.isZero() || !future.isSuccess()) {
                PromiseNotifier.cascade(future, channelPromise);
                return;
            }
            this.requests.put(pubSubPublish.getId(), channelPromise);
            channelPromise.addListener(future -> {
                this.requests.remove(pubSubPublish.getId());
            });
            channelHandlerContext.executor().schedule(() -> {
                return Boolean.valueOf(channelPromise.tryFailure(new Exception("Got no confirmation from broker within " + this.publishTimeout.toMillis() + "ms.")));
            }, this.publishTimeout.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof OverlayAddressedMessage) && (((OverlayAddressedMessage) obj).content() instanceof PubSubPublished) && this.broker.equals(((OverlayAddressedMessage) obj).sender())) {
            handlePublished((PubSubPublished) ((OverlayAddressedMessage) obj).content());
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    private void handlePublished(PubSubPublished pubSubPublished) {
        LOG.trace("Got `{}` from broker `{}`.", pubSubPublished, this.broker);
        Promise<Void> remove = this.requests.remove(pubSubPublished.getId());
        if (remove != null) {
            remove.trySuccess((Object) null);
        }
    }
}
