package monix.nio;

import java.nio.ByteBuffer;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.Scheduler;
import monix.execution.atomic.AtomicBoolean;
import monix.execution.atomic.AtomicBuilder$AtomicBooleanBuilder$;
import monix.execution.atomic.PaddingStrategy$NoPadding$;
import monix.execution.cancelables.SingleAssignCancelable$;
import monix.execution.exceptions.APIContractViolationException$;
import monix.nio.internal.Bytes;
import monix.nio.internal.Bytes$;
import monix.nio.internal.EmptyBytes$;
import monix.nio.internal.NonEmptyBytes;
import monix.nio.internal.NonEmptyBytes$;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import scala.MatchError;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: AsyncChannelObservable.scala */
/* loaded from: input_file:monix/nio/AsyncChannelObservable.class */
public abstract class AsyncChannelObservable extends Observable<byte[]> {
    private final AtomicBoolean wasSubscribed = AtomicBuilder$AtomicBooleanBuilder$.MODULE$.buildInstance(false, PaddingStrategy$NoPadding$.MODULE$, true);
    private final ByteBuffer buffer = ByteBuffer.allocate(bufferSize());

    public abstract int bufferSize();

    public abstract Option<AsyncChannel> channel();

    public Future<BoxedUnit> init(Subscriber<byte[]> subscriber) {
        return Future$.MODULE$.successful(BoxedUnit.UNIT);
    }

    public Cancelable unsafeSubscribeFn(Subscriber<byte[]> subscriber) {
        if (this.wasSubscribed.getAndSet(true)) {
            subscriber.onError(APIContractViolationException$.MODULE$.apply(getClass().getName()));
            return Cancelable$.MODULE$.empty();
        }
        try {
            return startReading(subscriber);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    subscriber.onError((Throwable) unapply.get());
                    closeChannel(subscriber.scheduler());
                    return Cancelable$.MODULE$.empty();
                }
            }
            throw th;
        }
    }

    private Cancelable startReading(Subscriber<byte[]> subscriber) {
        Cancelable runAsync = Task$.MODULE$.fromFuture(init(subscriber)).flatMap(boxedUnit -> {
            return loop(subscriber, 0L, subscriber.scheduler());
        }).executeWithOptions(options -> {
            return options.enableAutoCancelableRunLoops();
        }).runAsync(new AsyncChannelObservable$$anon$1(subscriber, this), subscriber.scheduler());
        return SingleAssignCancelable$.MODULE$.plusOne(Cancelable$.MODULE$.apply(() -> {
            runAsync.cancel();
            closeChannel(subscriber.scheduler());
        }));
    }

    private Task<byte[]> loop(Subscriber<byte[]> subscriber, long j, Scheduler scheduler) {
        this.buffer.clear();
        return (Task) channel().map(asyncChannel -> {
            return asyncChannel.read(this.buffer, j).doOnCancel(Task$.MODULE$.defer(() -> {
                return loop$$anonfun$3$$anonfun$1(r2);
            })).flatMap(obj -> {
                return loop$$anonfun$5$$anonfun$3(subscriber, j, scheduler, BoxesRunTime.unboxToInt(obj));
            });
        }).getOrElse(AsyncChannelObservable::loop$$anonfun$2);
    }

    public final void closeChannel(Scheduler scheduler) {
        channel().foreach(asyncChannel -> {
            return asyncChannel.close().runToFuture(scheduler);
        });
    }

    private static final Task loop$$anonfun$3$$anonfun$1(AsyncChannel asyncChannel) {
        return asyncChannel.close();
    }

    private final /* synthetic */ Task loop$$anonfun$5$$anonfun$3(Subscriber subscriber, long j, Scheduler scheduler, int i) {
        Bytes apply = Bytes$.MODULE$.apply(this.buffer, i);
        if (EmptyBytes$.MODULE$.equals(apply)) {
            subscriber.onComplete();
            return Task$.MODULE$.now(Bytes$.MODULE$.emptyBytes());
        }
        if (!(apply instanceof NonEmptyBytes)) {
            throw new MatchError(apply);
        }
        return Task$.MODULE$.fromFuture(subscriber.onNext(NonEmptyBytes$.MODULE$.unapply((NonEmptyBytes) apply)._1())).flatMap(ack -> {
            if (Ack$Continue$.MODULE$.equals(ack)) {
                return loop(subscriber, j + i, scheduler);
            }
            if (Ack$Stop$.MODULE$.equals(ack)) {
                return Task$.MODULE$.now(Bytes$.MODULE$.emptyBytes());
            }
            throw new MatchError(ack);
        });
    }

    private static final Task loop$$anonfun$2() {
        return Task$.MODULE$.now(Bytes$.MODULE$.emptyBytes());
    }
}
