package monix.reactive.internal.operators;

import monix.eval.Task;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.Ack$AckExtensions$;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.AsyncSemaphore;
import monix.execution.AsyncSemaphore$;
import monix.execution.Callback$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.CancelableFuture;
import monix.execution.ChannelType$MultiProducer$;
import monix.execution.Scheduler;
import monix.execution.cancelables.CompositeCancelable;
import monix.execution.cancelables.CompositeCancelable$;
import monix.execution.cancelables.SingleAssignCancelable;
import monix.execution.cancelables.SingleAssignCancelable$;
import monix.reactive.Observable;
import monix.reactive.OverflowStrategy;
import monix.reactive.observers.BufferedSubscriber$;
import monix.reactive.observers.Subscriber;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.control.NonFatal$;

/* compiled from: MapParallelUnorderedObservable.scala */
/* loaded from: input_file:monix/reactive/internal/operators/MapParallelUnorderedObservable.class */
public final class MapParallelUnorderedObservable<A, B> extends Observable<B> {
    // Upstream observable; subscribed to (directly or via MapTaskObservable) in unsafeSubscribeFn.
    private final Observable<A> source;
    // Requested parallelism: validated in unsafeSubscribeFn (<= 0 is rejected, 1 takes a
    // dedicated path) and used as the permit count of the subscription's semaphore.
    public final int monix$reactive$internal$operators$MapParallelUnorderedObservable$$parallelism;
    // Mapping function applied to every upstream element; yields the Task to run.
    public final Function1<A, Task<B>> monix$reactive$internal$operators$MapParallelUnorderedObservable$$f;
    // Overflow strategy handed to the BufferedSubscriber that wraps the downstream subscriber.
    public final OverflowStrategy<B> monix$reactive$internal$operators$MapParallelUnorderedObservable$$overflowStrategy;

    /* compiled from: MapParallelUnorderedObservable.scala */
    /* loaded from: input_file:monix/reactive/internal/operators/MapParallelUnorderedObservable$MapAsyncParallelSubscription.class */
    private final class MapAsyncParallelSubscription implements Subscriber<A>, Cancelable {
        // Shared cancelable supplied by the constructor; pending permits and per-element
        // task cancelables are added to / removed from it, and cancel() cancels it.
        private final CompositeCancelable composite;
        // Scheduler taken from the downstream subscriber.
        private final Scheduler scheduler;
        // Semaphore created with `parallelism` permits; a permit is acquired in onNext and
        // released once the downstream acknowledges an emitted result with Continue.
        private final AsyncSemaphore semaphore;
        // Downstream subscriber wrapped in a BufferedSubscriber (multi-producer channel type).
        private final Subscriber<B> buffer;
        // Set to true by onError, by the onComplete callback and by cancel().
        // NOTE(review): plain (non-volatile) field although it is written from async
        // callbacks — presumably a deliberately light guard; confirm.
        private boolean isDone;
        // Starts as Continue; switched to Stop once the downstream stops or an error is seen.
        private Ack lastAck;
        // Enclosing observable; gives access to parallelism and the mapping function.
        private final MapParallelUnorderedObservable<A, B> $outer;

        public MapAsyncParallelSubscription(MapParallelUnorderedObservable mapParallelUnorderedObservable, Subscriber<B> subscriber, CompositeCancelable compositeCancelable) {
            this.composite = compositeCancelable;
            if (mapParallelUnorderedObservable == null) {
                throw new NullPointerException();
            }
            this.$outer = mapParallelUnorderedObservable;
            this.scheduler = subscriber.scheduler();
            this.semaphore = AsyncSemaphore$.MODULE$.apply(mapParallelUnorderedObservable.monix$reactive$internal$operators$MapParallelUnorderedObservable$$parallelism, AsyncSemaphore$.MODULE$.apply$default$2());
            this.buffer = BufferedSubscriber$.MODULE$.apply(subscriber, mapParallelUnorderedObservable.monix$reactive$internal$operators$MapParallelUnorderedObservable$$overflowStrategy, ChannelType$MultiProducer$.MODULE$);
            this.isDone = false;
            this.lastAck = Ack$Continue$.MODULE$;
        }

        /**
         * Returns the scheduler obtained from the downstream subscriber at construction time.
         */
        @Override // monix.reactive.observers.Subscriber
        public Scheduler scheduler() {
            return scheduler;
        }

