package io.prestosql.operator;

import com.google.common.collect.ImmutableListMultimap;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.testing.Assertions;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.execution.buffer.PagesSerde;
import io.prestosql.execution.buffer.SerializedPage;
import io.prestosql.execution.buffer.TestingPagesSerdeFactory;
import io.prestosql.operator.HttpPageBufferClient;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.Page;
import io.prestosql.spi.StandardErrorCode;
import io.prestosql.spi.block.Block;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/prestosql/operator/TestHttpPageBufferClient.class */
public class TestHttpPageBufferClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    /* loaded from: input_file:io/prestosql/operator/TestHttpPageBufferClient$StaticRequestProcessor.class */
    private static class StaticRequestProcessor implements TestingHttpClient.Processor {
        private final AtomicReference<Response> response;
        private final CyclicBarrier beforeRequest;
        private final CyclicBarrier afterRequest;

        private StaticRequestProcessor(CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            this.response = new AtomicReference<>();
            this.beforeRequest = cyclicBarrier;
            this.afterRequest = cyclicBarrier2;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setResponse(Response response) {
            this.response.set(response);
        }

        public Response handle(Request request) throws Exception {
            this.beforeRequest.await(10L, TimeUnit.SECONDS);
            try {
                return this.response.get();
            } finally {
                this.afterRequest.await(10L, TimeUnit.SECONDS);
            }
        }
    }

    /* loaded from: input_file:io/prestosql/operator/TestHttpPageBufferClient$TestingClientCallback.class */
    private static class TestingClientCallback implements HttpPageBufferClient.ClientCallback {
        private final CyclicBarrier done;
        private final List<SerializedPage> pages = Collections.synchronizedList(new ArrayList());
        private final AtomicInteger completedRequests = new AtomicInteger();
        private final AtomicInteger finishedBuffers = new AtomicInteger();
        private final AtomicInteger failedBuffers = new AtomicInteger();
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        public TestingClientCallback(CyclicBarrier cyclicBarrier) {
            this.done = cyclicBarrier;
        }

        public List<Page> getPages() {
            Stream<SerializedPage> stream = this.pages.stream();
            PagesSerde pagesSerde = TestHttpPageBufferClient.PAGES_SERDE;
            pagesSerde.getClass();
            return (List) stream.map(pagesSerde::deserialize).collect(Collectors.toList());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getCompletedRequests() {
            return this.completedRequests.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getFinishedBuffers() {
            return this.finishedBuffers.get();
        }

        public int getFailedBuffers() {
            return this.failedBuffers.get();
        }

        public Throwable getFailure() {
            return this.failure.get();
        }

        public boolean addPages(HttpPageBufferClient httpPageBufferClient, List<SerializedPage> list) {
            this.pages.addAll(list);
            return true;
        }

        public void requestComplete(HttpPageBufferClient httpPageBufferClient) {
            this.completedRequests.getAndIncrement();
            awaitDone();
        }

        public void clientFinished(HttpPageBufferClient httpPageBufferClient) {
            this.finishedBuffers.getAndIncrement();
            awaitDone();
        }

        public void clientFailed(HttpPageBufferClient httpPageBufferClient, Throwable th) {
            this.failedBuffers.getAndIncrement();
            this.failure.compareAndSet(null, th);
        }

        public void resetStats() {
            this.pages.clear();
            this.completedRequests.set(0);
            this.finishedBuffers.set(0);
            this.failedBuffers.set(0);
            this.failure.set(null);
        }

        private void awaitDone() {
            try {
                this.done.await(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (BrokenBarrierException | TimeoutException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
    }

    @Test
    public void testHappyPath() throws Exception {
        Page page = new Page(100, new Block[0]);
        DataSize dataSize = new DataSize(11.0d, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor mockExchangeRequestProcessor = new MockExchangeRequestProcessor(dataSize);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        TestingClientCallback testingClientCallback = new TestingClientCallback(cyclicBarrier);
        URI create = URI.create("http://localhost:8080");
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(new TestingHttpClient(mockExchangeRequestProcessor, this.scheduler), dataSize, new Duration(1.0d, TimeUnit.MINUTES), true, create, testingClientCallback, this.scheduler, this.pageBufferClientCallbackExecutor);
        assertStatus(httpPageBufferClient, create, "queued", 0, 0, 0, 0, "not scheduled");
        mockExchangeRequestProcessor.addPage(create, page);
        testingClientCallback.resetStats();
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 1);
        assertPageEquals(page, testingClientCallback.getPages().get(0));
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        assertStatus(httpPageBufferClient, create, "queued", 1, 1, 1, 0, "not scheduled");
        testingClientCallback.resetStats();
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        assertStatus(httpPageBufferClient, create, "queued", 1, 2, 2, 0, "not scheduled");
        mockExchangeRequestProcessor.addPage(create, page);
        mockExchangeRequestProcessor.addPage(create, page);
        testingClientCallback.resetStats();
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 2);
        assertPageEquals(page, testingClientCallback.getPages().get(0));
        assertPageEquals(page, testingClientCallback.getPages().get(1));
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 0);
        testingClientCallback.resetStats();
        assertStatus(httpPageBufferClient, create, "queued", 3, 3, 3, 0, "not scheduled");
        testingClientCallback.resetStats();
        mockExchangeRequestProcessor.setComplete(create);
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        testingClientCallback.resetStats();
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 1);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 0);
        assertStatus(httpPageBufferClient, create, "closed", 3, 5, 5, 0, "not scheduled");
    }

    @Test
    public void testLifecycle() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
        StaticRequestProcessor staticRequestProcessor = new StaticRequestProcessor(cyclicBarrier, cyclicBarrier2);
        staticRequestProcessor.setResponse(new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier cyclicBarrier3 = new CyclicBarrier(2);
        TestingClientCallback testingClientCallback = new TestingClientCallback(cyclicBarrier3);
        URI create = URI.create("http://localhost:8080");
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(new TestingHttpClient(staticRequestProcessor, this.scheduler), new DataSize(10.0d, DataSize.Unit.MEGABYTE), new Duration(1.0d, TimeUnit.MINUTES), true, create, testingClientCallback, this.scheduler, this.pageBufferClientCallbackExecutor);
        assertStatus(httpPageBufferClient, create, "queued", 0, 0, 0, 0, "not scheduled");
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assert.assertEquals(httpPageBufferClient.isRunning(), true);
        cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        cyclicBarrier3.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "queued", 0, 1, 1, 1, "not scheduled");
        httpPageBufferClient.close();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "closed", 0, 1, 1, 1, "PROCESSING_REQUEST");
        cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        cyclicBarrier3.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "closed", 0, 1, 2, 1, "not scheduled");
    }

    @Test
    public void testInvalidResponses() throws Exception {
        StaticRequestProcessor staticRequestProcessor = new StaticRequestProcessor(new CyclicBarrier(1), new CyclicBarrier(1));
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        TestingClientCallback testingClientCallback = new TestingClientCallback(cyclicBarrier);
        URI create = URI.create("http://localhost:8080");
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(new TestingHttpClient(staticRequestProcessor, this.scheduler), new DataSize(10.0d, DataSize.Unit.MEGABYTE), new Duration(1.0d, TimeUnit.MINUTES), true, create, testingClientCallback, this.scheduler, this.pageBufferClientCallbackExecutor);
        assertStatus(httpPageBufferClient, create, "queued", 0, 0, 0, 0, "not scheduled");
        staticRequestProcessor.setResponse(new TestingResponse(HttpStatus.NOT_FOUND, ImmutableListMultimap.of("Content-Type", "application/X-presto-pages"), new byte[0]));
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 1);
        Assertions.assertInstanceOf(testingClientCallback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains(testingClientCallback.getFailure().getMessage(), "Expected response code to be 200, but was 404");
        assertStatus(httpPageBufferClient, create, "queued", 0, 1, 1, 1, "not scheduled");
        testingClientCallback.resetStats();
        staticRequestProcessor.setResponse(new TestingResponse(HttpStatus.OK, ImmutableListMultimap.of("Content-Type", "INVALID_TYPE"), new byte[0]));
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 1);
        Assertions.assertInstanceOf(testingClientCallback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains(testingClientCallback.getFailure().getMessage(), "Expected application/x-presto-pages response from server but got INVALID_TYPE");
        assertStatus(httpPageBufferClient, create, "queued", 0, 2, 2, 2, "not scheduled");
        testingClientCallback.resetStats();
        staticRequestProcessor.setResponse(new TestingResponse(HttpStatus.OK, ImmutableListMultimap.of("Content-Type", "text/plain"), new byte[0]));
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 1);
        Assertions.assertInstanceOf(testingClientCallback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains(testingClientCallback.getFailure().getMessage(), "Expected application/x-presto-pages response from server but got text/plain");
        assertStatus(httpPageBufferClient, create, "queued", 0, 3, 3, 3, "not scheduled");
        httpPageBufferClient.close();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "closed", 0, 3, 4, 3, "not scheduled");
    }

    @Test
    public void testCloseDuringPendingRequest() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
        StaticRequestProcessor staticRequestProcessor = new StaticRequestProcessor(cyclicBarrier, cyclicBarrier2);
        staticRequestProcessor.setResponse(new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier cyclicBarrier3 = new CyclicBarrier(2);
        TestingClientCallback testingClientCallback = new TestingClientCallback(cyclicBarrier3);
        URI create = URI.create("http://localhost:8080");
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(new TestingHttpClient(staticRequestProcessor, this.scheduler), new DataSize(10.0d, DataSize.Unit.MEGABYTE), new Duration(1.0d, TimeUnit.MINUTES), true, create, testingClientCallback, this.scheduler, this.pageBufferClientCallbackExecutor);
        assertStatus(httpPageBufferClient, create, "queued", 0, 0, 0, 0, "not scheduled");
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assert.assertEquals(httpPageBufferClient.isRunning(), true);
        httpPageBufferClient.close();
        try {
            cyclicBarrier3.await(10L, TimeUnit.SECONDS);
        } catch (BrokenBarrierException e) {
        }
        try {
            cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        } catch (BrokenBarrierException e2) {
            cyclicBarrier2.reset();
        }
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        cyclicBarrier2.await(10L, TimeUnit.SECONDS);
        cyclicBarrier3.await(10L, TimeUnit.SECONDS);
        assertStatus(httpPageBufferClient, create, "closed", 0, 1, 2, 1, "not scheduled");
    }

    @Test
    public void testExceptionFromResponseHandler() throws Exception {
        TestingTicker testingTicker = new TestingTicker();
        AtomicReference atomicReference = new AtomicReference(new Duration(0.0d, TimeUnit.SECONDS));
        TestingHttpClient.Processor processor = request -> {
            testingTicker.increment(((Duration) atomicReference.get()).toMillis(), TimeUnit.MILLISECONDS);
            throw new RuntimeException("Foo");
        };
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        TestingClientCallback testingClientCallback = new TestingClientCallback(cyclicBarrier);
        URI create = URI.create("http://localhost:8080");
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(new TestingHttpClient(processor, this.scheduler), new DataSize(10.0d, DataSize.Unit.MEGABYTE), new Duration(30.0d, TimeUnit.SECONDS), true, create, testingClientCallback, this.scheduler, testingTicker, this.pageBufferClientCallbackExecutor);
        assertStatus(httpPageBufferClient, create, "queued", 0, 0, 0, 0, "not scheduled");
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 1);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 0);
        assertStatus(httpPageBufferClient, create, "queued", 0, 1, 1, 1, "not scheduled");
        atomicReference.set(new Duration(30.0d, TimeUnit.SECONDS));
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 2);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 0);
        assertStatus(httpPageBufferClient, create, "queued", 0, 2, 2, 2, "not scheduled");
        atomicReference.set(new Duration(31.0d, TimeUnit.SECONDS));
        httpPageBufferClient.scheduleRequest();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(testingClientCallback.getPages().size(), 0);
        Assert.assertEquals(testingClientCallback.getCompletedRequests(), 3);
        Assert.assertEquals(testingClientCallback.getFinishedBuffers(), 0);
        Assert.assertEquals(testingClientCallback.getFailedBuffers(), 1);
        Assertions.assertInstanceOf(testingClientCallback.getFailure(), PageTransportTimeoutException.class);
        Assertions.assertContains(testingClientCallback.getFailure().getMessage(), "Encountered too many errors talking to a worker node. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes. (http://localhost:8080/0 - 3 failures, failure duration 31.00s, total failed request time 31.00s)");
        assertStatus(httpPageBufferClient, create, "queued", 0, 3, 3, 3, "not scheduled");
    }

    @Test
    public void testErrorCodes() {
        Assert.assertEquals(new PageTooLargeException().getErrorCode(), StandardErrorCode.PAGE_TOO_LARGE.toErrorCode());
        Assert.assertEquals(new PageTransportErrorException("").getErrorCode(), StandardErrorCode.PAGE_TRANSPORT_ERROR.toErrorCode());
        Assert.assertEquals(new PageTransportTimeoutException(HostAddress.fromParts("127.0.0.1", 8080), "", (Throwable) null).getErrorCode(), StandardErrorCode.PAGE_TRANSPORT_TIMEOUT.toErrorCode());
    }

    private static void assertStatus(HttpPageBufferClient httpPageBufferClient, URI uri, String str, int i, int i2, int i3, int i4, String str2) {
        PageBufferClientStatus status = httpPageBufferClient.getStatus();
        Assert.assertEquals(status.getUri(), uri);
        Assert.assertEquals(status.getState(), str, "status");
        Assert.assertEquals(status.getPagesReceived(), i, "pagesReceived");
        Assert.assertEquals(status.getRequestsScheduled(), i2, "requestsScheduled");
        Assert.assertEquals(status.getRequestsCompleted(), i3, "requestsCompleted");
        Assert.assertEquals(status.getRequestsFailed(), i4, "requestsFailed");
        Assert.assertEquals(status.getHttpRequestState(), str2, "httpRequestState");
    }

    private static void assertPageEquals(Page page, Page page2) {
        Assert.assertEquals(page2.getPositionCount(), page.getPositionCount());
        Assert.assertEquals(page2.getChannelCount(), page.getChannelCount());
    }
}
