package org.ossgang.commons.observables.operators;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.ossgang.commons.awaitables.Retry;
import org.ossgang.commons.monads.Maybe;
import org.ossgang.commons.observables.Observable;
import org.ossgang.commons.observables.Observer;
import org.ossgang.commons.observables.Observers;
import org.ossgang.commons.observables.Subscription;
import org.ossgang.commons.observables.SubscriptionOption;

/* loaded from: input_file:org/ossgang/commons/observables/operators/BlockingOperators.class */
public final class BlockingOperators {
    private BlockingOperators() {
        throw new UnsupportedOperationException("static only");
    }

    public static <T> T awaitNextValue(Observable<T> observable, Duration duration) {
        return (T) awaitNextItem(observable, duration, consumer -> {
            Objects.requireNonNull(consumer);
            return consumer::accept;
        });
    }

    public static <T> Maybe<T> awaitNext(Observable<T> observable, Duration duration) {
        return (Maybe) awaitNextItem(observable, duration, Observers::forMaybes);
    }

    private static <T, O> T awaitNextItem(Observable<O> observable, Duration duration, Function<Consumer<T>, Observer<O>> function) {
        AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(atomicReference);
        Subscription subscribe = observable.subscribe(function.apply(atomicReference::set), new SubscriptionOption[0]);
        try {
            Objects.requireNonNull(atomicReference);
            T atMost = Retry.retry(atomicReference::get).untilNotNull().atMost(duration);
            subscribe.unsubscribe();
            return atMost;
        } catch (Throwable th) {
            subscribe.unsubscribe();
            throw th;
        }
    }
}
