package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.SerializedSubscriber;
import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:WEB-INF/lib/mutiny-1.4.0.jar:io/smallrye/mutiny/operators/multi/MultiRetryWhenOp.class */
public final class MultiRetryWhenOp<T> extends AbstractMultiOperator<T, T> {
    private final Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory;
    private final Predicate<? super Throwable> onFailurePredicate;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/mutiny-1.4.0.jar:io/smallrye/mutiny/operators/multi/MultiRetryWhenOp$RetryWhenOperator.class */
    /**
     * Subscriber placed between the upstream and the downstream of a retry-when pipeline.
     *
     * <p>Items and completion are forwarded to the downstream. A failure is first checked against
     * the failure predicate: when the predicate rejects it (or throws), the failure is sent to the
     * downstream and the operator stops there; otherwise the failure is handed to the
     * {@code signaller} and the upstream is subscribed again once {@link #resubscribe()} is invoked.
     *
     * @param <T> the type of the items
     */
    public static final class RetryWhenOperator<T> extends SwitchableSubscriptionSubscriber<T> {
        /** Publisher that gets subscribed again by {@link #resubscribe()}. */
        private final Publisher<? extends T> upstream;
        /** Work-in-progress counter ensuring a single caller runs the resubscription loop. */
        private final AtomicInteger wip;
        /** Receives every failure that is accepted for a retry. */
        private final Subscriber<Throwable> signaller;
        /** Holds the subscription handed over through {@link #setWhen(Subscription)}. */
        private final Subscriptions.DeferredSubscription arbiter;
        /** Decides whether a failure may be retried. */
        private final Predicate<? super Throwable> onFailurePredicate;
        /** Number of items forwarded downstream since the last call to {@code emitted}. */
        long produced;

        /**
         * @param publisher the upstream to (re)subscribe to
         * @param predicate tested against each upstream failure; {@code true} means "retry"
         * @param multiSubscriber the downstream subscriber
         * @param subscriber the signaller receiving the failures accepted by the predicate
         */
        RetryWhenOperator(Publisher<? extends T> publisher, Predicate<? super Throwable> predicate, MultiSubscriber<? super T> multiSubscriber, Subscriber<Throwable> subscriber) {
            super(multiSubscriber);
            this.wip = new AtomicInteger();
            this.arbiter = new Subscriptions.DeferredSubscription();
            this.onFailurePredicate = predicate;
            this.upstream = publisher;
            this.signaller = subscriber;
        }

        /**
         * Cancels the arbiter and then the operator itself; does nothing when already cancelled.
         */
        @Override // io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber, org.reactivestreams.Subscription
        public void cancel() {
            if (isCancelled()) {
                return;
            }
            this.arbiter.cancel();
            super.cancel();
        }

        /**
         * Stores the given subscription in the arbiter.
         *
         * @param subscription the subscription to hand over to the arbiter
         */
        public void setWhen(Subscription subscription) {
            this.arbiter.set(subscription);
        }

        /**
         * Forwards the item downstream and counts it in {@link #produced}.
         */
        @Override // io.smallrye.mutiny.subscription.MultiSubscriber
        public void onItem(T t) {
            this.downstream.onItem(t);
            this.produced++;
        }

        /**
         * Handles an upstream failure. If the predicate check already terminated the downstream,
         * nothing else happens. Otherwise the pending item count is reported through
         * {@code emitted}, one element is requested from the arbiter and the failure is passed to
         * the signaller.
         */
        @Override // io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber, io.smallrye.mutiny.subscription.MultiSubscriber
        public void onFailure(Throwable th) {
            if (testOnFailurePredicate(th)) {
                return;
            }
            long j = this.produced;
            if (j != 0) {
                this.produced = 0L;
                // NOTE(review): presumably lets the parent account for the items already delivered
                // before the subscription is switched - confirm against SwitchableSubscriptionSubscriber.
                emitted(j);
            }
            this.arbiter.request(1L);
            this.signaller.onNext(th);
        }

        /**
         * Tests the failure against the predicate and terminates the downstream when the failure
         * must not be retried.
         *
         * @param th the upstream failure
         * @return {@code true} when the downstream has been terminated (the predicate rejected the
         *         failure or threw), {@code false} when the failure may be retried
         */
        private boolean testOnFailurePredicate(Throwable th) {
            try {
                if (!this.onFailurePredicate.test(th)) {
                    this.arbiter.cancel();
                    this.downstream.onFailure(th);
                    // The downstream is terminated: the caller must not signal a retry.
                    return true;
                }
                return false;
            } catch (Throwable th2) {
                this.arbiter.cancel();
                this.downstream.onFailure(new CompositeException(th2, th));
                return true;
            }
        }

        /**
         * Cancels the arbiter and completes the downstream.
         */
        @Override // io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber, io.smallrye.mutiny.subscription.MultiSubscriber
        public void onCompletion() {
            this.arbiter.cancel();
            this.downstream.onComplete();
        }

        /**
         * Subscribes this operator to the upstream again. Only the caller that moves {@link #wip}
         * away from zero runs the loop; calls arriving meanwhile just increment the counter and
         * are served by additional loop iterations. The loop stops once the operator is cancelled.
         */
        void resubscribe() {
            if (this.wip.getAndIncrement() != 0) {
                return;
            }
            while (!isCancelled()) {
                this.upstream.subscribe(this);
                if (this.wip.decrementAndGet() == 0) {
                    return;
                }
            }
        }

        /**
         * Cancels the operator (without touching the arbiter) and fails the downstream.
         *
         * @param th the failure to propagate downstream
         */
        void whenFailure(Throwable th) {
            super.cancel();
            this.downstream.onFailure(th);
        }