        /**
         * Builds and starts the task for a single upstream element.
         *
         * <p>A fresh {@link SingleAssignCancelable} is registered in {@code composite}, the
         * mapping function is applied to the element, and the resulting task is started
         * with an empty callback. The task's outcome is handled as follows:
         * <ul>
         *   <li>task failure: {@code lastAck} becomes Stop, the task's cancelable is removed,
         *       {@code composite} is cancelled and the error is signalled via
         *       {@link #onError(Throwable)};</li>
         *   <li>task success: the value is pushed to {@code buffer}; a Stop ack marks the
         *       subscription as stopped and cancels {@code composite}, a Continue ack
         *       releases the semaphore and removes the task's cancelable, and a failed ack
         *       is signalled via {@link #onError(Throwable)}.</li>
         * </ul>
         *
         * <p>Non-fatal exceptions thrown while the task is being built (which includes the
         * call to the user-supplied mapping function) are streamed downstream through
         * {@link #onError(Throwable)}. Non-fatal exceptions thrown once execution is being
         * started are only reported to the scheduler. Fatal exceptions are rethrown.
         *
         * @param a the upstream element to map
         * @return the result of assigning the running task to its cancelable, or
         *         {@code BoxedUnit.UNIT} when an exception was handled
         */
        private Object process(A a) {
            // True while an exception may still be streamed downstream as onError.
            boolean streamErrors = true;
            try {
                SingleAssignCancelable apply = SingleAssignCancelable$.MODULE$.apply();
                this.composite.$plus$eq(apply);
                // Build the task BEFORE clearing the flag, so that an exception thrown by
                // the mapping function terminates the stream with onError instead of
                // merely being reported to the scheduler.
                Task task = ((Task) this.$outer.monix$reactive$internal$operators$MapParallelUnorderedObservable$$f.apply(a)).redeem(th -> {
                    this.lastAck = Ack$Stop$.MODULE$;
                    this.composite.$minus$eq(apply);
                    this.composite.cancel();
                    onError(th);
                }, obj -> {
                    Ack$AckExtensions$.MODULE$.syncOnComplete$extension(Ack$.MODULE$.AckExtensions(this.buffer.mo23onNext(obj)), r6 -> {
                        if (r6 instanceof Success) {
                            Ack ack = (Ack) ((Success) r6).value();
                            if (Ack$Stop$.MODULE$.equals(ack)) {
                                // Downstream wants no more events.
                                this.lastAck = Ack$Stop$.MODULE$;
                                this.composite.cancel();
                                return;
                            } else if (Ack$Continue$.MODULE$.equals(ack)) {
                                // Element delivered: free the slot for another element.
                                this.semaphore.release();
                                this.composite.$minus$eq(apply);
                                return;
                            }
                        }
                        if (!(r6 instanceof Failure)) {
                            throw new MatchError(r6);
                        }
                        Throwable exception = ((Failure) r6).exception();
                        this.lastAck = Ack$Stop$.MODULE$;
                        this.composite.$minus$eq(apply);
                        onError(exception);
                    }, scheduler());
                });
                // From here on errors can no longer be streamed downstream.
                streamErrors = false;
                // Start execution.
                return apply.$colon$eq(task.runAsync(Callback$.MODULE$.empty(scheduler()), scheduler()));
            } catch (Throwable th2) {
                if (!NonFatal$.MODULE$.apply(th2)) {
                    throw th2;
                }
                if (streamErrors) {
                    onError(th2);
                } else {
                    scheduler().reportFailure(th2);
                }
                return BoxedUnit.UNIT;
            }
        }

        /**
         * Accepts one upstream element.
         *
         * <p>Returns Stop immediately when a Stop has already been recorded in
         * {@code lastAck} or the subscription is done. Otherwise a semaphore permit is
         * requested; the returned acknowledgement completes with Continue once the permit
         * is available, at which point the element is handed to {@code process}. While the
         * permit is pending it is tracked in {@code composite}.
         *
         * @param a the upstream element
         * @return the acknowledgement for the upstream producer
         */
        @Override // monix.reactive.Observer
        /* renamed from: onNext */
        public Future<Ack> mo23onNext(A a) {
            final Ack previousAck = this.lastAck;
            final Ack$Stop$ stop = Ack$Stop$.MODULE$;
            final boolean stopSeen = previousAck == null ? stop == null : previousAck.equals(stop);
            if (stopSeen || this.isDone) {
                return Ack$Stop$.MODULE$;
            }

            final CancelableFuture permit = this.semaphore.acquire();
            final Option permitState = permit.value();
            final Future pendingAck;
            if (None$.MODULE$.equals(permitState)) {
                // Permit not available yet: the ack completes when the permit does.
                pendingAck = (Future) permit.flatMap(MapParallelUnorderedObservable::monix$reactive$internal$operators$MapParallelUnorderedObservable$MapAsyncParallelSubscription$$_$_$$anonfun$1, scheduler());
            } else if (permitState instanceof Some) {
                // Permit already completed: acknowledge right away.
                pendingAck = (Future) Ack$Continue$.MODULE$;
            } else {
                throw new MatchError(permitState);
            }

            this.composite.$plus$eq(permit);
            pendingAck.onComplete(outcome -> {
                if (outcome instanceof Success) {
                    this.composite.$minus$eq(permit);
                    return process(a);
                }
                if (outcome instanceof Failure) {
                    Throwable cause = ((Failure) outcome).exception();
                    this.composite.$minus$eq(permit);
                    onError(cause);
                    return BoxedUnit.UNIT;
                }
                throw new MatchError(outcome);
            }, scheduler());
            return Ack$AckExtensions$.MODULE$.syncTryFlatten$extension(Ack$.MODULE$.AckExtensions(pendingAck), scheduler());
        }

