package io.vertx.ext.reactivestreams.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/reactivestreams/impl/ReactiveWriteStreamImpl.class */
public class ReactiveWriteStreamImpl<T> implements ReactiveWriteStream<T> {
    private Handler<Void> drainHandler;
    protected final Context ctx;
    private boolean closed;
    private Set<ReactiveWriteStreamImpl<T>.SubscriptionImpl> subscriptions = new HashSet();
    private final Queue<Item<T>> pending = new ArrayDeque();
    private int writeQueueMaxSize = 32;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/ext/reactivestreams/impl/ReactiveWriteStreamImpl$Item.class */
    public static class Item<T> {
        final T value;
        final Handler<AsyncResult<Void>> handler;

        Item(T t, Handler<AsyncResult<Void>> handler) {
            this.value = t;
            this.handler = handler;
        }
    }

    /* loaded from: input_file:io/vertx/ext/reactivestreams/impl/ReactiveWriteStreamImpl$SubscriptionImpl.class */
    public class SubscriptionImpl implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final AtomicLong tokens;

        private SubscriptionImpl(Subscriber<? super T> subscriber) {
            this.tokens = new AtomicLong(Long.MIN_VALUE);
            this.subscriber = subscriber;
        }

        public long tokens() {
            return -(Long.MIN_VALUE - this.tokens.get());
        }

        public void takeTokens(long j) {
            this.tokens.addAndGet(-j);
        }

        public void request(long j) {
            if (j <= 0) {
                ReactiveWriteStreamImpl.this.signalError(this.subscriber, new IllegalArgumentException("3.9 Subscriber cannot request less then 1 for the number of elements."));
            } else if (this.tokens.addAndGet(j) > 0) {
                ReactiveWriteStreamImpl.this.signalError(this.subscriber, new IllegalStateException("3.17 Subscriber has more then Long.MAX_VALUE (2^63-1) currently pending."));
            } else {
                ReactiveWriteStreamImpl.this.checkSend();
            }
        }

        public void cancel() {
            ReactiveWriteStreamImpl.this.subscriptions.remove(this);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.subscriber == ((SubscriptionImpl) obj).subscriber;
        }

        public int hashCode() {
            return this.subscriber.hashCode();
        }
    }

    public ReactiveWriteStreamImpl(Vertx vertx) {
        this.ctx = vertx.getOrCreateContext();
    }

    private void checkClosed() {
        if (this.closed) {
            throw new IllegalStateException("Closed");
        }
    }

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        checkClosed();
        Objects.requireNonNull(subscriber);
        ReactiveWriteStreamImpl<T>.SubscriptionImpl subscriptionImpl = new SubscriptionImpl(subscriber);
        if (!this.subscriptions.add(subscriptionImpl)) {
            throw new IllegalStateException("1.10 Cannot subscribe multiple times with the same subscriber.");
        }
        this.ctx.runOnContext(r7 -> {
            try {
                subscriber.onSubscribe(subscriptionImpl);
            } catch (Throwable th) {
                signalError(subscriptionImpl.subscriber, th);
            }
        });
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public synchronized ReactiveWriteStream<T> write(T t) {
        return write((ReactiveWriteStreamImpl<T>) t, (Handler<AsyncResult<Void>>) null);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public ReactiveWriteStream<T> write(T t, Handler<AsyncResult<Void>> handler) {
        checkClosed();
        this.pending.add(new Item<>(t, handler));
        checkSend();
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: setWriteQueueMaxSize */
    public synchronized ReactiveWriteStream<T> mo7setWriteQueueMaxSize(int i) {
        checkClosed();
        if (this.writeQueueMaxSize < 1) {
            throw new IllegalArgumentException("writeQueueMaxSize must be >=1");
        }
        this.writeQueueMaxSize = i;
        return this;
    }

    public synchronized boolean writeQueueFull() {
        checkClosed();
        return this.pending.size() >= this.writeQueueMaxSize;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public synchronized ReactiveWriteStream<T> drainHandler(Handler<Void> handler) {
        checkClosed();
        this.drainHandler = handler;
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public synchronized ReactiveWriteStream<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public void end() {
        close();
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        close();
        if (handler != null) {
            this.ctx.runOnContext(r4 -> {
                handler.handle(Future.succeededFuture());
            });
        }
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    public ReactiveWriteStream<T> close() {
        synchronized (this) {
            if (this.closed) {
                return this;
            }
            this.closed = true;
            complete();
            this.subscriptions.clear();
            Future failedFuture = Future.failedFuture(ConnectionBase.CLOSED_EXCEPTION);
            Iterator<Item<T>> it = this.pending.iterator();
            while (it.hasNext()) {
                Handler<AsyncResult<Void>> handler = it.next().handler;
                if (handler != null) {
                    this.ctx.runOnContext(r5 -> {
                        handler.handle(failedFuture);
                    });
                }
            }
            this.pending.clear();
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkSend() {
        if (this.subscriptions.isEmpty()) {
            return;
        }
        long min = Math.min(getAvailable(), this.pending.size());
        takeTokens(min);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= min) {
                break;
            }
            sendToSubscribers(this.pending.poll());
            j = j2 + 1;
        }
        if (this.drainHandler == null || this.pending.size() >= this.writeQueueMaxSize) {
            return;
        }
        callDrainHandler();
    }

    private void callDrainHandler() {
        Handler<Void> handler = this.drainHandler;
        this.ctx.runOnContext(r4 -> {
            handler.handle((Object) null);
        });
    }

    private long getAvailable() {
        long j = Long.MAX_VALUE;
        Iterator<ReactiveWriteStreamImpl<T>.SubscriptionImpl> it = this.subscriptions.iterator();
        while (it.hasNext()) {
            j = Math.min(it.next().tokens(), j);
        }
        return j;
    }

    private void takeTokens(long j) {
        Iterator<ReactiveWriteStreamImpl<T>.SubscriptionImpl> it = this.subscriptions.iterator();
        while (it.hasNext()) {
            it.next().takeTokens(j);
        }
    }

    private void complete() {
        for (ReactiveWriteStreamImpl<T>.SubscriptionImpl subscriptionImpl : this.subscriptions) {
            this.ctx.runOnContext(r3 -> {
                subscriptionImpl.subscriber.onComplete();
            });
        }
    }

    private void sendToSubscribers(Item<T> item) {
        Iterator<ReactiveWriteStreamImpl<T>.SubscriptionImpl> it = this.subscriptions.iterator();
        while (it.hasNext()) {
            onNext(this.ctx, ((SubscriptionImpl) it.next()).subscriber, item.value);
            if (item.handler != null) {
                item.handler.handle(Future.succeededFuture());
            }
        }
    }

    protected void onNext(Context context, Subscriber<? super T> subscriber, T t) {
        context.runOnContext(r7 -> {
            try {
                subscriber.onNext(t);
            } catch (Throwable th) {
                signalError(subscriber, th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void signalError(Subscriber<? super T> subscriber, Throwable th) {
        this.subscriptions.removeIf(subscriptionImpl -> {
            return subscriptionImpl.subscriber == subscriber;
        });
        subscriber.onError(th);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: drainHandler */
    public /* bridge */ /* synthetic */ WriteStream mo6drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: write */
    public /* bridge */ /* synthetic */ WriteStream mo8write(Object obj, Handler handler) {
        return write((ReactiveWriteStreamImpl<T>) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: write */
    public /* bridge */ /* synthetic */ WriteStream mo9write(Object obj) {
        return write((ReactiveWriteStreamImpl<T>) obj);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ WriteStream mo10exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveWriteStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo11exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
