/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process.transformer.impl;

import io.activej.async.callback.Callback;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.csp.process.transformer.AbstractChannelTransformer;
import io.activej.promise.Promise;
import io.activej.reactor.schedule.ScheduledRunnable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.jetbrains.annotations.Nullable;

public final class RateLimiter<T>
extends AbstractChannelTransformer<RateLimiter<T>, T, T> {
    private static final Duration MILLIS_DURATION = ChronoUnit.MILLIS.getDuration();
    public final double refillRatePerMillis;
    public double tokens;
    public long lastRefillTimestamp;
    public Tokenizer<T> tokenizer = $ -> 1.0;
    @Nullable
    public ScheduledRunnable scheduledRunnable;

    public RateLimiter(double refillRatePerMillis) {
        this.refillRatePerMillis = refillRatePerMillis;
        this.lastRefillTimestamp = this.reactor.currentTimeMillis();
    }

    public static <T> RateLimiter<T> create(double refillRate, ChronoUnit perUnit) {
        return (RateLimiter)RateLimiter.builder(refillRate, perUnit).build();
    }

    public static <T> Builder builder(double refillRate, ChronoUnit perUnit) {
        Checks.checkArgument((refillRate >= 0.0 ? 1 : 0) != 0, (Object)"Negative refill rate");
        Duration perUnitDuration = perUnit.getDuration();
        double refillRatePerMillis = perUnit.ordinal() > ChronoUnit.MILLIS.ordinal() ? refillRate / (double)perUnitDuration.dividedBy(MILLIS_DURATION) : refillRate * (double)MILLIS_DURATION.dividedBy(perUnitDuration);
        return new RateLimiter<T>(refillRatePerMillis).new Builder();
    }

    @Override
    protected Promise<Void> onItem(T item) {
        this.scheduledRunnable = null;
        this.refill();
        double itemTokens = this.tokenizer.getTokens(item);
        if (itemTokens <= this.tokens) {
            this.tokens -= itemTokens;
            return this.send(item);
        }
        return Promise.ofCallback(cb -> {
            this.scheduledRunnable = this.reactor.delay(this.calculateDelay(itemTokens), () -> this.onItem(item).subscribe((Callback)cb));
        });
    }

    private void refill() {
        long timestamp = this.reactor.currentTimeMillis();
        double passedMillis = timestamp - this.lastRefillTimestamp;
        this.tokens += passedMillis * this.refillRatePerMillis;
        this.lastRefillTimestamp = timestamp;
    }

    private long calculateDelay(double itemTokens) {
        double missing = itemTokens - this.tokens;
        assert (missing > 0.0);
        return (long)Math.ceil(missing / this.refillRatePerMillis);
    }

    @Override
    protected void onCleanup() {
        this.scheduledRunnable = (ScheduledRunnable)Utils.nullify((Object)this.scheduledRunnable, ScheduledRunnable::cancel);
    }

    public static interface Tokenizer<T> {
        public double getTokens(T var1);

        public static Tokenizer<ByteBuf> forByteBufs() {
            return ByteBuf::readRemaining;
        }
    }

    public final class Builder
    extends AbstractBuilder<Builder, RateLimiter<T>> {
        private Builder() {
        }

        public Builder withInitialTokens(double initialTokens) {
            Builder.checkNotBuilt((AbstractBuilder)this);
            RateLimiter.this.tokens = initialTokens;
            return this;
        }

        public Builder withTokenizer(Tokenizer<T> tokenizer) {
            Builder.checkNotBuilt((AbstractBuilder)this);
            RateLimiter.this.tokenizer = tokenizer;
            return this;
        }

        protected RateLimiter<T> doBuild() {
            return RateLimiter.this;
        }
    }
}

