package org.asynchttpclient.reactivestreams;

import io.netty.handler.codec.http.HttpHeaders;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.exception.RemotelyClosedException;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

/* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest.class */
public class ReactiveStreamsErrorTest extends AbstractBasicTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsErrorTest.class);
    private static final byte[] BODY_CHUNK = "someBytes".getBytes();
    private AsyncHttpClient client;
    private ServletResponseHandler servletResponseHandler;

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest$ManualRequestSubscriber.class */
    private static class ManualRequestSubscriber implements Subscriber<HttpResponseBodyPart> {
        private final List<HttpResponseBodyPart> elements;
        private final CountDownLatch latch;
        private volatile Throwable error;

        private ManualRequestSubscriber() {
            this.elements = Collections.synchronizedList(new ArrayList());
            this.latch = new CountDownLatch(1);
        }

        public void onSubscribe(Subscription subscription) {
            ReactiveStreamsErrorTest.LOGGER.debug("SimpleSubscriber onSubscribe");
        }

        public void onNext(HttpResponseBodyPart httpResponseBodyPart) {
            ReactiveStreamsErrorTest.LOGGER.debug("SimpleSubscriber onNext");
            this.elements.add(httpResponseBodyPart);
        }

        public void onError(Throwable th) {
            ReactiveStreamsErrorTest.LOGGER.debug("SimpleSubscriber onError");
            this.error = th;
            this.latch.countDown();
        }

        public void onComplete() {
            ReactiveStreamsErrorTest.LOGGER.debug("SimpleSubscriber onComplete");
            this.latch.countDown();
        }

        void await() throws InterruptedException {
            if (this.latch.await(3500L, TimeUnit.MILLISECONDS)) {
                return;
            }
            Assert.fail("Request should have finished");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest$ServletResponseHandler.class */
    public interface ServletResponseHandler {
        void handle(HttpServletResponse httpServletResponse) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest$SimpleStreamer.class */
    public static class SimpleStreamer implements StreamedAsyncHandler<Void> {
        final Consumer<Publisher<HttpResponseBodyPart>> bodyStreamHandler;

        private SimpleStreamer(Consumer<Publisher<HttpResponseBodyPart>> consumer) {
            this.bodyStreamHandler = consumer;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            ReactiveStreamsErrorTest.LOGGER.debug("Got stream");
            this.bodyStreamHandler.accept(publisher);
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) {
            ReactiveStreamsErrorTest.LOGGER.debug("Got status line");
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpHeaders httpHeaders) {
            ReactiveStreamsErrorTest.LOGGER.debug("Got headers");
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) {
            throw new IllegalStateException();
        }

        public void onThrowable(Throwable th) {
            ReactiveStreamsErrorTest.LOGGER.debug("Caught error", th);
        }

        /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
        public Void m30onCompleted() {
            ReactiveStreamsErrorTest.LOGGER.debug("Completed request");
            return null;
        }
    }

    @BeforeTest
    public void initClient() {
        this.client = Dsl.asyncHttpClient(Dsl.config().setMaxRequestRetry(0).setRequestTimeout(3000).setReadTimeout(1000));
    }

    @AfterTest
    public void closeClient() throws Throwable {
        this.client.close();
    }

    @Override // org.asynchttpclient.AbstractBasicTest
    /* renamed from: configureHandler */
    public AbstractHandler mo45configureHandler() throws Exception {
        return new AbstractHandler() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.1
            public void handle(String str, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
                try {
                    ReactiveStreamsErrorTest.this.servletResponseHandler.handle(httpServletResponse);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    @Test
    public void timeoutWithNoStatusLineSent() throws Throwable {
        try {
            execute(httpServletResponse -> {
                Thread.sleep(5000L);
            }, publisher -> {
            });
            Assert.fail("Request should have timed out");
        } catch (ExecutionException e) {
            expectReadTimeout(e.getCause());
        }
    }

    @Test
    public void neverSubscribingToResponseBodyHitsRequestTimeout() throws Throwable {
        try {
            execute(httpServletResponse -> {
                httpServletResponse.getOutputStream().write(BODY_CHUNK);
                httpServletResponse.getOutputStream().flush();
                Thread.sleep(500L);
                httpServletResponse.getOutputStream().write(BODY_CHUNK);
                httpServletResponse.getOutputStream().flush();
                httpServletResponse.getOutputStream().close();
            }, publisher -> {
            });
            Assert.fail("Request should have timed out");
        } catch (ExecutionException e) {
            expectRequestTimeout(e.getCause());
        }
    }

    @Test
    public void readTimeoutInMiddleOfBody() throws Throwable {
        try {
            execute(httpServletResponse -> {
                httpServletResponse.getOutputStream().write(BODY_CHUNK);
                httpServletResponse.getOutputStream().flush();
                Thread.sleep(500L);
                httpServletResponse.getOutputStream().write(BODY_CHUNK);
                httpServletResponse.getOutputStream().flush();
                Thread.sleep(5000L);
                httpServletResponse.getOutputStream().write(BODY_CHUNK);
                httpServletResponse.getOutputStream().flush();
                httpServletResponse.getOutputStream().close();
            }, publisher -> {
                publisher.subscribe(new ManualRequestSubscriber() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.2
                    @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.ManualRequestSubscriber
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                    }
                });
            });
            Assert.fail("Request should have timed out");
        } catch (ExecutionException e) {
            expectReadTimeout(e.getCause());
        }
    }

    @Test
    public void notRequestingForLongerThanReadTimeoutDoesNotCauseTimeout() throws Throwable {
        ServletResponseHandler servletResponseHandler = httpServletResponse -> {
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(100L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            httpServletResponse.getOutputStream().close();
        };
        ManualRequestSubscriber manualRequestSubscriber = new ManualRequestSubscriber() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.3
            @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.ManualRequestSubscriber
            public void onSubscribe(Subscription subscription) {
                super.onSubscribe(subscription);
                new Thread(() -> {
                    try {
                        subscription.request(1L);
                        Thread.sleep(1500L);
                        subscription.request(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).start();
            }
        };
        execute(servletResponseHandler, publisher -> {
            publisher.subscribe(manualRequestSubscriber);
        });
        manualRequestSubscriber.await();
        Assert.assertEquals(manualRequestSubscriber.elements.size(), 2);
    }

    @Test
    public void readTimeoutCancelsBodyStream() throws Throwable {
        ServletResponseHandler servletResponseHandler = httpServletResponse -> {
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(2000L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            httpServletResponse.getOutputStream().close();
        };
        ManualRequestSubscriber manualRequestSubscriber = new ManualRequestSubscriber() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.4
            @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.ManualRequestSubscriber
            public void onSubscribe(Subscription subscription) {
                super.onSubscribe(subscription);
                subscription.request(Long.MAX_VALUE);
            }
        };
        try {
            execute(servletResponseHandler, publisher -> {
                publisher.subscribe(manualRequestSubscriber);
            });
            Assert.fail("Request should have timed out");
        } catch (ExecutionException e) {
            expectReadTimeout(e.getCause());
        }
        manualRequestSubscriber.await();
        Assert.assertEquals(manualRequestSubscriber.elements.size(), 1);
    }

    @Test
    public void requestTimeoutCancelsBodyStream() throws Throwable {
        ServletResponseHandler servletResponseHandler = httpServletResponse -> {
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(900L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(900L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(900L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            Thread.sleep(900L);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            httpServletResponse.getOutputStream().close();
        };
        ManualRequestSubscriber manualRequestSubscriber = new ManualRequestSubscriber() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.5
            @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.ManualRequestSubscriber
            public void onSubscribe(Subscription subscription) {
                super.onSubscribe(subscription);
                subscription.request(Long.MAX_VALUE);
            }
        };
        try {
            execute(servletResponseHandler, publisher -> {
                publisher.subscribe(manualRequestSubscriber);
            });
            Assert.fail("Request should have timed out");
        } catch (ExecutionException e) {
            expectRequestTimeout(e.getCause());
        }
        manualRequestSubscriber.await();
        expectRequestTimeout(manualRequestSubscriber.error);
        Assert.assertEquals(manualRequestSubscriber.elements.size(), 4);
    }

    @Test
    public void ioErrorsArePropagatedToSubscriber() throws Throwable {
        ServletResponseHandler servletResponseHandler = httpServletResponse -> {
            httpServletResponse.setContentLength(100);
            httpServletResponse.getOutputStream().write(BODY_CHUNK);
            httpServletResponse.getOutputStream().flush();
            httpServletResponse.getOutputStream().close();
        };
        ManualRequestSubscriber manualRequestSubscriber = new ManualRequestSubscriber() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.6
            @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsErrorTest.ManualRequestSubscriber
            public void onSubscribe(Subscription subscription) {
                super.onSubscribe(subscription);
                subscription.request(Long.MAX_VALUE);
            }
        };
        Throwable th = null;
        try {
            execute(servletResponseHandler, publisher -> {
                publisher.subscribe(manualRequestSubscriber);
            });
            Assert.fail("Request should have failed");
        } catch (ExecutionException e) {
            th = e.getCause();
            Assert.assertTrue(th instanceof RemotelyClosedException, "Unexpected error: " + e);
        }
        manualRequestSubscriber.await();
        Assert.assertEquals(manualRequestSubscriber.error, th);
        Assert.assertEquals(manualRequestSubscriber.elements.size(), 1);
    }

    private void expectReadTimeout(Throwable th) {
        Assert.assertTrue(th instanceof TimeoutException, "Expected a read timeout, but got " + th);
        Assert.assertTrue(th.getMessage().contains("Read timeout"), "Expected read timeout, but was " + th);
    }

    private void expectRequestTimeout(Throwable th) {
        Assert.assertTrue(th instanceof TimeoutException, "Expected a request timeout, but got " + th);
        Assert.assertTrue(th.getMessage().contains("Request timeout"), "Expected request timeout, but was " + th);
    }

    private void execute(ServletResponseHandler servletResponseHandler, Consumer<Publisher<HttpResponseBodyPart>> consumer) throws Exception {
        this.servletResponseHandler = servletResponseHandler;
        this.client.prepareGet(getTargetUrl()).execute(new SimpleStreamer(consumer)).get(3500L, TimeUnit.MILLISECONDS);
    }
}
