package monix.nio;

import java.nio.file.WatchEvent;
import monix.eval.Task;
import monix.eval.Task$;
import monix.eval.Task$AsyncBuilder$;
import monix.eval.Task$AsyncBuilder$CreatePartiallyApplied$;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Callback;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.Scheduler;
import monix.execution.atomic.AtomicBoolean;
import monix.execution.atomic.AtomicBuilder$AtomicBooleanBuilder$;
import monix.execution.atomic.PaddingStrategy$NoPadding$;
import monix.execution.cancelables.SingleAssignCancelable$;
import monix.execution.exceptions.APIContractViolationException$;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import scala.MatchError;
import scala.Option;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

/* compiled from: WatchServiceObservable.scala */
/* loaded from: input_file:monix/nio/WatchServiceObservable.class */
public abstract class WatchServiceObservable extends Observable<WatchEvent<?>[]> {
    private final AtomicBoolean wasSubscribed = AtomicBuilder$AtomicBooleanBuilder$.MODULE$.buildInstance(false, PaddingStrategy$NoPadding$.MODULE$, true);
    private final Task<WatchEvent<?>[]> emptyTask = Task$AsyncBuilder$CreatePartiallyApplied$.MODULE$.apply$extension(Task$.MODULE$.create(), (scheduler, callback) -> {
        return Cancelable$.MODULE$.empty();
    }, Task$AsyncBuilder$.MODULE$.forCancelable());

    public abstract Option<WatchService> watchService();

    public Cancelable unsafeSubscribeFn(Subscriber<WatchEvent<?>[]> subscriber) {
        if (this.wasSubscribed.getAndSet(true)) {
            subscriber.onError(APIContractViolationException$.MODULE$.apply(getClass().getName()));
            return Cancelable$.MODULE$.empty();
        }
        try {
            return startPolling(subscriber);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    subscriber.onError((Throwable) unapply.get());
                    return Cancelable$.MODULE$.empty();
                }
            }
            throw th;
        }
    }

    public Future<BoxedUnit> init(Subscriber<WatchEvent<?>[]> subscriber) {
        return Future$.MODULE$.successful(BoxedUnit.UNIT);
    }

    private Cancelable startPolling(final Subscriber<WatchEvent<?>[]> subscriber) {
        Cancelable runAsync = Task$.MODULE$.fromFuture(init(subscriber)).flatMap(boxedUnit -> {
            return loop(subscriber, subscriber.scheduler());
        }).executeWithOptions(options -> {
            return options.enableAutoCancelableRunLoops();
        }).runAsync(new Callback<Throwable, WatchEvent<?>[]>(subscriber) { // from class: monix.nio.WatchServiceObservable$$anon$1
            private final Subscriber subscriber$1;

            {
                this.subscriber$1 = subscriber;
            }

            public void onSuccess(WatchEvent[] watchEventArr) {
            }

            public void onError(Throwable th) {
                this.subscriber$1.onError(th);
            }
        }, subscriber.scheduler());
        return SingleAssignCancelable$.MODULE$.plusOne(Cancelable$.MODULE$.apply(() -> {
            runAsync.cancel();
        }));
    }

    private Task<WatchEvent<?>[]> loop(Subscriber<WatchEvent<?>[]> subscriber, Scheduler scheduler) {
        return (Task) watchService().map(watchService -> {
            return watchService.take().doOnCancel(Task$.MODULE$.defer(() -> {
                return loop$$anonfun$3$$anonfun$1(r2);
            })).flatMap(watchKey -> {
                WatchEvent[] watchEventArr = (WatchEvent[]) ((IterableOnceOps) JavaConverters$.MODULE$.asScalaBufferConverter(watchKey.pollEvents()).asScala()).toArray(ClassTag$.MODULE$.apply(WatchEvent.class));
                watchKey.reset();
                return Task$.MODULE$.fromFuture(subscriber.onNext(watchEventArr)).flatMap(ack -> {
                    if (Ack$Continue$.MODULE$.equals(ack)) {
                        return loop(subscriber, scheduler);
                    }
                    if (Ack$Stop$.MODULE$.equals(ack)) {
                        return this.emptyTask;
                    }
                    throw new MatchError(ack);
                });
            });
        }).getOrElse(this::loop$$anonfun$2);
    }

    private static final Task loop$$anonfun$3$$anonfun$1(WatchService watchService) {
        return watchService.close();
    }

    private final Task loop$$anonfun$2() {
        return this.emptyTask;
    }
}
