package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HeaderName;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.FeaturesConfig;
import io.trino.block.BlockAssertions;
import io.trino.execution.buffer.PagesSerde;
import io.trino.execution.buffer.SerializedPage;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.SimpleLocalMemoryContext;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link ExchangeClient} driven by an in-memory {@link TestingHttpClient}.
 *
 * <p>Each test registers pages with a {@link MockExchangeRequestProcessor}, points an
 * {@code ExchangeClient} at the mocked location(s), and then checks the pages handed back
 * together with the per-location {@link PageBufferClientStatus} counters.
 *
 * <p>All executors are created once in {@link #setUp()} and shut down in {@link #tearDown()},
 * so individual tests do not leave thread pools behind.
 */
@Test(singleThreaded = true)
public class TestExchangeClient {
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    // Shared by the tests that previously created (and never shut down) their own cached pool.
    private ExecutorService testingHttpClientExecutor;

    /** Creates the executors shared by every test in this class. */
    @BeforeClass
    public void setUp() {
        String threadPrefix = getClass().getSimpleName();
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed(threadPrefix + "-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        this.testingHttpClientExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed(threadPrefix + "-testing-http-client-%s"));
    }

    /** Shuts down every executor created in {@link #setUp()}; tolerates a partially failed set-up. */
    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
        if (this.testingHttpClientExecutor != null) {
            this.testingHttpClientExecutor.shutdownNow();
            this.testingHttpClientExecutor = null;
        }
    }

    /** One location, three pages, ample buffer: pages arrive in order and the client closes itself. */
    @Test
    public void testHappyPath() {
        DataSize maxResponseSize = DataSize.of(10L, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(32L, DataSize.Unit.MEGABYTE),
                maxResponseSize,
                new TestingHttpClient(processor, this.scheduler));
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();

        assertNextPagesWhileOpen(exchangeClient, 1, 3);
        Assert.assertNull(getNextPage(exchangeClient));
        Assert.assertTrue(exchangeClient.isClosed());

        ExchangeClientStatus status = exchangeClient.getStatus();
        Assert.assertEquals(status.getBufferedPages(), 0);
        Assert.assertEquals(status.getBufferedBytes(), 0L);
        assertStatus(status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * Locations added one after another are each drained, and the client stays open (and
     * blocked) between them until {@code noMoreLocations()} is called.
     */
    @Test(timeOut = 10000)
    public void testAddLocation() throws Exception {
        DataSize maxResponseSize = DataSize.of(10L, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ExchangeClient exchangeClient = createExchangeClient(
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(32L, DataSize.Unit.MEGABYTE),
                maxResponseSize,
                new TestingHttpClient(processor, this.testingHttpClientExecutor));

        URI firstLocation = URI.create("http://localhost:8081/foo");
        processor.addPage(firstLocation, createPage(1));
        processor.addPage(firstLocation, createPage(2));
        processor.addPage(firstLocation, createPage(3));
        processor.setComplete(firstLocation);
        exchangeClient.addLocation(firstLocation);

        assertNextPagesWhileOpen(exchangeClient, 1, 3);
        assertStillBlocked(exchangeClient);
        Assert.assertFalse(exchangeClient.isClosed());

        URI secondLocation = URI.create("http://localhost:8082/bar");
        processor.addPage(secondLocation, createPage(4));
        processor.addPage(secondLocation, createPage(5));
        processor.addPage(secondLocation, createPage(6));
        processor.setComplete(secondLocation);
        exchangeClient.addLocation(secondLocation);

        assertNextPagesWhileOpen(exchangeClient, 4, 6);
        assertStillBlocked(exchangeClient);
        Assert.assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        // Closing happens asynchronously; the @Test timeOut bounds this wait.
        while (!exchangeClient.isClosed()) {
            Thread.sleep(1L);
        }

        ImmutableMap<URI, PageBufferClientStatus> statuses = Maps.uniqueIndex(
                exchangeClient.getStatus().getPageBufferClientStatuses(),
                PageBufferClientStatus::getUri);
        assertStatus(statuses.get(firstLocation), firstLocation, "closed", 3, 3, 3, "not scheduled");
        assertStatus(statuses.get(secondLocation), secondLocation, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * With one-byte size limits the client is expected to hold exactly one page at a time;
     * each poll makes room for the next page.
     */
    @Test
    public void testBufferLimit() {
        DataSize maxResponseSize = DataSize.ofBytes(1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.ofBytes(1L),
                maxResponseSize,
                new TestingHttpClient(processor, this.testingHttpClientExecutor));
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertFalse(exchangeClient.isClosed());

        // A single start time is shared by all three waits below, so the 5 second
        // allowance covers the whole test rather than each wait separately.
        long start = System.nanoTime();

        waitForBufferedPage(exchangeClient, start);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");
        assertPageEquals(exchangeClient.pollPage(), createPage(1));

        waitForBufferedPage(exchangeClient, start);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertPageEquals(exchangeClient.pollPage(), createPage(2));

        waitForBufferedPage(exchangeClient, start);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertPageEquals(getNextPage(exchangeClient), createPage(3));

        Assert.assertNull(getNextPage(exchangeClient));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);
        Assert.assertTrue(exchangeClient.isClosed());
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    /** In {@code ABORT} mode a corrupted response surfaces as a {@link TrinoException}, repeatedly. */
    @Test
    public void testAbortOnDataCorruption() {
        String expectedMessage = "Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0xf91cfe5d2bc6e1c2, calculated checksum: 0x3c51297c7b78052f";
        ExchangeClient exchangeClient = setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.ABORT, URI.create("http://localhost:8080"));
        try {
            Assert.assertFalse(exchangeClient.isClosed());
            org.assertj.core.api.Assertions.assertThatThrownBy(() -> getNextPage(exchangeClient))
                    .isInstanceOf(TrinoException.class)
                    .hasMessageMatching(expectedMessage);
            // The same failure is reported again on later calls.
            org.assertj.core.api.Assertions.assertThatThrownBy(exchangeClient::isFinished)
                    .isInstanceOf(TrinoException.class)
                    .hasMessageMatching(expectedMessage);
        } finally {
            // Close even when an assertion above fails so the client is not left running.
            exchangeClient.close();
        }
    }

    /** In {@code RETRY} mode the corrupted first response is re-requested and both pages still arrive. */
    @Test
    public void testRetryDataCorruption() {
        URI location = URI.create("http://localhost:8080");
        ExchangeClient exchangeClient = setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.RETRY, location);

        assertNextPagesWhileOpen(exchangeClient, 1, 2);
        Assert.assertNull(getNextPage(exchangeClient));
        Assert.assertTrue(exchangeClient.isClosed());

        ExchangeClientStatus status = exchangeClient.getStatus();
        Assert.assertEquals(status.getBufferedPages(), 0);
        Assert.assertEquals(status.getBufferedBytes(), 0L);
        assertStatus(status.getPageBufferClientStatuses().get(0), location, "closed", 2, 4, 4, "not scheduled");
    }

    /**
     * Builds a client for {@code uri} (two pages, marked complete) whose HTTP layer tampers with
     * the first response.
     *
     * <p>The wrapping processor behaves as follows, keyed on how many requests it has answered:
     * <ol>
     *   <li>first request: fetches the real response, keeps an untouched copy, and returns a copy
     *       with the byte at offset 42 incremented;</li>
     *   <li>second request: returns the untouched copy saved by the first request;</li>
     *   <li>any later request: delegates straight to the mock processor.</li>
     * </ol>
     *
     * @param dataIntegrityVerification verification mode passed to the client
     * @param uri location registered with both the mock processor and the client
     * @return a client with the location added and {@code noMoreLocations()} already called
     */
    private ExchangeClient setUpDataCorruption(FeaturesConfig.DataIntegrityVerification dataIntegrityVerification, URI uri) {
        DataSize maxResponseSize = DataSize.of(10L, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor delegate = new MockExchangeRequestProcessor(maxResponseSize);
        delegate.addPage(uri, createPage(1));
        delegate.addPage(uri, createPage(2));
        delegate.setComplete(uri);

        TestingHttpClient.Processor corruptingProcessor = new TestingHttpClient.Processor() {
            private int completedRequests;
            private TestingResponse savedResponse;

            @Override
            public synchronized Response handle(Request request) throws Exception {
                if (this.completedRequests == 0) {
                    Verify.verify(this.savedResponse == null);
                    TestingResponse response = delegate.handle(request);
                    Preconditions.checkState(response.getStatusCode() == HttpStatus.OK.code(), "Unexpected status code: %s", response.getStatusCode());
                    ListMultimap<String, String> headers = response.getHeaders().entries().stream()
                            .collect(ImmutableListMultimap.toImmutableListMultimap(entry -> entry.getKey().toString(), entry -> entry.getValue()));
                    byte[] bytes = ByteStreams.toByteArray(response.getInputStream());
                    Preconditions.checkState(bytes.length > 42, "too short");
                    // Keep a pristine copy before tampering; it is replayed on the next request.
                    this.savedResponse = new TestingResponse(HttpStatus.OK, headers, bytes.clone());
                    // Corrupt a single byte of the body.
                    bytes[42]++;
                    this.completedRequests++;
                    return new TestingResponse(HttpStatus.OK, headers, bytes);
                }

                if (this.completedRequests == 1) {
                    Verify.verify(this.savedResponse != null);
                    TestingResponse response = this.savedResponse;
                    this.savedResponse = null;
                    this.completedRequests++;
                    return response;
                }

                this.completedRequests++;
                return delegate.handle(request);
            }
        };

        ExchangeClient exchangeClient = createExchangeClient(
                dataIntegrityVerification,
                DataSize.of(32L, DataSize.Unit.MEGABYTE),
                maxResponseSize,
                new TestingHttpClient(corruptingProcessor, this.scheduler));
        exchangeClient.addLocation(uri);
        exchangeClient.noMoreLocations();
        return exchangeClient;
    }

    /**
     * Closing the client while the location still has undelivered pages (and was never marked
     * complete) finishes the client and empties its buffer.
     */
    @Test(timeOut = 10000)
    public void testClose() throws Exception {
        DataSize maxResponseSize = DataSize.ofBytes(1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));

        ExchangeClient exchangeClient = createExchangeClient(
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.ofBytes(1L),
                maxResponseSize,
                new TestingHttpClient(processor, this.testingHttpClientExecutor));
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));

        exchangeClient.close();
        // The @Test timeOut bounds this wait so a regression fails instead of hanging the suite.
        while (!exchangeClient.isFinished()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }

        Assert.assertTrue(exchangeClient.isClosed());
        Assert.assertNull(exchangeClient.pollPage());
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);

        // Only the fields checked below are asserted; the request counters are not pinned here.
        PageBufferClientStatus clientStatus = exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), "closed", "status");
        Assert.assertEquals(clientStatus.getHttpRequestState(), "not scheduled", "httpRequestState");
    }

    /**
     * Builds an {@link ExchangeClient} with the settings every test shares; only the arguments
     * that differ between tests are parameters.
     *
     * <p>NOTE(review): the names {@code bufferCapacity} and {@code maxResponseSize} are inferred
     * from how the tests use these arguments; confirm against the {@code ExchangeClient}
     * constructor.
     */
    private ExchangeClient createExchangeClient(
            FeaturesConfig.DataIntegrityVerification dataIntegrityVerification,
            DataSize bufferCapacity,
            DataSize maxResponseSize,
            TestingHttpClient httpClient) {
        return new ExchangeClient(
                "localhost",
                dataIntegrityVerification,
                bufferCapacity,
                maxResponseSize,
                1,
                new Duration(1.0d, TimeUnit.MINUTES),
                true,
                httpClient,
                this.scheduler,
                new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"),
                this.pageBufferClientCallbackExecutor);
    }

    /**
     * Polls until the client reports at least one buffered page, sleeping 100 ms before each
     * check and failing once more than 5 seconds have elapsed since {@code startNanos}.
     */
    private static void waitForBufferedPage(ExchangeClient exchangeClient, long startNanos) {
        do {
            // There is no hook to wait on here, so time-boxed polling is used.
            Assertions.assertLessThan(Duration.nanosSince(startNanos), new Duration(5.0d, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
    }

    /**
     * For each size from {@code firstPositions} to {@code lastPositions} inclusive, asserts the
     * client is still open and that the next page matches {@code createPage(size)}.
     */
    private static void assertNextPagesWhileOpen(ExchangeClient exchangeClient, int firstPositions, int lastPositions) {
        for (int positions = firstPositions; positions <= lastPositions; positions++) {
            Assert.assertFalse(exchangeClient.isClosed());
            assertPageEquals(getNextPage(exchangeClient), createPage(positions));
        }
    }

    /** Asserts that the client's {@code isBlocked()} future does not complete within 10 ms. */
    private static void assertStillBlocked(ExchangeClient exchangeClient) {
        Assert.assertFalse(MoreFutures.tryGetFutureValue(exchangeClient.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
    }

    /** Creates a single-channel page holding the long sequence {@code 0 .. i - 1}. */
    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, i)});
    }

    /**
     * Waits up to 100 seconds for the client to become unblocked and then polls one page.
     *
     * @return the polled page, or {@code null} if none was obtained
     */
    public static SerializedPage getNextPage(ExchangeClient exchangeClient) {
        return MoreFutures.tryGetFutureValue(
                Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.pollPage(), MoreExecutors.directExecutor()),
                100,
                TimeUnit.SECONDS).orElse(null);
    }

    /**
     * Asserts that {@code serializedPage} is present and has the same position count and channel
     * count as {@code page}. Block contents are not compared.
     */
    private static void assertPageEquals(SerializedPage serializedPage, Page page) {
        Assert.assertNotNull(serializedPage);
        Assert.assertEquals(serializedPage.getPositionCount(), page.getPositionCount());
        Assert.assertEquals(PAGES_SERDE.deserialize(serializedPage).getChannelCount(), page.getChannelCount());
    }

    /** Asserts every field of a {@link PageBufferClientStatus} that the tests pin down. */
    private static void assertStatus(
            PageBufferClientStatus clientStatus,
            URI location,
            String state,
            int pagesReceived,
            int requestsScheduled,
            int requestsCompleted,
            String httpRequestState) {
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), state, "status");
        Assert.assertEquals(clientStatus.getPagesReceived(), pagesReceived, "pagesReceived");
        Assert.assertEquals(clientStatus.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        Assert.assertEquals(clientStatus.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        Assert.assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }
}
