package io.vertx.ext.reactivestreams.impl;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.ext.reactivestreams.ReactiveReadStream;
import java.util.ArrayDeque;
import java.util.Queue;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/reactivestreams/impl/ReactiveReadStreamImpl.class */
public class ReactiveReadStreamImpl<T> implements ReactiveReadStream<T> {
    private final long batchSize;
    private Handler<T> dataHandler;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private Subscription subscription;
    private final Queue<T> pending = new ArrayDeque();
    private long demand = Long.MAX_VALUE;
    private long tokens;

    public ReactiveReadStreamImpl(long j) {
        this.batchSize = j;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: handler */
    public synchronized ReactiveReadStream<T> mo3handler(Handler<T> handler) {
        this.dataHandler = handler;
        if (this.dataHandler != null && this.demand > 0) {
            checkRequestTokens();
        }
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: pause */
    public synchronized ReactiveReadStream<T> mo2pause() {
        this.demand = 0L;
        return this;
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    public ReactiveReadStream<T> m11fetch(long j) {
        T poll;
        if (j > 0) {
            this.demand += j;
            if (this.demand < 0) {
                this.demand = Long.MAX_VALUE;
            }
            while (this.demand > 0 && (poll = this.pending.poll()) != null) {
                if (this.demand != Long.MAX_VALUE) {
                    this.demand--;
                }
                handleData(poll);
            }
            checkRequestTokens();
        }
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: resume */
    public synchronized ReactiveReadStream<T> mo1resume() {
        return m11fetch(Long.MAX_VALUE);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    public synchronized ReactiveReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    public synchronized ReactiveReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public synchronized void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("subscription");
        }
        if (this.subscription != null) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
        }
    }

    public synchronized void onNext(T t) {
        if (t == null) {
            throw new NullPointerException("data");
        }
        checkUnsolicitedTokens();
        if (this.demand <= 0) {
            this.pending.add(t);
            return;
        }
        if (this.demand != Long.MAX_VALUE) {
            this.demand--;
        }
        if (this.pending.size() > 0) {
            this.pending.add(t);
            t = this.pending.poll();
        }
        handleData(t);
    }

    public synchronized void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("throwable");
        }
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    public synchronized void onComplete() {
        if (this.endHandler != null) {
            this.endHandler.handle((Object) null);
        }
    }

    protected void checkUnsolicitedTokens() {
        if (this.tokens == 0) {
            throw new IllegalStateException("Data received but wasn't requested");
        }
    }

    private synchronized void handleData(T t) {
        if (this.dataHandler != null) {
            this.dataHandler.handle(t);
            this.tokens--;
            checkRequestTokens();
        }
    }

    private void checkRequestTokens() {
        if (this.demand <= 0 || this.subscription == null || this.tokens != 0) {
            return;
        }
        this.tokens = this.batchSize;
        this.subscription.request(this.batchSize);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo4exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
