package io.atleon.core;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/atleon/core/Batcher.class */
public final class Batcher {
    private final int maxSize;
    private final Duration maxDuration;
    private final int prefetch;

    private Batcher(int i, Duration duration, int i2) {
        this.maxSize = i;
        this.maxDuration = duration;
        this.prefetch = i2;
    }

    public static Batcher create(int i, Duration duration, int i2) {
        return new Batcher(i, duration, i2);
    }

    public <T, R> Flux<R> applyMapping(Publisher<T> publisher, Function<? super List<T>, ? extends Publisher<? extends R>> function, int i) {
        return i <= 1 ? toBatches(publisher).concatMap(function, this.prefetch) : toBatches(publisher).publishOn(Schedulers.immediate(), this.prefetch).flatMap(function, i);
    }

    private <T> Flux<List<T>> toBatches(Publisher<T> publisher) {
        if (this.maxSize <= 1) {
            return Flux.from(publisher).map(Collections::singletonList);
        }
        if (this.maxDuration.isZero() || this.maxDuration.isNegative()) {
            throw new IllegalArgumentException("Batching is enabled, but batch duration is not positive");
        }
        return Flux.from(publisher).bufferTimeout(this.maxSize, this.maxDuration);
    }
}
