package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.MoreFutures;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.memory.MemoryInfo;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.memory.MemoryPoolInfo;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import java.net.URI;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/execution/scheduler/TestBinPackingNodeAllocator.class */
public class TestBinPackingNodeAllocator {
    private static final Session SESSION = TestingSession.testSessionBuilder().build();
    private static final HostAddress NODE_1_ADDRESS = HostAddress.fromParts("127.0.0.1", 8080);
    private static final HostAddress NODE_2_ADDRESS = HostAddress.fromParts("127.0.0.1", 8081);
    private static final HostAddress NODE_3_ADDRESS = HostAddress.fromParts("127.0.0.1", 8082);
    private static final HostAddress NODE_4_ADDRESS = HostAddress.fromParts("127.0.0.1", 8083);
    private static final InternalNode NODE_1 = new InternalNode("node-1", URI.create("local://" + NODE_1_ADDRESS), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_2 = new InternalNode("node-2", URI.create("local://" + NODE_2_ADDRESS), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_3 = new InternalNode("node-3", URI.create("local://" + NODE_3_ADDRESS), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_4 = new InternalNode("node-4", URI.create("local://" + NODE_4_ADDRESS), NodeVersion.UNKNOWN, false);
    private static final CatalogHandle CATALOG_1 = TestingHandles.createTestCatalogHandle("catalog1");
    private static final NodeRequirements REQ_NONE = new NodeRequirements(Optional.empty(), Set.of());
    private static final NodeRequirements REQ_NODE_1 = new NodeRequirements(Optional.empty(), Set.of(NODE_1_ADDRESS));
    private static final NodeRequirements REQ_NODE_2 = new NodeRequirements(Optional.empty(), Set.of(NODE_2_ADDRESS));
    private static final NodeRequirements REQ_CATALOG_1 = new NodeRequirements(Optional.of(CATALOG_1), Set.of());
    private static final long TEST_TIMEOUT = 2500;
    private BinPackingNodeAllocatorService nodeAllocatorService;
    private ConcurrentHashMap<String, Optional<MemoryInfo>> workerMemoryInfos;
    private final TestingTicker ticker = new TestingTicker();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/trino/execution/scheduler/TestBinPackingNodeAllocator$ThrowingRunnable.class */
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    private void setupNodeAllocatorService(InMemoryNodeManager inMemoryNodeManager) {
        setupNodeAllocatorService(inMemoryNodeManager, DataSize.ofBytes(0L));
    }

    private void setupNodeAllocatorService(InMemoryNodeManager inMemoryNodeManager, DataSize dataSize) {
        shutdownNodeAllocatorService();
        this.workerMemoryInfos = new ConcurrentHashMap<>();
        MemoryInfo buildWorkerMemoryInfo = buildWorkerMemoryInfo(DataSize.ofBytes(0L), ImmutableMap.of());
        this.workerMemoryInfos.put(NODE_1.getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo));
        this.workerMemoryInfos.put(NODE_2.getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo));
        this.workerMemoryInfos.put(NODE_3.getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo));
        this.workerMemoryInfos.put(NODE_4.getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo));
        this.nodeAllocatorService = new BinPackingNodeAllocatorService(inMemoryNodeManager, () -> {
            return this.workerMemoryInfos;
        }, false, false, Duration.of(1L, ChronoUnit.MINUTES), dataSize, this.ticker);
        this.nodeAllocatorService.start();
    }

    private void updateWorkerUsedMemory(InternalNode internalNode, DataSize dataSize, Map<TaskId, DataSize> map) {
        this.workerMemoryInfos.put(internalNode.getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(dataSize, map)));
    }

    private MemoryInfo buildWorkerMemoryInfo(DataSize dataSize, Map<TaskId, DataSize> map) {
        return new MemoryInfo(4, new MemoryPoolInfo(DataSize.of(64L, DataSize.Unit.GIGABYTE).toBytes(), dataSize.toBytes(), 0L, ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), (Map) map.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> {
            return ((TaskId) entry.getKey()).toString();
        }, entry2 -> {
            return Long.valueOf(((DataSize) entry2.getValue()).toBytes());
        })), ImmutableMap.of()));
    }

    @AfterMethod(alwaysRun = true)
    public void shutdownNodeAllocatorService() {
        if (this.nodeAllocatorService != null) {
            this.nodeAllocatorService.stop();
        }
        this.nodeAllocatorService = null;
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateSimple() throws Exception {
        InMemoryNodeManager inMemoryNodeManager = new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2});
        setupNodeAllocatorService(inMemoryNodeManager);
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_1);
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_2);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_1);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire2);
            acquire.release();
            assertEventually(() -> {
                assertAcquired(acquire2);
                Assert.assertEquals(acquire2.getNode().get(), NODE_2);
            });
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire3);
            inMemoryNodeManager.addNodes(new InternalNode[]{NODE_3});
            this.nodeAllocatorService.processPendingAcquires();
            assertEventually(() -> {
                assertAcquired(acquire3);
                Assert.assertEquals(acquire3.getNode().get(), NODE_3);
            });
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateDifferentSizes() throws Exception {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_1);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire2, NODE_2);
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire3, NODE_1);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire4);
            NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire5);
            acquire.release();
            assertNotAcquired(acquire4);
            assertNotAcquired(acquire5);
            acquire2.release();
            assertAcquired(acquire5);
            acquire3.release();
            assertAcquired(acquire4);
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateDifferentSizesOpportunisticAcquisition() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire2, NODE_2);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE)), NODE_1);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE)), NODE_2);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE)), NODE_1);
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire3);
            NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire4);
            acquire2.release();
            assertAcquired(acquire3);
            acquire.release();
            assertAcquired(acquire4);
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateReleaseBeforeAcquired() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            assertAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_1);
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire2);
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire3);
            acquire2.release();
            assertNotAcquired(acquire3);
            acquire.release();
            assertEventually(() -> {
                assertAcquired(acquire3, NODE_1);
            });
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testNoMatchingNodeAvailable() {
        InMemoryNodeManager inMemoryNodeManager = new InMemoryNodeManager(new InternalNode[0]);
        setupNodeAllocatorService(inMemoryNodeManager);
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire);
            this.ticker.increment(59L, TimeUnit.SECONDS);
            this.nodeAllocatorService.processPendingAcquires();
            assertNotAcquired(acquire);
            this.ticker.increment(2L, TimeUnit.SECONDS);
            this.nodeAllocatorService.processPendingAcquires();
            Assertions.assertThatThrownBy(() -> {
                Futures.getUnchecked(acquire.getNode());
            }).hasMessageContaining("No nodes available to run query");
            inMemoryNodeManager.addNodes(new InternalNode[]{NODE_2});
            assertAcquired(nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire2);
            inMemoryNodeManager.removeNode(NODE_2);
            this.nodeAllocatorService.processPendingAcquires();
            this.ticker.increment(61L, TimeUnit.SECONDS);
            this.nodeAllocatorService.processPendingAcquires();
            assertEventually(() -> {
                Assert.assertFalse(acquire2.getNode().isCancelled());
                Assert.assertTrue(acquire2.getNode().isDone());
                Assertions.assertThatThrownBy(() -> {
                    MoreFutures.getFutureValue(acquire2.getNode());
                }).hasMessage("No nodes available to run query");
            });
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testNoMatchingNodeAvailableTimeoutReset() {
        InMemoryNodeManager inMemoryNodeManager = new InMemoryNodeManager(new InternalNode[0]);
        setupNodeAllocatorService(inMemoryNodeManager);
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64L, DataSize.Unit.GIGABYTE));
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire);
            assertNotAcquired(acquire2);
            this.ticker.increment(30L, TimeUnit.SECONDS);
            inMemoryNodeManager.addNodes(new InternalNode[]{NODE_2});
            this.nodeAllocatorService.processPendingAcquires();
            ((AbstractBooleanAssert) Assertions.assertThat(acquire.getNode().isDone() != acquire2.getNode().isDone()).describedAs("exactly one of pending acquires should be completed", new Object[0])).isTrue();
            NodeAllocator.NodeLease nodeLease = acquire.getNode().isDone() ? acquire : acquire2;
            NodeAllocator.NodeLease nodeLease2 = acquire.getNode().isDone() ? acquire2 : acquire;
            inMemoryNodeManager.removeNode(NODE_2);
            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
            nodeLease.release();
            this.nodeAllocatorService.processPendingAcquires();
            assertNotAcquired(nodeLease2);
            this.ticker.increment(59L, TimeUnit.SECONDS);
            this.nodeAllocatorService.processPendingAcquires();
            assertNotAcquired(nodeLease2);
            this.ticker.increment(2L, TimeUnit.SECONDS);
            this.nodeAllocatorService.processPendingAcquires();
            Assertions.assertThatThrownBy(() -> {
                Futures.getUnchecked(nodeLease2.getNode());
            }).hasMessageContaining("No nodes available to run query");
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testRemoveAcquiredNode() {
        InMemoryNodeManager inMemoryNodeManager = new InMemoryNodeManager(new InternalNode[]{NODE_1});
        setupNodeAllocatorService(inMemoryNodeManager);
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            inMemoryNodeManager.removeNode(NODE_1);
            acquire.release();
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateNodeWithAddressRequirements() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_2);
            assertAcquired(nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_2);
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire2);
            assertAcquired(nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32L, DataSize.Unit.GIGABYTE)), NODE_1);
            acquire.release();
            assertEventually(() -> {
                assertAcquired(acquire2);
            });
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateNotEnoughRuntimeMemory() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            acquire.attachTaskId(taskId(1));
            updateWorkerUsedMemory(NODE_1, DataSize.of(33L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(33L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire2, NODE_2);
            acquire2.attachTaskId(taskId(2));
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire3, NODE_2);
            acquire3.attachTaskId(taskId(3));
            NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire4, NODE_1);
            acquire4.attachTaskId(taskId(4));
            NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire5);
            NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire6);
            updateWorkerUsedMemory(NODE_1, DataSize.of(32L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(32L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            this.nodeAllocatorService.processPendingAcquires();
            assertAcquired(acquire5, NODE_1);
            acquire5.attachTaskId(taskId(5));
            assertNotAcquired(acquire6);
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateRuntimeMemoryDiscrepancies() {
        InMemoryNodeManager inMemoryNodeManager = new InMemoryNodeManager(new InternalNode[]{NODE_1});
        setupNodeAllocatorService(inMemoryNodeManager);
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            acquire.attachTaskId(taskId(1));
            updateWorkerUsedMemory(NODE_1, DataSize.of(33L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(4L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            assertNotAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)));
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
            setupNodeAllocatorService(inMemoryNodeManager);
            NodeAllocator nodeAllocator2 = this.nodeAllocatorService.getNodeAllocator(SESSION);
            try {
                NodeAllocator.NodeLease acquire2 = nodeAllocator2.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
                assertAcquired(acquire2, NODE_1);
                acquire2.attachTaskId(taskId(1));
                updateWorkerUsedMemory(NODE_1, DataSize.of(4L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(33L, DataSize.Unit.GIGABYTE)));
                this.nodeAllocatorService.refreshNodePoolMemoryInfos();
                assertNotAcquired(nodeAllocator2.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)));
                if (nodeAllocator2 != null) {
                    nodeAllocator2.close();
                }
                setupNodeAllocatorService(inMemoryNodeManager);
                nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
                try {
                    NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
                    assertAcquired(acquire3, NODE_1);
                    acquire3.attachTaskId(taskId(1));
                    updateWorkerUsedMemory(NODE_1, DataSize.of(33L, DataSize.Unit.GIGABYTE), ImmutableMap.of());
                    this.nodeAllocatorService.refreshNodePoolMemoryInfos();
                    assertNotAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)));
                    if (nodeAllocator != null) {
                        nodeAllocator.close();
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th) {
                    th.addSuppressed(th);
                }
            }
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailable() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2}));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            acquire.attachTaskId(taskId(1));
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire2, NODE_2);
            acquire2.attachTaskId(taskId(2));
            updateWorkerUsedMemory(NODE_1, DataSize.of(40L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(40L, DataSize.Unit.GIGABYTE)));
            updateWorkerUsedMemory(NODE_2, DataSize.of(41L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(2), DataSize.of(41L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            assertNotAcquired(nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)));
            NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire3, NODE_1);
            acquire3.attachTaskId(taskId(2));
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testAllocateWithRuntimeMemoryEstimateOverhead() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1}), DataSize.of(4L, DataSize.Unit.GIGABYTE));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        try {
            NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertAcquired(acquire, NODE_1);
            acquire.attachTaskId(taskId(1));
            updateWorkerUsedMemory(NODE_1, DataSize.of(30L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(30L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE));
            assertNotAcquired(acquire2);
            updateWorkerUsedMemory(NODE_1, DataSize.of(28L, DataSize.Unit.GIGABYTE), ImmutableMap.of(taskId(1), DataSize.of(28L, DataSize.Unit.GIGABYTE)));
            this.nodeAllocatorService.refreshNodePoolMemoryInfos();
            this.nodeAllocatorService.processPendingAcquires();
            assertAcquired(acquire2, NODE_1);
            if (nodeAllocator != null) {
                nodeAllocator.close();
            }
        } catch (Throwable th) {
            if (nodeAllocator != null) {
                try {
                    nodeAllocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testStressAcquireRelease() {
        setupNodeAllocatorService(new InMemoryNodeManager(new InternalNode[]{NODE_1}), DataSize.of(4L, DataSize.Unit.GIGABYTE));
        NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(SESSION);
        for (int i = 0; i < 10000000; i++) {
            try {
                nodeAllocator.acquire(REQ_NONE, DataSize.of(32L, DataSize.Unit.GIGABYTE)).release();
            } catch (Throwable th) {
                if (nodeAllocator != null) {
                    try {
                        nodeAllocator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (nodeAllocator != null) {
            nodeAllocator.close();
        }
    }

    private TaskId taskId(int i) {
        return new TaskId(new StageId("test_query", 0), i, 0);
    }

    private void assertAcquired(NodeAllocator.NodeLease nodeLease, InternalNode internalNode) {
        assertAcquired(nodeLease, Optional.of(internalNode));
    }

    private void assertAcquired(NodeAllocator.NodeLease nodeLease) {
        assertAcquired(nodeLease, Optional.empty());
    }

    private void assertAcquired(NodeAllocator.NodeLease nodeLease, Optional<InternalNode> optional) {
        assertEventually(() -> {
            Assert.assertFalse(nodeLease.getNode().isCancelled(), "node lease cancelled");
            Assert.assertTrue(nodeLease.getNode().isDone(), "node lease not acquired");
            if (optional.isPresent()) {
                Assert.assertEquals(nodeLease.getNode().get(), optional.get());
            }
        });
    }

    private void assertNotAcquired(NodeAllocator.NodeLease nodeLease) {
        Assert.assertFalse(nodeLease.getNode().isCancelled(), "node lease cancelled");
        Assert.assertFalse(nodeLease.getNode().isDone(), "node lease acquired");
        this.nodeAllocatorService.processPendingAcquires();
        Assert.assertFalse(nodeLease.getNode().isCancelled(), "node lease cancelled");
        Assert.assertFalse(nodeLease.getNode().isDone(), "node lease acquired");
    }

    private static void assertEventually(ThrowingRunnable throwingRunnable) {
        io.trino.testing.assertions.Assert.assertEventually(new io.airlift.units.Duration(2500.0d, TimeUnit.MILLISECONDS), new io.airlift.units.Duration(10.0d, TimeUnit.MILLISECONDS), () -> {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
