package org.asynchttpclient.reactivestreams;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.reactivex.Flowable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.Context;
import org.apache.catalina.startup.Tomcat;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest.class */
public class ReactiveStreamsTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsTest.class);
    private Tomcat tomcat;
    private int port1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$ByteBufIterable.class */
    public static class ByteBufIterable implements Iterable<ByteBuf> {
        private final byte[] payload;
        private final int chunkSize;

        public ByteBufIterable(byte[] bArr, int i) {
            this.payload = bArr;
            this.chunkSize = i;
        }

        @Override // java.lang.Iterable
        public Iterator<ByteBuf> iterator() {
            return new Iterator<ByteBuf>() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsTest.ByteBufIterable.1
                private int currentIndex = 0;

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.currentIndex != ByteBufIterable.this.payload.length;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public ByteBuf next() {
                    int i = this.currentIndex;
                    int min = Math.min(ByteBufIterable.this.chunkSize, ByteBufIterable.this.payload.length - i);
                    this.currentIndex += min;
                    return Unpooled.wrappedBuffer(ByteBufIterable.this.payload, i, min);
                }

                @Override // java.util.Iterator
                public void remove() {
                    throw new UnsupportedOperationException("ByteBufferIterable's iterator does not support remove.");
                }
            };
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$CancellingStreamedAsyncProvider.class */
    static class CancellingStreamedAsyncProvider implements StreamedAsyncHandler<CancellingStreamedAsyncProvider> {
        private final int cancelAfter;

        public CancellingStreamedAsyncProvider(int i) {
            this.cancelAfter = i;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(new CancellingSubscriber(this.cancelAfter));
            return AsyncHandler.State.CONTINUE;
        }

        public void onThrowable(Throwable th) {
            throw new AssertionError(th);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
        public CancellingStreamedAsyncProvider m31onCompleted() throws Exception {
            return this;
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$CancellingSubscriber.class */
    static class CancellingSubscriber<T> implements Subscriber<T> {
        private final int cancelAfter;
        private volatile Subscription subscription;
        private volatile int count;

        public CancellingSubscriber(int i) {
            this.cancelAfter = i;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (this.cancelAfter == 0) {
                subscription.cancel();
            } else {
                subscription.request(1L);
            }
        }

        public void onNext(T t) {
            this.count++;
            if (this.count == this.cancelAfter) {
                this.subscription.cancel();
            } else {
                this.subscription.request(1L);
            }
        }

        public void onError(Throwable th) {
        }

        public void onComplete() {
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$FailedStream.class */
    private class FailedStream extends RuntimeException {
        private FailedStream() {
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$SimpleStreamedAsyncHandler.class */
    static class SimpleStreamedAsyncHandler implements StreamedAsyncHandler<Void> {
        private final Subscriber<HttpResponseBodyPart> subscriber;

        public SimpleStreamedAsyncHandler(Subscriber<HttpResponseBodyPart> subscriber) {
            this.subscriber = subscriber;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(this.subscriber);
            return AsyncHandler.State.CONTINUE;
        }

        public void onThrowable(Throwable th) {
            throw new AssertionError(th);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
        public Void m32onCompleted() throws Exception {
            return null;
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$SimpleSubscriber.class */
    static class SimpleSubscriber<T> implements Subscriber<T> {
        private volatile Subscription subscription;
        private volatile Throwable error;
        private final List<T> elements = Collections.synchronizedList(new ArrayList());
        private final CountDownLatch latch = new CountDownLatch(1);

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        public void onNext(T t) {
            this.elements.add(t);
            this.subscription.request(1L);
        }

        public void onError(Throwable th) {
            this.error = th;
            this.latch.countDown();
        }

        public void onComplete() {
            this.latch.countDown();
        }

        public List<T> getElements() throws Throwable {
            this.latch.await();
            if (this.error != null) {
                throw this.error;
            }
            return this.elements;
        }
    }

    public static Publisher<ByteBuf> createPublisher(byte[] bArr, int i) {
        return Flowable.fromIterable(new ByteBufIterable(bArr, i));
    }

    @BeforeClass(alwaysRun = true)
    public void setUpGlobal() throws Exception {
        String str = new File(".").getAbsolutePath() + "/target";
        this.tomcat = new Tomcat();
        this.tomcat.setHostname("localhost");
        this.tomcat.setPort(0);
        this.tomcat.setBaseDir(str);
        Context addContext = this.tomcat.addContext("", str);
        Tomcat.addServlet(addContext, "webdav", new HttpServlet() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsTest.1
            public void service(HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse) throws ServletException, IOException {
                long j;
                ReactiveStreamsTest.LOGGER.debug("Echo received request {} on path {}", httpServletRequest, httpServletRequest.getServletContext().getContextPath());
                if (httpServletRequest.getHeader("X-HEAD") != null) {
                    httpServletResponse.setContentLength(1);
                }
                if (httpServletRequest.getHeader("X-ISO") != null) {
                    httpServletResponse.setContentType(TestUtils.TEXT_HTML_CONTENT_TYPE_WITH_ISO_8859_1_CHARSET);
                } else {
                    httpServletResponse.setContentType(TestUtils.TEXT_HTML_CONTENT_TYPE_WITH_UTF_8_CHARSET);
                }
                if (httpServletRequest.getMethod().equalsIgnoreCase("OPTIONS")) {
                    httpServletResponse.addHeader("Allow", "GET,HEAD,POST,OPTIONS,TRACE");
                }
                Enumeration headerNames = httpServletRequest.getHeaderNames();
                while (headerNames.hasMoreElements()) {
                    String str2 = (String) headerNames.nextElement();
                    if (str2.startsWith("LockThread")) {
                        int intHeader = httpServletRequest.getIntHeader(str2);
                        if (intHeader == -1) {
                            j = 40;
                        } else {
                            try {
                                j = intHeader * 1000;
                            } catch (InterruptedException e) {
                            }
                        }
                        Thread.sleep(j);
                    }
                    if (str2.startsWith("X-redirect")) {
                        httpServletResponse.sendRedirect(httpServletRequest.getHeader("X-redirect"));
                        return;
                    }
                    httpServletResponse.addHeader("X-" + str2, httpServletRequest.getHeader(str2));
                }
                String pathInfo = httpServletRequest.getPathInfo();
                if (pathInfo != null) {
                    httpServletResponse.addHeader("X-pathInfo", pathInfo);
                }
                String queryString = httpServletRequest.getQueryString();
                if (queryString != null) {
                    httpServletResponse.addHeader("X-queryString", queryString);
                }
                httpServletResponse.addHeader("X-KEEP-ALIVE", httpServletRequest.getRemoteAddr() + ":" + httpServletRequest.getRemotePort());
                Cookie[] cookies = httpServletRequest.getCookies();
                if (cookies != null) {
                    for (Cookie cookie : cookies) {
                        httpServletResponse.addCookie(cookie);
                    }
                }
                Enumeration parameterNames = httpServletRequest.getParameterNames();
                if (parameterNames.hasMoreElements()) {
                    StringBuilder sb = new StringBuilder();
                    while (parameterNames.hasMoreElements()) {
                        String str3 = (String) parameterNames.nextElement();
                        httpServletResponse.addHeader("X-" + str3, httpServletRequest.getParameter(str3));
                        sb.append(str3);
                        sb.append("_");
                    }
                    if (sb.length() > 0) {
                        httpServletResponse.getOutputStream().write(sb.toString().getBytes());
                    }
                }
                final AsyncContext startAsync = httpServletRequest.startAsync();
                final ServletInputStream inputStream = httpServletRequest.getInputStream();
                final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                inputStream.setReadListener(new ReadListener() { // from class: org.asynchttpclient.reactivestreams.ReactiveStreamsTest.1.1
                    byte[] buffer = new byte[5120];

                    public void onError(Throwable th) {
                        th.printStackTrace();
                        httpServletResponse.setStatus(io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
                        startAsync.complete();
                    }

                    public void onDataAvailable() throws IOException {
                        int read;
                        while (inputStream.isReady() && (read = inputStream.read(this.buffer)) != -1) {
                            byteArrayOutputStream.write(this.buffer, 0, read);
                        }
                    }

                    public void onAllDataRead() throws IOException {
                        byte[] byteArray = byteArrayOutputStream.toByteArray();
                        int length = byteArray.length;
                        httpServletResponse.addIntHeader("X-" + HttpHeaderNames.CONTENT_LENGTH, length);
                        httpServletResponse.addHeader(HttpHeaderNames.CONTENT_MD5.toString(), TestUtils.md5(byteArray, 0, length));
                        httpServletResponse.getOutputStream().write(byteArray, 0, length);
                        startAsync.complete();
                    }
                });
            }
        }).setAsyncSupported(true);
        addContext.addServletMappingDecoded("/*", "webdav");
        this.tomcat.start();
        this.port1 = this.tomcat.getConnector().getLocalPort();
    }

    @AfterClass(alwaysRun = true)
    public void tearDownGlobal() throws InterruptedException, Exception {
        this.tomcat.stop();
    }

    private String getTargetUrl() {
        return String.format("http://localhost:%d/foo/test", Integer.valueOf(this.port1));
    }

    @Test(groups = {"standalone"})
    public void testStreamingPutImage() throws Exception {
        AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(600000));
        Throwable th = null;
        try {
            Response response = (Response) asyncHttpClient.preparePut(getTargetUrl()).setBody(createPublisher(TestUtils.LARGE_IMAGE_BYTES, 2342)).execute().get();
            Assert.assertEquals(response.getStatusCode(), 200);
            Assert.assertEquals(response.getResponseBodyAsBytes(), TestUtils.LARGE_IMAGE_BYTES);
            if (asyncHttpClient != null) {
                if (0 == 0) {
                    asyncHttpClient.close();
                    return;
                }
                try {
                    asyncHttpClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (asyncHttpClient != null) {
                if (0 != 0) {
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    asyncHttpClient.close();
                }
            }
            throw th3;
        }
    }

    @Test(groups = {"standalone"})
    public void testConnectionDoesNotGetClosed() throws Exception {
        AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(600000));
        Throwable th = null;
        try {
            BoundRequestBuilder header = asyncHttpClient.preparePut(getTargetUrl()).setBody(createPublisher(TestUtils.LARGE_IMAGE_BYTES, 1000)).setHeader("X-" + HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(TestUtils.LARGE_IMAGE_BYTES.length)).setHeader("X-" + HttpHeaderNames.CONTENT_MD5, TestUtils.LARGE_IMAGE_BYTES_MD5);
            Response response = (Response) header.execute().get();
            Assert.assertEquals(response.getStatusCode(), 200, "HTTP response was invalid on first request.");
            response.getResponseBodyAsBytes();
            byte[] responseBodyAsBytes = response.getResponseBodyAsBytes();
            Assert.assertEquals(Integer.valueOf(response.getHeader("X-" + HttpHeaderNames.CONTENT_LENGTH)).intValue(), TestUtils.LARGE_IMAGE_BYTES.length, "Server side payload length invalid");
            Assert.assertEquals(responseBodyAsBytes.length, TestUtils.LARGE_IMAGE_BYTES.length, "Client side payload length invalid");
            Assert.assertEquals(response.getHeader(HttpHeaderNames.CONTENT_MD5), TestUtils.LARGE_IMAGE_BYTES_MD5, "Server side payload MD5 invalid");
            Assert.assertEquals(TestUtils.md5(responseBodyAsBytes), TestUtils.LARGE_IMAGE_BYTES_MD5, "Client side payload MD5 invalid");
            Assert.assertEquals(responseBodyAsBytes, TestUtils.LARGE_IMAGE_BYTES, "Image bytes are not equal on first attempt");
            Response response2 = (Response) header.execute().get();
            Assert.assertEquals(response2.getStatusCode(), 200);
            byte[] responseBodyAsBytes2 = response2.getResponseBodyAsBytes();
            Assert.assertEquals(Integer.valueOf(response2.getHeader("X-" + HttpHeaderNames.CONTENT_LENGTH)).intValue(), TestUtils.LARGE_IMAGE_BYTES.length, "Server side payload length invalid");
            Assert.assertEquals(responseBodyAsBytes2.length, TestUtils.LARGE_IMAGE_BYTES.length, "Client side payload length invalid");
            try {
                Assert.assertEquals(response2.getHeader(HttpHeaderNames.CONTENT_MD5), TestUtils.LARGE_IMAGE_BYTES_MD5, "Server side payload MD5 invalid");
                Assert.assertEquals(TestUtils.md5(responseBodyAsBytes2), TestUtils.LARGE_IMAGE_BYTES_MD5, "Client side payload MD5 invalid");
                Assert.assertEquals(responseBodyAsBytes2, TestUtils.LARGE_IMAGE_BYTES, "Image bytes weren't equal on subsequent test");
                if (asyncHttpClient != null) {
                    if (0 == 0) {
                        asyncHttpClient.close();
                        return;
                    }
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (AssertionError e) {
                e.printStackTrace();
                for (int i = 0; i < TestUtils.LARGE_IMAGE_BYTES.length; i++) {
                    Assert.assertEquals(responseBodyAsBytes2[i], TestUtils.LARGE_IMAGE_BYTES[i], "Invalid response byte at position " + i);
                }
                throw e;
            }
        } catch (Throwable th3) {
            if (asyncHttpClient != null) {
                if (0 != 0) {
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    asyncHttpClient.close();
                }
            }
            throw th3;
        }
    }

    public static void main(String[] strArr) throws Exception {
        ReactiveStreamsTest reactiveStreamsTest = new ReactiveStreamsTest();
        reactiveStreamsTest.setUpGlobal();
        for (int i = 0; i < 1000; i++) {
            try {
                reactiveStreamsTest.testConnectionDoesNotGetClosed();
            } finally {
                reactiveStreamsTest.tearDownGlobal();
            }
        }
    }

    @Test(groups = {"standalone"}, expectedExceptions = {ExecutionException.class})
    public void testFailingStream() throws Exception {
        AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(600000));
        Throwable th = null;
        try {
            asyncHttpClient.preparePut(getTargetUrl()).setBody(Flowable.error(new FailedStream())).execute().get();
            if (asyncHttpClient != null) {
                if (0 == 0) {
                    asyncHttpClient.close();
                    return;
                }
                try {
                    asyncHttpClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (asyncHttpClient != null) {
                if (0 != 0) {
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    asyncHttpClient.close();
                }
            }
            throw th3;
        }
    }

    @Test(groups = {"standalone"})
    public void streamedResponseTest() throws Throwable {
        AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient();
        Throwable th = null;
        try {
            try {
                SimpleSubscriber simpleSubscriber = new SimpleSubscriber();
                asyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new SimpleStreamedAsyncHandler(simpleSubscriber)).get();
                Assert.assertEquals(getBytes(simpleSubscriber.getElements()), TestUtils.LARGE_IMAGE_BYTES);
                SimpleSubscriber simpleSubscriber2 = new SimpleSubscriber();
                asyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new SimpleStreamedAsyncHandler(simpleSubscriber2)).get();
                Assert.assertEquals(getBytes(simpleSubscriber2.getElements()), TestUtils.LARGE_IMAGE_BYTES);
                Assert.assertEquals(((Response) asyncHttpClient.preparePost(getTargetUrl()).setBody("Hello").execute().get()).getResponseBody(), "Hello");
                if (asyncHttpClient != null) {
                    if (0 == 0) {
                        asyncHttpClient.close();
                        return;
                    }
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (asyncHttpClient != null) {
                if (th != null) {
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    asyncHttpClient.close();
                }
            }
            throw th4;
        }
    }

    @Test(groups = {"standalone"})
    public void cancelStreamedResponseTest() throws Throwable {
        AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient();
        Throwable th = null;
        try {
            asyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new CancellingStreamedAsyncProvider(0)).get();
            asyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new CancellingStreamedAsyncProvider(1)).get();
            asyncHttpClient.preparePost(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES).execute(new CancellingStreamedAsyncProvider(10)).get();
            Assert.assertEquals(((Response) asyncHttpClient.preparePost(getTargetUrl()).setBody("Hello").execute().get()).getResponseBody(), "Hello");
            if (asyncHttpClient != null) {
                if (0 == 0) {
                    asyncHttpClient.close();
                    return;
                }
                try {
                    asyncHttpClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (asyncHttpClient != null) {
                if (0 != 0) {
                    try {
                        asyncHttpClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    asyncHttpClient.close();
                }
            }
            throw th3;
        }
    }

    static byte[] getBytes(List<HttpResponseBodyPart> list) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Iterator<HttpResponseBodyPart> it = list.iterator();
        while (it.hasNext()) {
            byteArrayOutputStream.write(it.next().getBodyPartBytes());
        }
        return byteArrayOutputStream.toByteArray();
    }
}
