package org.asynchttpclient.netty.reactivestreams;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.channel.NameResolution;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
import org.asynchttpclient.reactivestreams.ReactiveStreamsTest;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/asynchttpclient/netty/reactivestreams/NettyReactiveStreamsTest.class */
public class NettyReactiveStreamsTest extends ReactiveStreamsTest {

    /* loaded from: input_file:org/asynchttpclient/netty/reactivestreams/NettyReactiveStreamsTest$BlockedStreamSubscriber.class */
    private static class BlockedStreamSubscriber extends ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BlockedStreamSubscriber.class);
        private final CountDownLatch streamStarted;
        private final CountDownLatch streamOnHold;

        public BlockedStreamSubscriber(CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            this.streamStarted = countDownLatch;
            this.streamOnHold = countDownLatch2;
        }

        @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsTest.SimpleSubscriber
        public void onNext(HttpResponseBodyPart httpResponseBodyPart) {
            this.streamStarted.countDown();
            try {
                this.streamOnHold.await();
            } catch (InterruptedException e) {
                LOGGER.error("`streamOnHold` latch was interrupted", e);
            }
            super.onNext((BlockedStreamSubscriber) httpResponseBodyPart);
        }
    }

    /* loaded from: input_file:org/asynchttpclient/netty/reactivestreams/NettyReactiveStreamsTest$ReplayedSimpleAsyncHandler.class */
    private static class ReplayedSimpleAsyncHandler extends ReactiveStreamsTest.SimpleStreamedAsyncHandler implements AsyncHandlerExtensions {
        private final CountDownLatch replaying;

        public ReplayedSimpleAsyncHandler(CountDownLatch countDownLatch, ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> simpleSubscriber) {
            super(simpleSubscriber);
            this.replaying = countDownLatch;
        }

        public void onConnectionOpen() {
        }

        public void onConnectionSuccess(Object obj, InetAddress inetAddress) {
        }

        public void onConnectionFailure(InetAddress inetAddress) {
        }

        public void onConnectionPool() {
        }

        public void onConnectionPooled(Object obj) {
        }

        public void onConnectionOffer(Object obj) {
        }

        public void onRequestSend(Object obj) {
        }

        public void onRetry() {
            this.replaying.countDown();
        }

        public void onDnsResolved(NameResolution[] nameResolutionArr) {
        }

        public void onSslHandshakeCompleted() {
        }
    }

    @Test(groups = {"standalone", "default_provider"}, enabled = true)
    public void testRetryingOnFailingStream() throws Exception {
        DefaultAsyncHttpClient defaultAsyncHttpClient = new DefaultAsyncHttpClient();
        Throwable th = null;
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            final AtomicReference atomicReference = new AtomicReference(null);
            defaultAsyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new ReplayedSimpleAsyncHandler(countDownLatch3, new BlockedStreamSubscriber(countDownLatch, countDownLatch2)) { // from class: org.asynchttpclient.netty.reactivestreams.NettyReactiveStreamsTest.1
                @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsTest.SimpleStreamedAsyncHandler
                public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
                    if (publisher instanceof StreamedResponsePublisher) {
                        return !atomicReference.compareAndSet(null, (StreamedResponsePublisher) publisher) ? AsyncHandler.State.ABORT : super.onStream(publisher);
                    }
                    throw new IllegalStateException(String.format("publisher %s is expected to be an instance of %s", publisher, StreamedResponsePublisher.class));
                }
            });
            countDownLatch.await();
            Assert.assertTrue(atomicReference.get() != null, "Expected a not null publisher.");
            StreamedResponsePublisher streamedResponsePublisher = (StreamedResponsePublisher) atomicReference.get();
            final CountDownLatch countDownLatch4 = new CountDownLatch(1);
            getChannel(streamedResponsePublisher).close().addListener(new ChannelFutureListener() { // from class: org.asynchttpclient.netty.reactivestreams.NettyReactiveStreamsTest.2
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    countDownLatch4.countDown();
                }
            });
            countDownLatch2.countDown();
            countDownLatch4.await();
            countDownLatch3.await();
            Assert.assertTrue(true);
            if (defaultAsyncHttpClient != null) {
                if (0 == 0) {
                    defaultAsyncHttpClient.close();
                    return;
                }
                try {
                    defaultAsyncHttpClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (defaultAsyncHttpClient != null) {
                if (0 != 0) {
                    try {
                        defaultAsyncHttpClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    defaultAsyncHttpClient.close();
                }
            }
            throw th3;
        }
    }

    private Channel getChannel(StreamedResponsePublisher streamedResponsePublisher) throws Exception {
        Field declaredField = streamedResponsePublisher.getClass().getDeclaredField("channel");
        declaredField.setAccessible(true);
        return (Channel) declaredField.get(streamedResponsePublisher);
    }
}
