package monix.reactive.internal.builders;

import monix.execution.Ack$;
import monix.execution.Ack$AckExtensions$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.Scheduler;
import monix.execution.cancelables.SingleAssignCancelable;
import monix.execution.cancelables.SingleAssignCancelable$;
import monix.reactive.Observable;
import monix.reactive.Pipe;
import monix.reactive.observables.ConnectableObservable;
import monix.reactive.observers.Subscriber;
import scala.Function1;
import scala.concurrent.Future;
import scala.util.control.NonFatal$;

/* compiled from: PipeThroughSelectorObservable.scala */
/* loaded from: input_file:monix/reactive/internal/builders/PipeThroughSelectorObservable.class */
/**
 * Observable that multicasts {@code source} through {@code pipe}, hands the
 * resulting connectable observable to the selector function {@code f}, and
 * subscribes the downstream subscriber to whatever observable {@code f}
 * returns. The multicast connection is opened only after that subscription
 * has been made.
 *
 * @param <A> element type emitted by the source
 * @param <B> element type produced by the pipe
 * @param <C> element type emitted by the observable returned from the selector
 */
public final class PipeThroughSelectorObservable<A, B, C> extends Observable<C> {
    private final Observable<A> source;
    private final Pipe<A, B> pipe;
    private final Function1<Observable<B>, Observable<C>> f;

    /**
     * @param observable the source to be multicast
     * @param pipe       the pipe the source is multicast through
     * @param function1  selector applied to the multicast observable
     */
    public PipeThroughSelectorObservable(Observable<A> observable, Pipe<A, B> pipe, Function1<Observable<B>, Observable<C>> function1) {
        this.source = observable;
        this.pipe = pipe;
        this.f = function1;
    }

    /**
     * Builds the multicast pipeline, subscribes {@code subscriber} to the
     * selector's result and then connects the multicast source.
     *
     * <p>Non-fatal failures raised before the derived observable is subscribed
     * (including a throwing selector) are delivered via
     * {@code subscriber.onError}. Failures raised afterwards are only reported
     * to the subscriber's scheduler, because at that point the subscriber has
     * already been handed to the derived observable. Fatal throwables are
     * rethrown.
     *
     * @param subscriber the downstream subscriber
     * @return a cancelable that cancels both the multicast connection and the
     *         downstream subscription, or an empty cancelable on failure
     */
    @Override // monix.reactive.Observable
    public Cancelable unsafeSubscribeFn(final Subscriber<C> subscriber) {
        // True while it is still safe to signal errors directly to the subscriber.
        boolean streamErrors = true;
        final SingleAssignCancelable upstream = SingleAssignCancelable$.MODULE$.apply();
        try {
            final ConnectableObservable<B> connectable = this.source.multicast(this.pipe, subscriber.scheduler());
            final Observable<C> derived = this.f.apply(connectable);
            // Only flip the flag once the selector has returned: nothing has
            // been subscribed yet, so a throwing selector must still reach onError.
            streamErrors = false;
            final Cancelable downstream = derived.unsafeSubscribeFn(new Subscriber<C>() {
                private final Scheduler scheduler = subscriber.scheduler();

                @Override // monix.reactive.observers.Subscriber
                public Scheduler scheduler() {
                    return this.scheduler;
                }

                @Override // monix.reactive.Observer
                public void onError(Throwable th) {
                    subscriber.onError(th);
                }

                @Override // monix.reactive.Observer
                public void onComplete() {
                    subscriber.onComplete();
                }

                @Override // monix.reactive.Observer
                /* renamed from: onNext */
                public Future mo23onNext(C elem) {
                    // Cancel the multicast connection when the downstream acknowledgement
                    // is a stop or a failure.
                    return Ack$AckExtensions$.MODULE$.syncOnStopOrFailure$extension(Ack$.MODULE$.AckExtensions(subscriber.mo23onNext(elem)), option -> {
                        upstream.cancel();
                    }, scheduler());
                }
            });
            // Connect last, after the downstream subscription is in place.
            upstream.$colon$eq(connectable.connect());
            return Cancelable$.MODULE$.apply(() -> {
                upstream.cancel();
                downstream.cancel();
            });
        } catch (Throwable th) {
            if (!NonFatal$.MODULE$.apply(th)) {
                throw th;
            }
            upstream.cancel();
            if (streamErrors) {
                subscriber.onError(th);
            } else {
                subscriber.scheduler().reportFailure(th);
            }
            return Cancelable$.MODULE$.empty();
        }
    }
}
