package io.vertx.test.fakestream;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.Arguments;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Stream;

/* loaded from: input_file:io/vertx/test/fakestream/FakeStream.class */
/**
 * In-memory stream that is both a {@link ReadStream} and a {@link WriteStream}.
 *
 * <p>Items written (or emitted) are queued in {@link #pending} and handed to the item handler
 * while {@link #demand} is positive. The queue length is compared against
 * {@link #highWaterMark} to report back-pressure, and the drain handler is invoked once an
 * overflowing queue has been fully delivered.
 *
 * <p>Methods named {@code m<digits><name>} carry decompiler-assigned names (see the
 * "renamed from" comment on each); they are kept as-is so existing references keep resolving.
 *
 * @param <T> the type of the items flowing through the stream
 */
public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
    /** Marker item queued by {@link #end(Handler)}; compared by identity in {@link #checkPending()}. */
    private static final Object END_SENTINEL = new Object();
    /** Reentrancy guard: {@code true} while {@link #checkPending()} is delivering. */
    private boolean emitting;
    private Handler<Throwable> exceptionHandler;
    private Handler<T> itemHandler;
    private Handler<Void> endHandler;
    /** Set once {@link #end(Handler)} has been called; further emits and ends are rejected. */
    private boolean ended;
    /** Set when an emit left more than {@link #highWaterMark} operations queued. */
    private boolean overflow;
    private Handler<Void> drainHandler;
    private int pauseCount;
    private int resumeCount;
    private long highWaterMark = 16;
    private final Deque<Op<T>> pending = new ArrayDeque<>();
    /** Number of items that may still be delivered; {@code Long.MAX_VALUE} is never decremented. */
    private long demand = Long.MAX_VALUE;
    /** Future whose outcome is forwarded to the end operation's ack when the end marker is reached. */
    private Future<Void> end = Future.succeededFuture();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/test/fakestream/FakeStream$Op.class */
    /**
     * A queued operation: the item to deliver and the promise acknowledging its delivery.
     *
     * @param <T> the item type
     */
    public static class Op<T> {
        final T item;
        final Promise<Void> ack;

        /** Creates an operation with a fresh acknowledgement promise. */
        Op(T t) {
            this.item = t;
            this.ack = Promise.promise();
        }

        /** Creates an operation acknowledged through the supplied promise. */
        Op(T t, Promise<Void> promise) {
            this.item = t;
            this.ack = promise;
        }
    }

    /** @return how many times {@link #m101pause()} has been called */
    public synchronized int pauseCount() {
        return this.pauseCount;
    }

    /** @return how many times {@link #m100resume()} has been called */
    public synchronized int resumeCount() {
        return this.resumeCount;
    }

    /** @return {@code true} when the current demand is zero */
    public synchronized boolean isPaused() {
        return this.demand == 0;
    }

    /** @return {@code true} once {@link #end(Handler)} has been called */
    public synchronized boolean isEnded() {
        return this.ended;
    }

    /** @return the current demand */
    public synchronized long demand() {
        return this.demand;
    }

    /**
     * Sets the handler invoked by {@link #fail(Throwable)}.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this stream
     */
    public synchronized FakeStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /** @return the current exception handler, possibly {@code null} */
    public synchronized Handler<Throwable> exceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * Emits a single item.
     *
     * @param t the item
     * @return {@code true} when the queue size is at or below the high-water mark after the emit
     * @throws IllegalStateException if the stream has been ended
     */
    public final boolean emit(T t) {
        return emit(Stream.of(t));
    }

    /**
     * Emits every item of the given stream, each with its own acknowledgement promise.
     *
     * @param stream the items to emit
     * @return {@code true} when the queue size is at or below the high-water mark after the emit
     * @throws IllegalStateException if the stream has been ended
     */
    public boolean emit(Stream<T> stream) {
        return doEmit(stream.<Op<T>>map(Op::new));
    }

    /**
     * Emits a single pre-built operation.
     *
     * @param op the operation to queue
     * @return {@code true} when the queue size is at or below the high-water mark after the emit
     * @throws IllegalStateException if the stream has been ended
     */
    public final boolean doEmit(Op<T> op) {
        return doEmit(Stream.of(op));
    }

    /**
     * Queues the operations, delivers whatever the current demand allows, then reports whether
     * the remaining queue is within the high-water mark. Exceeding it sets the overflow flag.
     */
    private synchronized boolean doEmit(Stream<Op<T>> stream) {
        if (this.ended) {
            throw new IllegalStateException();
        }
        stream.forEach(this.pending::add);
        checkPending();
        boolean withinLimit = ((long) this.pending.size()) <= this.highWaterMark;
        this.overflow |= !withinLimit;
        return withinLimit;
    }

    /**
     * Ends the stream.
     *
     * @return a future completed with the outcome of the end operation
     * @throws IllegalStateException if the stream has already been ended
     */
    public Future<Void> end() {
        Promise<Void> promise = Promise.promise();
        end(promise);
        return promise.future();
    }

    /**
     * Ends the stream by queueing the end marker behind any pending items. When the marker is
     * reached, {@code handler} (if non-null) is notified first, then the end handler (if set).
     *
     * @param handler notified with the outcome of the end operation, may be {@code null}
     * @throws IllegalStateException if the stream has already been ended
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        synchronized (this) {
            if (this.ended) {
                throw new IllegalStateException();
            }
            this.ended = true;
            Promise<Void> promise = Promise.promise();
            promise.future().onComplete(asyncResult -> {
                if (handler != null) {
                    handler.handle(asyncResult);
                }
                Handler<Void> onEnd = endHandler();
                if (onEnd != null) {
                    onEnd.handle(null);
                }
            });
            // The marker is only ever compared by identity and never passed to the item handler,
            // so viewing it as a T is safe.
            @SuppressWarnings("unchecked")
            T marker = (T) END_SENTINEL;
            this.pending.add(new Op<>(marker, promise));
        }
        checkPending();
    }

    /**
     * Passes the given failure to the exception handler, if one is set.
     *
     * @param th the failure to report
     */
    public synchronized void fail(Throwable th) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    /* renamed from: handler, reason: merged with bridge method [inline-methods] */
    /**
     * Sets the handler that receives delivered items.
     *
     * @param handler the item handler, may be {@code null}
     * @return this stream
     */
    public synchronized FakeStream<T> m102handler(Handler<T> handler) {
        this.itemHandler = handler;
        return this;
    }

    /** @return the current item handler, possibly {@code null} */
    public synchronized Handler<T> handler() {
        return this.itemHandler;
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    /**
     * Increments the pause counter and sets the demand to zero.
     *
     * @return this stream
     */
    public synchronized FakeStream<T> m101pause() {
        this.pauseCount++;
        this.demand = 0L;
        return this;
    }

    /**
     * Delivers queued operations while there is demand, then fires the drain handler if an
     * overflowing queue has been emptied. Reentrant calls (for example from a handler that
     * emits or fetches) return immediately; the outer call's loop picks up the new work.
     */
    private void checkPending() {
        if (this.emitting) {
            return;
        }
        this.emitting = true;
        try {
            Op<T> op;
            while (this.demand > 0 && (op = this.pending.poll()) != null) {
                if (this.demand != Long.MAX_VALUE) {
                    this.demand--;
                }
                if (op.item == END_SENTINEL) {
                    this.end.onComplete(op.ack);
                } else {
                    deliver(op);
                }
            }
            if (this.pending.isEmpty() && this.overflow) {
                this.overflow = false;
                Handler<Void> drain = this.drainHandler;
                // The drain handler is one-shot: it is cleared before being invoked.
                this.drainHandler = null;
                if (drain != null) {
                    drain.handle(null);
                }
            }
        } finally {
            // Always release the guard; otherwise an exception thrown above would leave the
            // stream permanently unable to deliver.
            this.emitting = false;
        }
    }

    /**
     * Hands one item to the item handler and settles its ack: failed with the exception if the
     * handler throws, completed otherwise (including when no handler is set).
     */
    private void deliver(Op<T> op) {
        Handler<T> handler = this.itemHandler;
        if (handler != null) {
            try {
                handler.handle(op.item);
            } catch (Exception e) {
                // Settle the ack exactly once: completing after failing would throw.
                op.ack.fail(e);
                return;
            }
        }
        op.ack.complete();
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    /**
     * Adds {@code j} to the demand, saturating at {@code Long.MAX_VALUE}, and delivers pending
     * items.
     *
     * @param j the additional demand, must be greater than zero
     * @return this stream
     */
    public synchronized FakeStream<T> m99fetch(long j) {
        Arguments.require(j > 0, "Fetch amount must be > 0L");
        this.demand += j;
        if (this.demand < 0) {
            // The addition overflowed: clamp to the maximum demand.
            this.demand = Long.MAX_VALUE;
        }
        checkPending();
        return this;
    }

    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    /**
     * Increments the resume counter and fetches {@code Long.MAX_VALUE} items.
     *
     * @return this stream
     */
    public FakeStream<T> m100resume() {
        synchronized (this) {
            this.resumeCount++;
        }
        return m99fetch(Long.MAX_VALUE);
    }

    /**
     * Sets the handler invoked after the end operation completes.
     *
     * @param handler the end handler, may be {@code null}
     * @return this stream
     */
    public synchronized FakeStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    /** @return the current end handler, possibly {@code null} */
    public synchronized Handler<Void> endHandler() {
        return this.endHandler;
    }

    /**
     * Writes an item to the stream.
     *
     * @param t the item
     * @return a future settled when the item has been delivered
     * @throws IllegalStateException if the stream has been ended
     */
    public Future<Void> write(T t) {
        Promise<Void> promise = Promise.promise();
        doEmit(new Op<>(t, promise));
        return promise.future();
    }

    /**
     * Writes an item to the stream.
     *
     * @param t the item
     * @param handler notified when the item has been delivered, may be {@code null}
     * @throws IllegalStateException if the stream has been ended
     */
    public void write(T t, Handler<AsyncResult<Void>> handler) {
        Promise<Void> promise = Promise.promise();
        if (handler != null) {
            promise.future().onComplete(handler);
        }
        doEmit(new Op<>(t, promise));
    }

    /* renamed from: setWriteQueueMaxSize, reason: merged with bridge method [inline-methods] */
    /**
     * Sets the high-water mark used by {@link #writeQueueFull()} and by the emit return value.
     *
     * @param i the new high-water mark
     * @return this stream
     */
    public synchronized FakeStream<T> m106setWriteQueueMaxSize(int i) {
        this.highWaterMark = i;
        return this;
    }

    /** @return {@code true} when more operations are queued than the high-water mark allows */
    public synchronized boolean writeQueueFull() {
        return ((long) this.pending.size()) > this.highWaterMark;
    }

    /**
     * Sets the handler invoked once an overflowing queue has been fully delivered.
     *
     * @param handler the drain handler, may be {@code null}
     * @return this stream
     */
    public synchronized FakeStream<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    /** @return the current drain handler, possibly {@code null} */
    public synchronized Handler<Void> drainHandler() {
        return this.drainHandler;
    }

    /**
     * Sets the future whose outcome is forwarded to the end operation's ack.
     *
     * @param future the future to forward
     * @return this stream
     * @throws IllegalStateException if the stream has already been ended
     */
    public synchronized FakeStream<T> setEnd(Future<Void> future) {
        if (this.ended) {
            throw new IllegalStateException();
        }
        this.end = future;
        return this;
    }

    // The methods below are the compiler-generated bridge methods as rendered by the decompiler;
    // each forwards to the typed method of the same original name.

    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m98endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m103exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m104exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: drainHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ WriteStream m105drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ WriteStream m107exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
