package io.prestosql.operator.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.prestosql.SequencePageBuilder;
import io.prestosql.execution.Lifespan;
import io.prestosql.operator.BenchmarkWindowOperator;
import io.prestosql.operator.InterpretedHashGenerator;
import io.prestosql.operator.PageAssertions;
import io.prestosql.operator.PipelineExecutionStrategy;
import io.prestosql.operator.exchange.LocalExchange;
import io.prestosql.spi.Page;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.SystemPartitioningHandle;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:io/prestosql/operator/exchange/TestLocalExchange.class */
public class TestLocalExchange {
    // Channel types shared by every page built in this class: a single BIGINT column.
    // Must be declared before RETAINED_PAGE_SIZE, whose initializer calls createPage, which reads TYPES.
    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Retained size of one page produced by createPage; retainedSizeOfPages multiplies this by a page count.
    // NOTE(review): the tests compare against this size for pages built with other createPage arguments,
    // so the retained size is presumably independent of that argument — confirm against SequencePageBuilder.
    private static final DataSize RETAINED_PAGE_SIZE = new DataSize(createPage(42).getRetainedSizeInBytes(), DataSize.Unit.BYTE);
    // Buffer limit handed to the exchange factory by the tests that do not exercise write blocking.
    private static final DataSize LOCAL_EXCHANGE_MAX_BUFFERED_BYTES = new DataSize(32.0d, DataSize.Unit.MEGABYTE);

    /**
     * Decompiler-materialized lookup table that maps each {@link PipelineExecutionStrategy}
     * ordinal to the integer case label consumed by the {@code switch} in {@code run}.
     * Slots that are never assigned stay at 0, which matches no case label there.
     */
    public static class AnonymousClass1 {
        static final int[] $SwitchMap$io$prestosql$operator$PipelineExecutionStrategy;

