package io.vertx.ext.reactivestreams.test;

import io.vertx.core.buffer.Buffer;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.test.core.TestUtils;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/reactivestreams/test/ReactiveWriteStreamTest.class */
public class ReactiveWriteStreamTest extends ReactiveStreamTestBase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/ext/reactivestreams/test/ReactiveWriteStreamTest$MySubscriber.class */
    /**
     * Test subscriber that records every buffer passed to {@link #onNext(Buffer)} and
     * keeps the subscription it was handed so a test can request items explicitly.
     *
     * <p>It never requests items by itself. {@link #onError(Throwable)} and
     * {@link #onComplete()} do nothing unless a subclass overrides them.
     */
    public class MySubscriber implements Subscriber<Buffer> {
        /** Buffers received through {@code onNext}, in the order they were added. */
        final List<Buffer> buffers = new CopyOnWriteArrayList<>();

        /** Subscription stored by {@code onSubscribe}; {@code null} until then. Tests poll this field. */
        volatile Subscription subscription;

        MySubscriber() {
        }

        /** Stores the subscription without requesting anything. */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }

        /** Appends the received buffer to {@link #buffers}. */
        @Override
        public void onNext(Buffer buffer) {
            this.buffers.add(buffer);
        }

        /** Intentionally empty; subclasses override this when a test needs to observe errors. */
        @Override
        public void onError(Throwable th) {
        }

        /** Intentionally empty. */
        @Override
        public void onComplete() {
        }
    }

    /**
     * Writes four buffers before the subscriber has requested anything, asserts that
     * nothing has been delivered, then requests 1 and 2 items and checks that exactly
     * that many buffers arrive, identical to the written ones and in write order.
     */
    @Test
    public void testWriteNoTokensInitially() throws Exception {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        MySubscriber subscriber = new MySubscriber();
        stream.subscribe(subscriber);
        waitUntil(() -> subscriber.subscription != null);

        List<Buffer> written = createRandomBuffers(4);
        for (Buffer buffer : written) {
            stream.write(buffer);
        }
        // The subscriber has not requested anything yet, so the test expects no delivery.
        assertTrue(subscriber.buffers.isEmpty());

        subscriber.subscription.request(1L);
        waitUntil(() -> subscriber.buffers.size() == 1);
        assertEquals(1L, subscriber.buffers.size());
        assertSame(written.get(0), subscriber.buffers.get(0));

        subscriber.subscription.request(2L);
        waitUntil(() -> subscriber.buffers.size() == 3);
        assertEquals(3L, subscriber.buffers.size());
        assertSame(written.get(1), subscriber.buffers.get(1));
        assertSame(written.get(2), subscriber.buffers.get(2));
    }

    /**
     * Requests three items up front, then writes four buffers and checks that the
     * subscriber ends up with the first three written buffers (same instances, same order).
     */
    @Test
    public void testWriteInitialTokens() throws Exception {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        MySubscriber subscriber = new MySubscriber();
        stream.subscribe(subscriber);
        waitUntil(() -> subscriber.subscription != null);

        // Demand is signalled before any data is written.
        subscriber.subscription.request(3L);

        List<Buffer> written = createRandomBuffers(4);
        for (Buffer buffer : written) {
            stream.write(buffer);
        }

        waitUntil(() -> subscriber.buffers.size() == 3);
        assertEquals(3L, subscriber.buffers.size());
        for (int index = 0; index < 3; index++) {
            assertSame(written.get(index), subscriber.buffers.get(index));
        }
    }

    /**
     * Subscribes three subscribers to one stream, writes ten buffers and then issues
     * requests of different sizes on each subscription.
     *
     * <p>After every request the test checks how many buffers each subscriber holds.
     * The expected counts only advance once all three subscribers have requested more,
     * and all three are always expected to hold the same prefix of the written buffers.
     */
    @Test
    public void testMultipleSubscribers() throws Exception {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        MySubscriber first = new MySubscriber();
        stream.subscribe(first);
        MySubscriber second = new MySubscriber();
        stream.subscribe(second);
        MySubscriber third = new MySubscriber();
        stream.subscribe(third);
        waitUntil(() -> first.subscription != null);
        waitUntil(() -> second.subscription != null);
        waitUntil(() -> third.subscription != null);

        List<Buffer> written = createRandomBuffers(10);
        for (Buffer buffer : written) {
            stream.write(buffer);
        }
        assertReceivedCount(0, first, second, third);

        // Round 1: one item each; nothing is expected until the last subscriber asks.
        first.subscription.request(1L);
        assertReceivedCount(0, first, second, third);
        second.subscription.request(1L);
        assertReceivedCount(0, first, second, third);
        third.subscription.request(1L);
        awaitReceivedCount(1, first, second, third);
        assertReceivedCount(1, first, second, third);
        assertReceivedPrefix(written, 1, first, second, third);

        // Round 2: requests of 4, 3 and 2; the test expects two more buffers for everyone.
        first.subscription.request(4L);
        assertReceivedCount(1, first, second, third);
        second.subscription.request(3L);
        assertReceivedCount(1, first, second, third);
        third.subscription.request(2L);
        awaitReceivedCount(3, first, second, third);
        assertReceivedCount(3, first, second, third);
        assertReceivedPrefix(written, 3, first, second, third);

        // Round 3: the second subscriber alone asking changes nothing; once the third
        // asks for two more, the test expects five buffers for everyone.
        second.subscription.request(1L);
        assertReceivedCount(3, first, second, third);
        third.subscription.request(2L);
        awaitReceivedCount(5, first, second, third);
        assertReceivedCount(5, first, second, third);
        assertReceivedPrefix(written, 5, first, second, third);
    }

    /** Asserts that each subscriber currently holds exactly {@code expected} buffers. */
    private void assertReceivedCount(int expected, MySubscriber... subscribers) {
        for (MySubscriber subscriber : subscribers) {
            assertEquals(expected, subscriber.buffers.size());
        }
    }

    /** Waits, one subscriber after another, until each holds exactly {@code expected} buffers. */
    private void awaitReceivedCount(int expected, MySubscriber... subscribers) {
        for (MySubscriber subscriber : subscribers) {
            waitUntil(() -> subscriber.buffers.size() == expected);
        }
    }

    /** Asserts that the first {@code count} buffers of each subscriber equal the first {@code count} written ones. */
    private void assertReceivedPrefix(List<Buffer> written, int count, MySubscriber... subscribers) {
        for (MySubscriber subscriber : subscribers) {
            for (int index = 0; index < count; index++) {
                assertEquals(written.get(index), subscriber.buffers.get(index));
            }
        }
    }

    /**
     * Runs the shared queue-full/drain scenario on a freshly created stream,
     * passing 10 as the write queue max size.
     */
    @Test
    public void testWriteQueueFullAndDrainDefaultQueueSize() throws Exception {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        testWriteQueueFullAndDrain(stream, 10);
    }

    /**
     * Shared scenario: sets the write queue max size, writes {@code i - 1} random
     * 50-byte buffers and asserts the queue is not reported full, writes one more
     * 100-byte buffer and asserts it is reported full, then requests two items and
     * waits for the drain handler, which asserts the queue is no longer full.
     *
     * @param reactiveWriteStream stream under test
     * @param i value passed to {@code setWriteQueueMaxSize}; also controls how many buffers are written
     */
    private void testWriteQueueFullAndDrain(ReactiveWriteStream<Buffer> reactiveWriteStream, int i) throws Exception {
        reactiveWriteStream.setWriteQueueMaxSize(i);
        MySubscriber subscriber = new MySubscriber();
        reactiveWriteStream.subscribe(subscriber);

        final int belowLimit = i - 1;
        for (int written = 0; written < belowLimit; written++) {
            reactiveWriteStream.write(TestUtils.randomBuffer(50));
        }
        assertFalse(reactiveWriteStream.writeQueueFull());

        // One more write is expected to tip the queue into the "full" state.
        reactiveWriteStream.write(TestUtils.randomBuffer(100));
        assertTrue(reactiveWriteStream.writeQueueFull());

        reactiveWriteStream.drainHandler(ignored -> {
            assertFalse(reactiveWriteStream.writeQueueFull());
            testComplete();
        });

        waitUntil(() -> subscriber.subscription != null);
        subscriber.subscription.request(2L);
        await();
    }

    /**
     * Subscribes one subscriber that throws from {@code onSubscribe} and one that
     * requests a single item. The test waits until the throwing subscriber's
     * {@code onError} has been called, writes one buffer, and completes when the
     * second subscriber receives it. The throwing subscriber fails the test if it
     * ever receives an item.
     */
    @Test
    public void testCancelSubscriptionOnError1() {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        final AtomicBoolean onErrorCalled = new AtomicBoolean();

        MySubscriber failing = new MySubscriber() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // Record the subscription first so the test can see it, then blow up.
                super.onSubscribe(subscription);
                throw new RuntimeException();
            }

            @Override
            public void onNext(Buffer buffer) {
                ReactiveWriteStreamTest.this.fail();
            }

            @Override
            public void onError(Throwable th) {
                onErrorCalled.set(true);
            }
        };
        stream.subscribe(failing);

        MySubscriber healthy = new MySubscriber() {
            int count = 0;

            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(1L);
                super.onSubscribe(subscription);
            }

            @Override
            public void onNext(Buffer buffer) {
                if (++this.count == 1) {
                    ReactiveWriteStreamTest.this.testComplete();
                }
            }
        };
        stream.subscribe(healthy);

        waitUntil(() -> failing.subscription != null);
        waitUntil(() -> healthy.subscription != null);
        waitUntil(onErrorCalled::get);

        stream.write(createRandomBuffers(1).get(0));
        await();
    }

    /**
     * Subscribes one subscriber that requests two items and throws from its first
     * {@code onNext}, and one that requests three items. The test writes one buffer,
     * waits until the throwing subscriber's {@code onError} has been called, writes two
     * more buffers, and completes when the second subscriber has received three items.
     * The throwing subscriber fails the test if it receives an item after its
     * {@code onError} was called.
     */
    @Test
    public void testCancelSubscriptionOnError2() {
        ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(this.vertx);
        final AtomicBoolean onErrorCalled = new AtomicBoolean();

        MySubscriber failing = new MySubscriber() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(2L);
                super.onSubscribe(subscription);
            }

            @Override
            public void onNext(Buffer buffer) {
                if (!onErrorCalled.get()) {
                    // No error reported yet: throw to trigger the error path.
                    throw new RuntimeException();
                }
                // An item arriving after onError means the subscription was not cancelled.
                ReactiveWriteStreamTest.this.fail();
            }

            @Override
            public void onError(Throwable th) {
                onErrorCalled.set(true);
            }
        };
        stream.subscribe(failing);

        MySubscriber healthy = new MySubscriber() {
            int count = 0;

            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(3L);
                super.onSubscribe(subscription);
            }

            @Override
            public void onNext(Buffer buffer) {
                if (++this.count == 3) {
                    ReactiveWriteStreamTest.this.testComplete();
                }
            }
        };
        stream.subscribe(healthy);

        waitUntil(() -> failing.subscription != null);
        waitUntil(() -> healthy.subscription != null);

        stream.write(createRandomBuffers(1).get(0));
        waitUntil(onErrorCalled::get);

        stream.write(createRandomBuffers(1).get(0));
        stream.write(createRandomBuffers(1).get(0));
        await();
    }
}
