package io.atleon.core;

import io.atleon.util.Defaults;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/atleon/core/DeduplicatingTransformer.class */
final class DeduplicatingTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private static final Scheduler DEFAULT_SCHEDULER = Schedulers.newBoundedElastic(Defaults.THREAD_CAP, Integer.MAX_VALUE, DeduplicatingTransformer.class.getSimpleName());
    private final DeduplicationConfig config;
    private final Deduplication<T> deduplication;
    private final Scheduler sourceScheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/core/DeduplicatingTransformer$AloDeduplication.class */
    public static final class AloDeduplication<T> implements Deduplication<Alo<T>> {
        private final Deduplication<T> deduplication;

        public AloDeduplication(Deduplication<T> deduplication) {
            this.deduplication = deduplication;
        }

        @Override // io.atleon.core.Deduplication
        public Object extractKey(Alo<T> alo) {
            return this.deduplication.extractKey(alo.get());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // io.atleon.core.Deduplication
        public Alo<T> reduceDuplicates(Alo<T> alo, Alo<T> alo2) {
            Deduplication<T> deduplication = this.deduplication;
            Objects.requireNonNull(deduplication);
            return alo.reduce(deduplication::reduceDuplicates, alo2);
        }
    }

    private DeduplicatingTransformer(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        this.config = deduplicationConfig;
        this.deduplication = deduplication;
        this.sourceScheduler = scheduler;
    }

    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication) {
        return identity(deduplicationConfig, deduplication, DEFAULT_SCHEDULER);
    }

    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(deduplicationConfig, deduplication, scheduler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication) {
        return alo(deduplicationConfig, deduplication, DEFAULT_SCHEDULER);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(deduplicationConfig, new AloDeduplication(deduplication), scheduler);
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return this.config.isEnabled() ? Flux.from(publisher).switchOnFirst((signal, flux) -> {
            return flux.transform((v1) -> {
                return applyDeduplication(v1);
            });
        }) : publisher;
    }

    private Flux<T> applyDeduplication(Publisher<T> publisher) {
        Scheduler single = Schedulers.single(this.sourceScheduler);
        Flux publishOn = Flux.from(publisher).publishOn(single, this.config.getDeduplicationSourcePrefetch());
        Deduplication<T> deduplication = this.deduplication;
        Objects.requireNonNull(deduplication);
        return publishOn.groupBy(deduplication::extractKey).flatMap(groupedFlux -> {
            return deduplicateGroup(groupedFlux, single);
        }, this.config.getDeduplicationConcurrency()).subscribeOn(single);
    }

    private Mono<T> deduplicateGroup(GroupedFlux<Object, T> groupedFlux, Scheduler scheduler) {
        Flux take = groupedFlux.take(this.config.getDeduplicationDuration(), scheduler).take(this.config.getMaxDeduplicationSize());
        Deduplication<T> deduplication = this.deduplication;
        Objects.requireNonNull(deduplication);
        return take.reduce(deduplication::reduceDuplicates);
    }
}