        /**
         * Signals an error downstream, at most once.
         *
         * <p>Does nothing when the subscription is already done; otherwise marks it done,
         * records Stop as the last acknowledgement and forwards the error to {@code buffer}.
         *
         * @param th the error to forward
         */
        @Override // monix.reactive.Observer
        public void onError(Throwable th) {
            if (!this.isDone) {
                this.isDone = true;
                this.lastAck = Ack$Stop$.MODULE$;
                this.buffer.onError(th);
            }
        }

        /**
         * Completes the downstream once {@code parallelism} permits are available on the
         * semaphore again.
         *
         * <p>The completion is skipped when the subscription has been marked done in the
         * meantime.
         */
        @Override // monix.reactive.Observer
        public void onComplete() {
            final int allPermits = this.$outer.monix$reactive$internal$operators$MapParallelUnorderedObservable$$parallelism;
            this.semaphore.awaitAvailable(allPermits).foreach(ignored -> {
                if (!this.isDone) {
                    this.isDone = true;
                    this.lastAck = Ack$Stop$.MODULE$;
                    this.buffer.onComplete();
                }
            }, scheduler());
        }

        /**
         * Marks the subscription as done and cancels everything tracked by {@code composite}.
         */
        public void cancel() {
            // Flag first, so later events are ignored before the cancelables are torn down.
            isDone = true;
            composite.cancel();
        }

        /**
         * Returns the enclosing observable this subscription was created for.
         */
        public final MapParallelUnorderedObservable<A, B> monix$reactive$internal$operators$MapParallelUnorderedObservable$MapAsyncParallelSubscription$$$outer() {
            return $outer;
        }
    }

    /**
     * Creates the operator.
     *
     * <p>The constructor uses the class-level type parameters {@code A} and {@code B};
     * declaring separate constructor-level ones would shadow them and make the field
     * assignments below ill-typed.
     *
     * <p>No validation happens here: a non-positive parallelism is reported to the
     * subscriber when the observable is subscribed to.
     *
     * @param observable the upstream observable
     * @param i the requested parallelism
     * @param function1 the mapping function producing one task per upstream element
     * @param overflowStrategy the overflow strategy for the buffer in front of the downstream
     */
    public MapParallelUnorderedObservable(Observable<A> observable, int i, Function1<A, Task<B>> function1, OverflowStrategy<B> overflowStrategy) {
        this.source = observable;
        this.monix$reactive$internal$operators$MapParallelUnorderedObservable$$parallelism = i;
        this.monix$reactive$internal$operators$MapParallelUnorderedObservable$$f = function1;
        this.monix$reactive$internal$operators$MapParallelUnorderedObservable$$overflowStrategy = overflowStrategy;
    }

    /**
     * Subscribes the given subscriber to this operator.
     *
     * <ul>
     *   <li>parallelism {@code <= 0}: the subscriber receives an
     *       {@link IllegalArgumentException} and an empty cancelable is returned;</li>
     *   <li>parallelism {@code == 1}: the work is delegated to {@code MapTaskObservable};</li>
     *   <li>otherwise: a {@code MapAsyncParallelSubscription} is subscribed to the source
     *       and returned as the cancelable.</li>
     * </ul>
     *
     * @param subscriber the downstream subscriber
     * @return a cancelable for the created subscription
     */
    @Override // monix.reactive.Observable
    public Cancelable unsafeSubscribeFn(Subscriber<B> subscriber) {
        final int workers = this.monix$reactive$internal$operators$MapParallelUnorderedObservable$$parallelism;
        if (workers <= 0) {
            subscriber.onError(new IllegalArgumentException("parallelism > 0"));
            return Cancelable$.MODULE$.empty();
        } else if (workers == 1) {
            // Single worker: delegate to the dedicated single-task operator.
            return new MapTaskObservable(this.source, this.monix$reactive$internal$operators$MapParallelUnorderedObservable$$f).unsafeSubscribeFn(subscriber);
        } else {
            CompositeCancelable tracked = CompositeCancelable$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Cancelable[0]));
            MapAsyncParallelSubscription subscription = new MapAsyncParallelSubscription(this, subscriber, tracked);
            // Track the upstream connection so cancelling the subscription also cancels it.
            Cancelable upstream = this.source.unsafeSubscribeFn(subscription);
            tracked.$plus$eq(upstream);
            return subscription;
        }
    }

    /**
     * Synthetic lambda body: ignores its argument and always yields the Continue
     * acknowledgement. Used by the subscription's onNext as the continuation of a
     * pending semaphore permit.
     */
    public static final /* synthetic */ Future monix$reactive$internal$operators$MapParallelUnorderedObservable$MapAsyncParallelSubscription$$_$_$$anonfun$1(BoxedUnit boxedUnit) {
        return Ack$Continue$.MODULE$;
    }
}
