package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/trino/operator/TestStreamingDirectExchangeBuffer.class */
/**
 * Unit tests for {@link StreamingDirectExchangeBuffer}.
 *
 * <p>Every buffer is created with a direct executor and a fixed capacity of
 * {@link #BUFFER_CAPACITY}, and is managed by try-with-resources so it is closed
 * exactly once, even when an assertion fails part-way through a test.
 */
public class TestStreamingDirectExchangeBuffer {
    private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0);
    private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0);
    private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0);

    // The three pages carry payloads of different lengths ("page0", "page-1", "page-_2").
    private static final Slice PAGE_0 = Slices.utf8Slice("page0");
    private static final Slice PAGE_1 = Slices.utf8Slice("page-1");
    private static final Slice PAGE_2 = Slices.utf8Slice("page-_2");

    /** Capacity handed to every buffer under test. */
    private static final DataSize BUFFER_CAPACITY = DataSize.of(1L, DataSize.Unit.KILOBYTE);

    /** Creates a fresh buffer backed by a direct executor and {@link #BUFFER_CAPACITY}. */
    private static StreamingDirectExchangeBuffer createBuffer() {
        return new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), BUFFER_CAPACITY);
    }

    /**
     * Walks a two-task exchange from start to finish: registering tasks, adding and polling
     * pages, and finishing both tasks, checking the blocked future, the finished flag and the
     * size accounting after each step.
     */
    @Test
    public void testHappyPath() {
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            // Nothing registered yet: not finished, blocked, nothing to poll.
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assertions.assertThat(blocked.isDone()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();

            buffer.addTask(TASK_0);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();

            buffer.addTask(TASK_1);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();

            buffer.noMoreTasks();
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();

            // The first page unblocks the reader and is reflected in the size accounting.
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            Assertions.assertThat(buffer.getBufferedPageCount()).isEqualTo(1);
            Assertions.assertThat(buffer.getRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat(buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat(buffer.getRemainingCapacityInBytes())
                    .isEqualTo(BUFFER_CAPACITY.toBytes() - PAGE_0.getRetainedSize());
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isTrue();
            Assertions.assertThat(buffer.pollPage()).isEqualTo(PAGE_0);

            // After draining, the buffer is blocked again; the max retained size is unchanged.
            ListenableFuture<?> blockedAfterDrain = buffer.isBlocked();
            Assertions.assertThat(buffer.getRetainedSizeInBytes()).isEqualTo(0L);
            Assertions.assertThat(buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat(buffer.getRemainingCapacityInBytes()).isEqualTo(BUFFER_CAPACITY.toBytes());
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blockedAfterDrain.isDone()).isFalse();

            // One of the two tasks finishing is not enough to finish the buffer.
            buffer.taskFinished(TASK_0);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isBlocked().isDone()).isFalse();

            buffer.addPages(TASK_1, ImmutableList.of(PAGE_1, PAGE_2));
            long twoPagesSize = PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize();
            Assertions.assertThat(buffer.getBufferedPageCount()).isEqualTo(2);
            Assertions.assertThat(buffer.getRetainedSizeInBytes()).isEqualTo(twoPagesSize);
            Assertions.assertThat(buffer.getMaxRetainedSizeInBytes()).isEqualTo(twoPagesSize);
            Assertions.assertThat(buffer.getRemainingCapacityInBytes())
                    .isEqualTo(BUFFER_CAPACITY.toBytes() - twoPagesSize);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isBlocked().isDone()).isTrue();

            // Pages come back in the order they were added.
            Assertions.assertThat(buffer.pollPage()).isEqualTo(PAGE_1);
            Assertions.assertThat(buffer.pollPage()).isEqualTo(PAGE_2);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isBlocked().isDone()).isFalse();
            Assertions.assertThat(buffer.getRetainedSizeInBytes()).isEqualTo(0L);
            Assertions.assertThat(buffer.getMaxRetainedSizeInBytes()).isEqualTo(twoPagesSize);
            Assertions.assertThat(buffer.getRemainingCapacityInBytes()).isEqualTo(BUFFER_CAPACITY.toBytes());

            // The last task finishing completes the buffer and releases the pending future.
            buffer.taskFinished(TASK_1);
            Assertions.assertThat(buffer.isFinished()).isTrue();
            Assertions.assertThat(blockedAfterDrain.isDone()).isTrue();
        }
    }

    /**
     * Checks that closing a buffer with registered, unfinished tasks makes it report finished,
     * completes the blocked future and leaves nothing to poll.
     */
    @Test
    public void testClose() {
        // Intentionally not try-with-resources: close() itself is the operation under test.
        StreamingDirectExchangeBuffer buffer = createBuffer();
        buffer.addTask(TASK_0);
        buffer.addTask(TASK_1);
        Assertions.assertThat(buffer.isFinished()).isFalse();
        Assertions.assertThat(buffer.isBlocked().isDone()).isFalse();
        Assertions.assertThat(buffer.pollPage()).isNull();

        buffer.close();

        Assertions.assertThat(buffer.isFinished()).isTrue();
        Assertions.assertThat(buffer.isBlocked().isDone()).isTrue();
        Assertions.assertThat(buffer.pollPage()).isNull();
    }

    /**
     * Covers three ways a buffer stops being blocked: no tasks at all, a single task that
     * finishes, and a single task that fails with a generic exception.
     */
    @Test
    public void testIsFinished() {
        // No tasks: noMoreTasks() alone finishes the buffer.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assertions.assertThat(blocked.isDone()).isFalse();

            buffer.noMoreTasks();
            Assertions.assertThat(buffer.isFinished()).isTrue();
            Assertions.assertThat(blocked.isDone()).isTrue();
        }

        // Single task: the buffer finishes only once that task has finished.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assertions.assertThat(blocked.isDone()).isFalse();

            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isFalse();

            buffer.taskFinished(TASK_0);
            Assertions.assertThat(buffer.isFinished()).isTrue();
            Assertions.assertThat(blocked.isDone()).isTrue();
        }

        // Single failed task: the buffer is failed (not finished) and pollPage() rethrows the cause.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assertions.assertThat(blocked.isDone()).isFalse();

            buffer.addTask(TASK_0);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(blocked.isDone()).isFalse();

            RuntimeException error = new RuntimeException();
            buffer.taskFailed(TASK_0, error);
            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isFailed()).isTrue();
            Assertions.assertThat(blocked.isDone()).isTrue();
            Assertions.assertThatThrownBy(buffer::pollPage).isEqualTo(error);
        }
    }

    /**
     * Checks that cancelling one future returned by {@code isBlocked()} leaves the other
     * outstanding futures pending, and that they still complete when the buffer finishes.
     */
    @Test
    public void testFutureCancellationDoesNotAffectOtherFutures() {
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> first = buffer.isBlocked();
            ListenableFuture<?> second = buffer.isBlocked();
            ListenableFuture<?> cancelled = buffer.isBlocked();
            Assertions.assertThat(first.isDone()).isFalse();
            Assertions.assertThat(second.isDone()).isFalse();
            Assertions.assertThat(cancelled.isDone()).isFalse();

            cancelled.cancel(true);
            Assertions.assertThat(first.isDone()).isFalse();
            Assertions.assertThat(second.isDone()).isFalse();

            buffer.noMoreTasks();
            Assertions.assertThat(buffer.isFinished()).isTrue();
            Assertions.assertThat(first.isDone()).isTrue();
            Assertions.assertThat(second.isDone()).isTrue();
        }
    }

    /**
     * Checks that a task failing with {@code REMOTE_TASK_FAILED} leaves the buffer neither
     * finished nor failed, whether the failure arrives before or after {@code noMoreTasks()}.
     */
    @Test
    public void testRemoteTaskFailedError() {
        // Failure reported before noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            buffer.addTask(TASK_0);
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            buffer.noMoreTasks();

            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isFailed()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();
        }

        // Failure reported after noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));

            Assertions.assertThat(buffer.isFinished()).isFalse();
            Assertions.assertThat(buffer.isFailed()).isFalse();
            Assertions.assertThat(buffer.pollPage()).isNull();
        }
    }

    /**
     * Checks that, with two blocked futures outstanding, each add-then-poll round completes
     * only one of them: the first round completes the first future and the second round the second.
     */
    @Test
    public void testSingleWakeUp() {
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assertions.assertThat(buffer.isFinished()).isFalse();
            ListenableFuture<?> first = buffer.isBlocked();
            ListenableFuture<?> second = buffer.isBlocked();
            Assertions.assertThat(first.isDone()).isFalse();
            Assertions.assertThat(second.isDone()).isFalse();

            buffer.addTask(TASK_0);
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assertions.assertThat(first.isDone()).isTrue();
            Assertions.assertThat(second.isDone()).isFalse();

            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assertions.assertThat(second.isDone()).isTrue();
        }
    }
}
