package io.atleon.core;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/atleon/core/ActivityEnforcingTransformer.class */
final class ActivityEnforcingTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private final ActivityEnforcementConfig config;

    /* loaded from: input_file:io/atleon/core/ActivityEnforcingTransformer$InactiveStreamException.class */
    private static final class InactiveStreamException extends TimeoutException {
        public InactiveStreamException(String str, Duration duration, Instant instant) {
            super(String.format("Stream=%s has been inactive for longer than duration=%s with lastActive=%s", str, duration, instant));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ActivityEnforcingTransformer(ActivityEnforcementConfig activityEnforcementConfig) {
        this.config = activityEnforcementConfig;
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return this.config.isEnabled() ? Flux.from(publisher).transformDeferred((v1) -> {
            return enforceActivity(v1);
        }) : publisher;
    }

    private Flux<T> enforceActivity(Publisher<T> publisher) {
        AtomicReference atomicReference = new AtomicReference(Instant.now());
        Objects.requireNonNull(atomicReference);
        return Flux.merge(new Publisher[]{createInactivityError(atomicReference::get), publisher}).doOnEach(signal -> {
            atomicReference.set(Instant.now());
        });
    }

    private Mono<T> createInactivityError(Supplier<Instant> supplier) {
        return Flux.interval(this.config.getDelay(), this.config.getInterval()).map(l -> {
            return (Instant) supplier.get();
        }).filter(this::hasBecomeInactiveSince).next().flatMap(this::createInactivityError);
    }

    private boolean hasBecomeInactiveSince(Instant instant) {
        return this.config.getMaxInactivity().compareTo(Duration.between(instant, Instant.now())) < 0;
    }

    private Mono<T> createInactivityError(Instant instant) {
        return Mono.error(new InactiveStreamException(this.config.getName(), this.config.getMaxInactivity(), instant));
    }
}
