package org.drasyl.pipeline.handler;

import io.reactivex.rxjava3.disposables.Disposable;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.drasyl.pipeline.HandlerContext;
import org.drasyl.pipeline.address.Address;
import org.drasyl.pipeline.skeleton.HandlerAdapter;
import org.drasyl.util.Preconditions;
import org.drasyl.util.TokenBucket;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/* loaded from: input_file:org/drasyl/pipeline/handler/OutboundMessagesThrottlingHandler.class */
/**
 * Pipeline handler that does not pass outbound messages on directly. Every outbound message is
 * wrapped in a task and put into a {@link RateLimitedQueue}; a {@link QueueConsumer} scheduled on
 * the context's dependent scheduler runs these tasks one after another, taking a token from the
 * queue's {@link TokenBucket} before each one.
 */
public class OutboundMessagesThrottlingHandler extends HandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundMessagesThrottlingHandler.class);
    private final RateLimitedQueue queue;

    /**
     * Task that drains a {@link RateLimitedQueue} until it is empty and then releases the queue's
     * consumer slot, so that a later {@link RateLimitedQueue#add} schedules a fresh consumer.
     */
    public static class QueueConsumer implements Runnable {
        private final RateLimitedQueue queue;

        /**
         * @param rateLimitedQueue queue to drain
         * @throws NullPointerException if {@code rateLimitedQueue} is {@code null}
         */
        QueueConsumer(RateLimitedQueue rateLimitedQueue) {
            this.queue = Objects.requireNonNull(rateLimitedQueue);
        }

        /**
         * Runs queued tasks until the queue is observed empty while holding the queue's lock.
         * If a task throws, the exception propagates to the caller, but the consumer slot is
         * released first so the queue does not stay blocked; tasks still queued at that point
         * are picked up by the consumer scheduled on the next {@code add}.
         */
        @Override // java.lang.Runnable
        public void run() {
            LOG.trace("Queue Consumer started.");
            boolean released = false;
            try {
                do {
                    while (this.queue.tryConsume()) {
                        // keep draining
                    }
                    // The emptiness check and the release happen atomically with respect to
                    // add(); if a task slipped in after the last poll, drain again instead of
                    // leaving it behind without a consumer.
                    released = this.queue.releaseConsumerIfIdle();
                } while (!released);
            } finally {
                if (!released) {
                    // Abnormal exit (a task threw): free the slot unconditionally.
                    this.queue.releaseConsumer();
                }
            }
            LOG.trace("Queue Consumer is done.");
        }
    }

    /**
     * FIFO queue of tasks that are executed one at a time, each after consuming a token from
     * {@link #tokenBucket}. Internal accesses to {@link #queue} and to the consumer slot are
     * guarded by this object's monitor; code touching the public {@link #queue} field directly
     * bypasses that guard.
     */
    public static class RateLimitedQueue {
        public final Queue<Runnable> queue = new LinkedList<>();
        public final TokenBucket tokenBucket;
        // Non-null while a QueueConsumer is scheduled or running. Guarded by this.
        private Disposable queueConsumer;

        /**
         * @param maxEventsPerSecond number of tasks allowed per second; the token interval is
         *                           one second divided by this value
         */
        public RateLimitedQueue(long maxEventsPerSecond) {
            Duration interval = Duration.ofSeconds(1L).dividedBy(Preconditions.requirePositive(maxEventsPerSecond, "maxEventsPerSecond must be a positive number"));
            // NOTE(review): the third argument is true for intervals below 20 ms; presumably it
            // selects a different waiting strategy in TokenBucket -- confirm against TokenBucket.
            this.tokenBucket = new TokenBucket(1L, interval, interval.toMillis() < 20);
        }

        /**
         * Enqueues {@code runnable} and, if no consumer currently holds the slot, schedules a new
         * {@link QueueConsumer} on {@code handlerContext.dependentScheduler()}.
         *
         * @param handlerContext context whose dependent scheduler runs the consumer
         * @param runnable       task to execute once a token is available
         */
        public synchronized void add(HandlerContext handlerContext, Runnable runnable) {
            this.queue.add(runnable);
            LOG.trace("New message has been enqueued. Messages in queue: {}", this::size);
            if (this.queueConsumer == null) {
                this.queueConsumer = handlerContext.dependentScheduler().scheduleDirect(new QueueConsumer(this));
            }
        }

        /**
         * Takes the next task from the queue, consumes one token from {@link #tokenBucket} and
         * runs the task.
         *
         * @return {@code false} if no task was available, {@code true} after a task has been run
         */
        public boolean tryConsume() {
            final Runnable task;
            synchronized (this) {
                task = this.queue.poll();
            }
            if (task == null) {
                return false;
            }
            // Token acquisition and the task itself run outside the lock so that add() is not
            // held up while the consumer is being throttled.
            this.tokenBucket.consume();
            LOG.trace("Consume message. Messages in queue: {}", this::size);
            task.run();
            return true;
        }

        /** Returns the number of queued tasks, read under the lock. */
        private synchronized int size() {
            return this.queue.size();
        }

        /**
         * Clears the consumer slot if, and only if, the queue is empty.
         *
         * @return {@code true} if the slot was cleared, {@code false} if tasks are still queued
         */
        private synchronized boolean releaseConsumerIfIdle() {
            if (!this.queue.isEmpty()) {
                return false;
            }
            this.queueConsumer = null;
            return true;
        }

        /** Clears the consumer slot regardless of the queue's content. */
        private synchronized void releaseConsumer() {
            this.queueConsumer = null;
        }
    }

    /**
     * @param rateLimitedQueue queue used to defer outbound messages
     * @throws NullPointerException if {@code rateLimitedQueue} is {@code null}
     */
    OutboundMessagesThrottlingHandler(RateLimitedQueue rateLimitedQueue) {
        this.queue = Objects.requireNonNull(rateLimitedQueue);
    }

    /**
     * @param maxEventsPerSecond number of outbound messages allowed per second; passed to
     *                           {@link RateLimitedQueue#RateLimitedQueue(long)}
     */
    public OutboundMessagesThrottlingHandler(long maxEventsPerSecond) {
        this(new RateLimitedQueue(maxEventsPerSecond));
    }

    /**
     * Enqueues a task that calls {@code handlerContext.passOutbound(address, obj,
     * completableFuture)}; the message is passed on only when the queue consumer runs that task.
     */
    @Override // org.drasyl.pipeline.skeleton.HandlerAdapter, org.drasyl.pipeline.Handler
    public void onOutbound(HandlerContext handlerContext, Address address, Object obj, CompletableFuture<Void> completableFuture) {
        this.queue.add(handlerContext, () -> handlerContext.passOutbound(address, obj, completableFuture));
    }
}
