/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.traffic;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.drasyl.util.Preconditions;
import org.drasyl.util.TokenBucket;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

public class OutboundMessagesThrottlingHandler
extends ChannelOutboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundMessagesThrottlingHandler.class);
    private final RateLimitedQueue queue;

    OutboundMessagesThrottlingHandler(RateLimitedQueue queue) {
        this.queue = Objects.requireNonNull(queue);
    }

    public OutboundMessagesThrottlingHandler(long maxEventsPerSecond) {
        this(new RateLimitedQueue(maxEventsPerSecond));
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        this.queue.add(ctx, () -> ctx.writeAndFlush(msg, promise));
    }

    public static class QueueConsumer
    implements Runnable {
        private final RateLimitedQueue queue;

        QueueConsumer(RateLimitedQueue queue) {
            this.queue = Objects.requireNonNull(queue);
        }

        @Override
        public void run() {
            LOG.trace("Queue Consumer started.");
            while (this.queue.tryConsume()) {
            }
            this.queue.queueConsumer.set(false);
            LOG.trace("Queue Consumer is done.");
        }
    }

    public static class RateLimitedQueue {
        public final Queue<Runnable> queue;
        public final TokenBucket tokenBucket;
        private final AtomicBoolean queueConsumer;

        RateLimitedQueue(Queue<Runnable> queue, TokenBucket tokenBucket, AtomicBoolean queueConsumer) {
            this.queue = Objects.requireNonNull(queue);
            this.tokenBucket = Objects.requireNonNull(tokenBucket);
            this.queueConsumer = Objects.requireNonNull(queueConsumer);
        }

        public RateLimitedQueue(long maxEventsPerSecond) {
            this.queue = new LinkedList<Runnable>();
            Duration refillInterval = Duration.ofSeconds(1L).dividedBy(Preconditions.requirePositive((long)maxEventsPerSecond, (String)"maxEventsPerSecond must be a positive number"));
            boolean doBusyWait = refillInterval.toMillis() < 20L;
            this.tokenBucket = new TokenBucket(1L, refillInterval, doBusyWait);
            this.queueConsumer = new AtomicBoolean(false);
        }

        public synchronized void add(ChannelHandlerContext ctx, Runnable value) {
            this.queue.add(value);
            LOG.trace("New message has been enqueued. Messages in queue: {}", this.queue::size);
            if (this.queueConsumer.compareAndSet(false, true)) {
                ctx.executor().execute((Runnable)new QueueConsumer(this));
            }
        }

        public boolean tryConsume() {
            Runnable runnable = this.queue.poll();
            if (runnable != null) {
                this.tokenBucket.consume();
                LOG.trace("Consume message. Messages in queue: {}", this.queue::size);
                runnable.run();
                return true;
            }
            return false;
        }
    }
}