        /**
         * Cancels the operator (without touching the arbiter) and completes the downstream.
         */
        void whenComplete() {
            super.cancel();
            this.downstream.onComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/mutiny-1.4.0.jar:io/smallrye/mutiny/operators/multi/MultiRetryWhenOp$TriggerSubscriber.class */
    /**
     * Two-sided bridge used by the retry-when wiring.
     *
     * <p>As a {@code Multi<Throwable>} it exposes the content of an internal processor to whoever
     * subscribes to it. As a {@code Subscriber<Object>} it translates the signals it receives into
     * calls on the attached {@link RetryWhenOperator}: an item triggers a resubscription, a failure
     * or a completion terminates the operator.
     */
    public static final class TriggerSubscriber extends AbstractMulti<Throwable> implements Multi<Throwable>, Subscriber<Object>, ContextSupport {
        /** Operator driven by the signals received by this subscriber; assigned after construction. */
        RetryWhenOperator<?> operator;
        // The explicit type argument is required: as the receiver of a chained call, create() would
        // otherwise be inferred with Object and not match the declared field type.
        private final Processor<Throwable, Throwable> processor = UnicastProcessor.<Throwable>create().serialized();
        /** Context captured from the last subscriber passed to {@link #subscribe(Subscriber)}. */
        private Context context;

        TriggerSubscriber() {
        }

        /**
         * Hands the received subscription over to the operator.
         */
        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.operator.setWhen(subscription);
        }

        /**
         * Asks the operator to resubscribe; the received value itself is ignored.
         */
        @Override // org.reactivestreams.Subscriber
        public void onNext(Object obj) {
            this.operator.resubscribe();
        }

        /**
         * Forwards the failure to the operator's {@code whenFailure}.
         */
        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.operator.whenFailure(th);
        }

        /**
         * Forwards the completion to the operator's {@code whenComplete}.
         */
        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.operator.whenComplete();
        }

        /**
         * Records the subscriber's context (or an empty one when the subscriber does not implement
         * {@link ContextSupport}) and subscribes it to the internal processor.
         *
         * @param subscriber the subscriber interested in the failures
         */
        @Override // io.smallrye.mutiny.operators.AbstractMulti, org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super Throwable> subscriber) {
            if (subscriber instanceof ContextSupport) {
                this.context = ((ContextSupport) subscriber).context();
            } else {
                this.context = Context.empty();
            }
            this.processor.subscribe(subscriber);
        }

        /**
         * @return the context captured by the last call to {@link #subscribe(Subscriber)}, or
         *         {@code null} if no subscriber has been registered yet
         */
        @Override // io.smallrye.mutiny.subscription.ContextSupport
        public Context context() {
            return this.context;
        }
    }

    /**
     * Creates the operator.
     *
     * @param multi the upstream, passed to the parent operator
     * @param predicate kept as the failure predicate handed to every subscription
     * @param function kept as the factory producing the trigger stream for every subscription
     */
    public MultiRetryWhenOp(Multi<? extends T> multi, Predicate<? super Throwable> predicate, Function<? super Multi<Throwable>, ? extends Publisher<?>> function) {
        super(multi);
        this.triggerStreamFactory = function;
        this.onFailurePredicate = predicate;
    }

    /**
     * Wires one subscription of the retry-when pipeline.
     *
     * <p>A {@link TriggerSubscriber} is created and exposed to the factory as the stream of
     * failures; the publisher returned by the factory is subscribed with that same trigger, and,
     * unless the operator got cancelled in the meantime, the upstream is subscribed with a new
     * {@link RetryWhenOperator}.
     *
     * @param multiSubscriber the downstream subscriber
     * @param predicate the predicate deciding whether a failure may be retried
     * @param function the factory turning the stream of failures into the trigger stream
     * @param multi the upstream
     * @param <T> the type of the items
     */
    private static <T> void subscribe(MultiSubscriber<? super T> multiSubscriber, Predicate<? super Throwable> predicate, Function<? super Multi<Throwable>, ? extends Publisher<?>> function, Multi<? extends T> multi) {
        TriggerSubscriber trigger = new TriggerSubscriber();
        // Failures are pushed into the trigger's processor through a serialized front.
        Subscriber<Throwable> signaller = new SerializedSubscriber<>(trigger.processor);
        signaller.onSubscribe(Subscriptions.empty());
        MultiSubscriber<T> serializedDownstream = new SerializedSubscriber<>(multiSubscriber);
        RetryWhenOperator<T> operator = new RetryWhenOperator<>(multi, predicate, serializedDownstream, signaller);
        trigger.operator = operator;
        serializedDownstream.onSubscribe(operator);
        try {
            Publisher<?> triggerStream = function.apply(trigger);
            if (triggerStream == null) {
                throw new NullPointerException("The stream factory returned `null`");
            }
            triggerStream.subscribe(trigger);
            // Subscribing to the trigger stream may already have terminated the operator.
            if (operator.isCancelled()) {
                return;
            }
            multi.subscribe(operator);
        } catch (Throwable th) {
            // Reported on the subscriber passed by the caller, not on its serialized wrapper.
            multiSubscriber.onFailure(th);
        }
    }

    /**
     * Subscribes the given subscriber by delegating to the static wiring helper with this
     * operator's failure predicate, trigger stream factory and upstream.
     *
     * @param multiSubscriber the downstream subscriber
     */
    @Override // io.smallrye.mutiny.operators.AbstractMulti
    public void subscribe(MultiSubscriber<? super T> multiSubscriber) {
        MultiRetryWhenOp.subscribe(
                multiSubscriber,
                this.onFailurePredicate,
                this.triggerStreamFactory,
                this.upstream);
    }
}
