package io.vertx.rx.java;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;

/* loaded from: input_file:io/vertx/rx/java/ObservableReadStream.class */
public class ObservableReadStream<T, R> implements Observable.OnSubscribe<R> {
    private static final Throwable COMPLETED_SENTINEL = new Throwable();
    public static final long DEFAULT_MAX_BUFFER_SIZE = 256;
    private final ReadStream<T> stream;
    private final Function<T, R> adapter;
    private final AtomicReference<ObservableReadStream<T, R>.Sub> subscription;
    private boolean subscribed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rx/java/ObservableReadStream$Adapter.class */
    public abstract class Adapter implements Handler<T> {
        protected final Subscriber<? super R> subscriber;
        long requested;

        Adapter(Subscriber<? super R> subscriber) {
            this.subscriber = subscriber;
        }

        synchronized long requested() {
            long j;
            synchronized (ObservableReadStream.this) {
                j = this.requested;
            }
            return j;
        }

        void request(long j) {
            synchronized (ObservableReadStream.this) {
                if (j != Long.MAX_VALUE) {
                    if (j < Long.MAX_VALUE - this.requested) {
                        this.requested += j;
                    }
                }
                this.requested = Long.MAX_VALUE;
            }
        }

        abstract boolean dispose();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract void end(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rx/java/ObservableReadStream$QueueAdapter.class */
    public class QueueAdapter extends ObservableReadStream<T, R>.Adapter {
        QueueAdapter(Subscriber<? super R> subscriber) {
            super(subscriber);
        }

        @Override // io.vertx.rx.java.ObservableReadStream.Adapter
        boolean dispose() {
            return true;
        }

        @Override // io.vertx.rx.java.ObservableReadStream.Adapter
        void end(Throwable th) {
            if (th == ObservableReadStream.COMPLETED_SENTINEL) {
                this.subscriber.onCompleted();
            } else {
                this.subscriber.onError(th);
            }
        }

        public void handle(T t) {
            this.subscriber.onNext(ObservableReadStream.this.adapter.apply(t));
        }

        @Override // io.vertx.rx.java.ObservableReadStream.Adapter
        void request(long j) {
            super.request(j);
            if (!ObservableReadStream.this.subscribed || j <= 0) {
                return;
            }
            ObservableReadStream.this.stream.fetch(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rx/java/ObservableReadStream$Sub.class */
    public class Sub implements Subscription, Producer {
        private ObservableReadStream<T, R>.Adapter adapter;

        Sub(ObservableReadStream<T, R>.Adapter adapter) {
            this.adapter = adapter;
        }

        public void request(long j) {
            if (j < 0) {
                throw new IllegalArgumentException("Cannot request negative items:" + j);
            }
            this.adapter.request(j);
        }

        public void unsubscribe() {
            boolean dispose;
            if (ObservableReadStream.this.subscription.compareAndSet(this, null)) {
                synchronized (ObservableReadStream.this) {
                    dispose = this.adapter.dispose();
                }
                RxHelper.setNullHandlers(ObservableReadStream.this.stream);
                if (dispose) {
                    try {
                        ObservableReadStream.this.stream.resume();
                    } catch (Exception e) {
                    }
                }
                ObservableReadStream.this.subscribed = false;
            }
        }

        public boolean isUnsubscribed() {
            return ObservableReadStream.this.subscription.get() != this;
        }
    }

    public ObservableReadStream(ReadStream<T> readStream, Function<T, R> function) {
        this(readStream, function, 256L);
    }

    public ObservableReadStream(ReadStream<T> readStream, Function<T, R> function, long j) {
        this.subscription = new AtomicReference<>();
        readStream.pause();
        this.stream = readStream;
        this.adapter = function;
    }

    public long getRequested() {
        ObservableReadStream<T, R>.Sub sub = this.subscription.get();
        if (sub != null) {
            return ((Sub) sub).adapter.requested();
        }
        return 0L;
    }

    public void call(Subscriber<? super R> subscriber) {
        QueueAdapter queueAdapter = new QueueAdapter(subscriber);
        ObservableReadStream<T, R>.Sub sub = new Sub(queueAdapter);
        if (!this.subscription.compareAndSet(null, sub)) {
            throw new IllegalStateException();
        }
        subscriber.setProducer(sub);
        subscriber.add(sub);
        ReadStream<T> readStream = this.stream;
        ObservableReadStream<T, R>.Adapter adapter = ((Sub) sub).adapter;
        Objects.requireNonNull(adapter);
        readStream.exceptionHandler(adapter::end);
        this.stream.endHandler(r4 -> {
            sub.adapter.end(COMPLETED_SENTINEL);
        });
        this.stream.handler(((Sub) sub).adapter);
        this.subscribed = true;
        long requested = queueAdapter.requested();
        this.stream.pause();
        if (requested > 0) {
            this.stream.fetch(requested);
        }
    }
}