        static {
            int[] caseLabels = new int[PipelineExecutionStrategy.values().length];
            try {
                caseLabels[PipelineExecutionStrategy.UNGROUPED_EXECUTION.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: its slot keeps the default 0.
            }
            try {
                caseLabels[PipelineExecutionStrategy.GROUPED_EXECUTION.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: its slot keeps the default 0.
            }
            $SwitchMap$io$prestosql$operator$PipelineExecutionStrategy = caseLabels;
        }
    }

    /**
     * Supplies the pipeline execution strategies that the parameterized tests run under.
     *
     * @return one single-element row per strategy: ungrouped execution, then grouped execution
     */
    @DataProvider
    public static Object[][] executionStrategy() {
        // The decompiled form built a one-dimensional Object[] here, which does not match the
        // declared Object[][] return type; build the two-dimensional array explicitly.
        return new Object[][] {
                {PipelineExecutionStrategy.UNGROUPED_EXECUTION},
                {PipelineExecutionStrategy.GROUPED_EXECUTION}
        };
    }

    /**
     * Exercises a single-distribution exchange fed by one sink: pages are removed from the only
     * source in the order they were added, the exchange's buffered-byte total follows every add
     * and remove, and the source reports finished once the finished sink's pages are drained.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void testGatherSingleWriter(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.SINGLE_DISTRIBUTION, 8, TYPES, ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, new DataSize(retainedSizeOfPages(99), DataSize.Unit.BYTE));
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 1);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSource source = exchange.getSource(0);
            assertSource(source, 0);

            LocalExchangeSink sink = sinkFactory.createSink();
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();
            assertSinkCanWrite(sink);
            assertSource(source, 0);

            // A read future obtained while the buffer is empty completes when the first page arrives.
            ListenableFuture<?> readFuture = source.waitForReading();
            Assert.assertFalse(readFuture.isDone());
            sink.addPage(createPage(0));
            Assert.assertTrue(readFuture.isDone());
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertSource(source, 1);

            sink.addPage(createPage(1));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Pages come back out in insertion order.
            assertRemovePage(source, createPage(0));
            assertSource(source, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertRemovePage(source, createPage(1));
            assertSource(source, 0);
            assertExchangeTotalBufferedBytes(exchange, 0);

            sink.addPage(createPage(2));
            sink.addPage(createPage(3));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Finishing the sink leaves the already-buffered pages readable.
            sink.finish();
            assertSinkFinished(sink);
            assertSource(source, 2);
            assertRemovePage(source, createPage(2));
            assertSource(source, 1);
            assertSinkFinished(sink);
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertRemovePage(source, createPage(3));
            assertSourceFinished(source);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exercises a broadcast exchange with two sinks and two sources: every added page shows up
     * on both sources, and the exchange's buffered-byte total only drops once a page has been
     * removed from both of them.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void testBroadcast(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // Each page written through sinkA becomes visible on both sources.
            sinkA.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            sinkA.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Draining sourceA alone does not lower the exchange total while sourceB still holds the pages.
            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            sinkA.finish();
            assertSinkFinished(sinkA);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // The second sink can still write after the first one has finished.
            sinkB.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            sinkB.finish();
            assertSinkFinished(sinkB);
            assertSource(sourceA, 1);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            assertRemovePage(sourceA, createPage(0));
            assertSourceFinished(sourceA);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            assertRemovePage(sourceB, createPage(0));
            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exercises an arbitrary-distribution exchange with one sink and two sources: after each of
     * 100 added pages the two sources together account for every page and byte, and at the end
     * both sources hold at least one page.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void testRandom(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // pagesAdded is the running count of pages written so far, including the current one.
            for (int pagesAdded = 1; pagesAdded <= 100; pagesAdded++) {
                sink.addPage(createPage(0));
                assertExchangeTotalBufferedBytes(exchange, pagesAdded);
                LocalExchangeBufferInfo infoA = sourceA.getBufferInfo();
                LocalExchangeBufferInfo infoB = sourceB.getBufferInfo();
                Assert.assertEquals(infoA.getBufferedBytes() + infoB.getBufferedBytes(), retainedSizeOfPages(pagesAdded));
                Assert.assertEquals(infoA.getBufferedPages() + infoB.getBufferedPages(), pagesAdded);
            }

            // Neither source may have been starved.
            Assert.assertTrue(sourceA.getBufferInfo().getBufferedPages() > 0);
            Assert.assertTrue(sourceB.getBufferInfo().getBufferedPages() > 0);
            assertExchangeTotalBufferedBytes(exchange, 100);
        });
    }

    /**
     * Exercises a passthrough exchange whose buffer limit is the retained size of one page:
     * a page written to the first sink appears only on the first source, a page written to the
     * second sink appears only on the second source, and a sink is write-blocked while its page
     * is still buffered.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void testPassthrough(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, new DataSize(retainedSizeOfPages(1), DataSize.Unit.BYTE));
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // One page on sinkA blocks sinkA only; sinkB stays writable.
            sinkA.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 0);
            assertSinkWriteBlocked(sinkA);
            assertSinkCanWrite(sinkB);

            sinkB.addPage(createPage(1));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkA);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Draining sourceA unblocks sinkA while sinkB remains blocked.
            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 0);
            assertSinkCanWrite(sinkA);
            assertSinkWriteBlocked(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 1);

            sinkA.finish();
            assertSinkFinished(sinkA);
            assertSource(sourceB, 1);

            // The page buffered on sourceB can still be removed after both sources are finished.
            sourceA.finish();
            sourceB.finish();
            assertRemovePage(sourceB, createPage(1));
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exercises a hash-partitioned exchange (partitioning on channel 0) with one sink and two
     * sources: each added page yields one buffered page on each source, and every position
     * removed from a source hashes to that source's partition index.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void testPartition(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 2, TYPES, ImmutableList.of(0), Optional.empty(), pipelineExecutionStrategy, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // Only a lower bound on buffered bytes is asserted while split pages are buffered.
            sink.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(1));

            sink.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(2));

            // Source 0 must only hand out positions belonging to partition 0 of 2.
            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);

            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);

            sink.finish();
            assertSinkFinished(sink);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 2);

            // Source 1 must only hand out positions belonging to partition 1 of 2.
            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);

            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Checks on a broadcast exchange with two sinks and two sources that the sinks stay writable
     * while only one source has finished, and report finished once both sources have finished.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void writeUnblockWhenAllReadersFinish(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, ImmutableList.of(BigintType.BIGINT), ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // With one reader still open, both sinks remain writable.
            sourceA.finish();
            assertSourceFinished(sourceA);
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);

            // Once the last reader finishes, both sinks are finished as well.
            sourceB.finish();
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Checks on a broadcast exchange with a one-byte buffer limit that sinks blocked by a
     * buffered page stay blocked until both sources have finished and removed that page, at
     * which point the pending write futures complete and the sinks report finished.
     *
     * @param pipelineExecutionStrategy strategy supplied by the {@code executionStrategy} provider
     */
    @Test(dataProvider = "executionStrategy")
    public void writeUnblockWhenAllReadersFinishAndPagesConsumed(PipelineExecutionStrategy pipelineExecutionStrategy) {
        LocalExchange.LocalExchangeFactory exchangeFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty(), pipelineExecutionStrategy, new DataSize(1.0d, DataSize.Unit.BYTE));
        LocalExchange.LocalExchangeSinkFactoryId sinkFactoryId = exchangeFactory.newSinkFactoryId();
        exchangeFactory.noMoreSinkFactories();
        run(exchangeFactory, pipelineExecutionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(sinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // A single page exceeds the one-byte limit, so both sinks become write-blocked.
            sinkA.addPage(createPage(0));
            ListenableFuture<?> sinkAWriteFuture = assertSinkWriteBlocked(sinkA);
            ListenableFuture<?> sinkBWriteFuture = assertSinkWriteBlocked(sinkB);
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // Finishing and draining the first source alone frees nothing and unblocks nobody.
            sourceA.finish();
            assertSource(sourceA, 1);
            assertRemovePage(sourceA, createPage(0));
            assertSourceFinished(sourceA);
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkA);
            assertSinkWriteBlocked(sinkB);

            // After the second source finishes and drains, the pending write futures complete.
            sourceB.finish();
            assertSource(sourceB, 1);
            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
            Assert.assertTrue(sinkAWriteFuture.isDone());
            Assert.assertTrue(sinkBWriteFuture.isDone());
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Verifies that asking a factory for an exchange whose lifespan contradicts the factory's
     * execution strategy fails with an {@link IllegalArgumentException} carrying the expected
     * message: a driver-group lifespan on an ungrouped factory, and a task-wide lifespan on a
     * grouped factory.
     */
    @Test
    public void testMismatchedExecutionStrategy() {
        try {
            LocalExchange.LocalExchangeFactory ungroupedFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 2, TYPES, ImmutableList.of(0), Optional.empty(), PipelineExecutionStrategy.UNGROUPED_EXECUTION, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
            ungroupedFactory.getLocalExchange(Lifespan.driverGroup(3));
            Assert.fail("expected failure");
        } catch (IllegalArgumentException expected) {
            Assertions.assertContains(expected.getMessage(), "Driver-group exchange cannot be created.");
        }

        try {
            LocalExchange.LocalExchangeFactory groupedFactory = new LocalExchange.LocalExchangeFactory(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 2, TYPES, ImmutableList.of(0), Optional.empty(), PipelineExecutionStrategy.GROUPED_EXECUTION, LOCAL_EXCHANGE_MAX_BUFFERED_BYTES);
            groupedFactory.getLocalExchange(Lifespan.taskWide());
            Assert.fail("expected failure");
        } catch (IllegalArgumentException expected) {
            Assertions.assertContains(expected.getMessage(), "Task-wide exchange cannot be created.");
        }
    }

    /**
     * Runs a test body against the exchange(s) appropriate for the given execution strategy:
     * one task-wide exchange for ungrouped execution, or three separate driver-group exchanges
     * (groups 1, 12 and 23) for grouped execution.
     *
     * @param localExchangeFactory factory the exchanges are obtained from
     * @param pipelineExecutionStrategy strategy selecting which lifespans are used
     * @param consumer test body invoked once per obtained exchange
     * @throws IllegalArgumentException if the strategy is neither ungrouped nor grouped
     */
    private void run(LocalExchange.LocalExchangeFactory localExchangeFactory, PipelineExecutionStrategy pipelineExecutionStrategy, Consumer<LocalExchange> consumer) {
        // Switch on the enum itself instead of the decompiler's ordinal lookup table, whose
        // second case label had been mis-attributed to an unrelated benchmark constant.
        switch (pipelineExecutionStrategy) {
            case UNGROUPED_EXECUTION:
                consumer.accept(localExchangeFactory.getLocalExchange(Lifespan.taskWide()));
                return;
            case GROUPED_EXECUTION:
                consumer.accept(localExchangeFactory.getLocalExchange(Lifespan.driverGroup(1)));
                consumer.accept(localExchangeFactory.getLocalExchange(Lifespan.driverGroup(12)));
                consumer.accept(localExchangeFactory.getLocalExchange(Lifespan.driverGroup(23)));
                return;
            default:
                throw new IllegalArgumentException("Unknown pipelineExecutionStrategy");
        }
    }

    /**
     * Asserts that an unfinished source currently buffers exactly {@code i} pages.
     * With pages buffered, reading must be ready and the buffered byte count positive; with none
     * buffered, reading must be pending, {@code removePage} must return null, and the buffered
     * byte count must be zero.
     *
     * @param localExchangeSource source under inspection
     * @param i expected number of buffered pages
     */
    private static void assertSource(LocalExchangeSource localExchangeSource, int i) {
        LocalExchangeBufferInfo info = localExchangeSource.getBufferInfo();
        Assert.assertEquals(info.getBufferedPages(), i);
        Assert.assertFalse(localExchangeSource.isFinished());
        if (i == 0) {
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            // Removing from an empty source yields nothing and must not change its state.
            Assert.assertNull(localExchangeSource.removePage());
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            Assert.assertFalse(localExchangeSource.isFinished());
            Assert.assertEquals(info.getBufferedBytes(), 0L);
        } else {
            Assert.assertTrue(localExchangeSource.waitForReading().isDone());
            Assert.assertTrue(info.getBufferedBytes() > 0);
        }
    }

    /**
     * Asserts that a source is finished: nothing is buffered, the read future is done,
     * {@code removePage} returns null, and none of that changes after the removal attempt.
     *
     * @param localExchangeSource source expected to be finished
     */
    private static void assertSourceFinished(LocalExchangeSource localExchangeSource) {
        Assert.assertTrue(localExchangeSource.isFinished());

        LocalExchangeBufferInfo info = localExchangeSource.getBufferInfo();
        Assert.assertEquals(info.getBufferedPages(), 0);
        Assert.assertEquals(info.getBufferedBytes(), 0L);

        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertNull(localExchangeSource.removePage());

        // State must be the same after the removal attempt.
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertTrue(localExchangeSource.isFinished());
    }

    /**
     * Removes the next page from a source that is ready for reading and asserts that it matches
     * the expected page in channel count and content.
     *
     * @param localExchangeSource source to remove the page from
     * @param page page the removed one is compared against
     */
    private static void assertRemovePage(LocalExchangeSource localExchangeSource, Page page) {
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Page actual = localExchangeSource.removePage();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getChannelCount(), page.getChannelCount());
        PageAssertions.assertPageEquals(TYPES, actual, page);
    }

    /**
     * Removes the next page from a source that is ready for reading and asserts that every
     * position in it is assigned to partition {@code i} by a {@code LocalPartitionGenerator}
     * hashing channel 0 across {@code i2} partitions.
     *
     * @param localExchangeSource source to remove the page from
     * @param i partition index every position must map to
     * @param i2 partition count given to the partition generator
     */
    private static void assertPartitionedRemovePage(LocalExchangeSource localExchangeSource, int i, int i2) {
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Page removed = localExchangeSource.removePage();
        Assert.assertNotNull(removed);
        InterpretedHashGenerator hashGenerator = new InterpretedHashGenerator(TYPES, new int[]{0});
        LocalPartitionGenerator partitioner = new LocalPartitionGenerator(hashGenerator, i2);
        for (int position = 0; position < removed.getPositionCount(); position++) {
            Assert.assertEquals(partitioner.getPartition(removed, position), i);
        }
    }

    /**
     * Asserts that a sink is not finished and that its write future is already done.
     *
     * @param localExchangeSink sink expected to accept writes
     */
    private static void assertSinkCanWrite(LocalExchangeSink localExchangeSink) {
        boolean finished = localExchangeSink.isFinished();
        Assert.assertFalse(finished);
        ListenableFuture<?> writeFuture = localExchangeSink.waitForWriting();
        Assert.assertTrue(writeFuture.isDone());
    }

    /**
     * Asserts that a sink is not finished and that its write future is still pending.
     *
     * @param localExchangeSink sink expected to be write-blocked
     * @return the pending write future, so callers can later check that it completed
     */
    private static ListenableFuture<?> assertSinkWriteBlocked(LocalExchangeSink localExchangeSink) {
        boolean finished = localExchangeSink.isFinished();
        Assert.assertFalse(finished);
        ListenableFuture<?> pendingWrite = localExchangeSink.waitForWriting();
        Assert.assertFalse(pendingWrite.isDone());
        return pendingWrite;
    }

    /**
     * Asserts that a sink is finished with a completed write future, and that adding another
     * page afterwards leaves both of those observations unchanged.
     *
     * @param localExchangeSink sink expected to be finished
     */
    private static void assertSinkFinished(LocalExchangeSink localExchangeSink) {
        Assert.assertTrue(localExchangeSink.isFinished());
        Assert.assertTrue(localExchangeSink.waitForWriting().isDone());

        // Writing to a finished sink must be tolerated and must not reopen it.
        Page extraPage = createPage(0);
        localExchangeSink.addPage(extraPage);

        Assert.assertTrue(localExchangeSink.isFinished());
        Assert.assertTrue(localExchangeSink.waitForWriting().isDone());
    }

    /**
     * Asserts that the exchange's buffered byte count equals the retained size of {@code i}
     * test pages.
     *
     * @param localExchange exchange under inspection
     * @param i expected number of buffered pages, converted to bytes via {@code retainedSizeOfPages}
     */
    private static void assertExchangeTotalBufferedBytes(LocalExchange localExchange, int i) {
        long actualBytes = localExchange.getBufferedBytes();
        long expectedBytes = retainedSizeOfPages(i);
        Assert.assertEquals(actualBytes, expectedBytes);
    }

    /**
     * Builds a test page with the shared {@code TYPES} layout by delegating to
     * {@code SequencePageBuilder.createSequencePage(TYPES, 100, i)}.
     *
     * @param i value forwarded as the last argument of {@code createSequencePage}
     * @return the page produced by the sequence page builder
     */
    private static Page createPage(int i) {
        Page sequencePage = SequencePageBuilder.createSequencePage(TYPES, 100, i);
        return sequencePage;
    }

    /**
     * Converts a page count into bytes using the retained size of a single test page.
     *
     * @param i number of pages
     * @return {@code RETAINED_PAGE_SIZE} in bytes multiplied by {@code i}
     */
    public static long retainedSizeOfPages(int i) {
        long bytesPerPage = RETAINED_PAGE_SIZE.toBytes();
        return bytesPerPage * i;
    }
}
