package io.trino.server.remotetask;

import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.jaxrs.JsonMapper;
import io.airlift.jaxrs.testing.JaxrsTestingHttpProcessor;
import io.airlift.json.JsonBinder;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecBinder;
import io.airlift.json.JsonModule;
import io.airlift.tracing.SpanSerialization;
import io.airlift.tracing.Tracing;
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.block.BlockJsonSerde;
import io.trino.client.NodeVersion;
import io.trino.execution.BaseTestSqlTaskManager;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.RemoteTask;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.TaskTestUtils;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.HandleJsonModule;
import io.trino.metadata.InternalBlockEncodingSerde;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Metadata;
import io.trino.metadata.MetadataManager;
import io.trino.metadata.Split;
import io.trino.operator.BenchmarkWindowOperator;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
import io.trino.server.HttpRemoteTaskFactory;
import io.trino.server.TaskUpdateRequest;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.TestingColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.TypeSignature;
import io.trino.sql.DynamicFilters;
import io.trino.sql.ir.Reference;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SymbolKeyDeserializer;
import io.trino.sql.planner.TestingPlannerContext;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.testing.assertions.Assert;
import io.trino.type.InternalTypeManager;
import io.trino.type.TypeDeserializer;
import io.trino.type.TypeSignatureDeserializer;
import io.trino.type.TypeSignatureKeyDeserializer;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:io/trino/server/remotetask/TestHttpRemoteTask.class */
public class TestHttpRemoteTask {
    private static final Duration POLL_TIMEOUT = new Duration(100.0d, TimeUnit.MILLISECONDS);
    private static final Duration IDLE_TIMEOUT = new Duration(3.0d, TimeUnit.SECONDS);
    private static final Duration FAIL_TIMEOUT = new Duration(20.0d, TimeUnit.SECONDS);
    private static final TaskManagerConfig TASK_MANAGER_CONFIG = new TaskManagerConfig().setStatusRefreshMaxWait(new Duration(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 100.0d, TimeUnit.MILLISECONDS)).setInfoUpdateInterval(new Duration(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 10.0d, TimeUnit.MILLISECONDS));
    private static final boolean TRACE_HTTP = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/remotetask/TestHttpRemoteTask$FailureScenario.class */
    public enum FailureScenario {
        NO_FAILURE,
        TASK_MISMATCH,
        TASK_MISMATCH_WHEN_VERSION_IS_HIGH,
        REJECTED_EXECUTION
    }

    @Path("/task/{nodeId}")
    /* loaded from: input_file:io/trino/server/remotetask/TestHttpRemoteTask$TestingTaskResource.class */
    public static class TestingTaskResource {
        private static final String INITIAL_TASK_INSTANCE_ID = "task-instance-id";
        private static final String NEW_TASK_INSTANCE_ID = "task-instance-id-x";
        private final AtomicLong lastActivityNanos;
        private final FailureScenario failureScenario;
        private TaskInfo initialTaskInfo;
        private TaskStatus initialTaskStatus;
        private long version;
        private TaskState taskState;
        private long statusFetchCounter;
        private long createOrUpdateCounter;
        private long dynamicFiltersFetchCounter;
        private long dynamicFiltersSentCounter;
        private final AtomicReference<TestingHttpClient> httpClient = new AtomicReference<>();
        private Optional<DynamicFiltersCollector.VersionedDynamicFilterDomains> dynamicFilterDomains = Optional.empty();
        private Optional<Exception> dynamicFilterFailure = Optional.empty();
        private OptionalInt dynamicFilterFailureCount = OptionalInt.empty();
        private String taskInstanceId = INITIAL_TASK_INSTANCE_ID;
        private Map<DynamicFilterId, Domain> latestDynamicFilterFromCoordinator = ImmutableMap.of();
        private final List<DynamicFiltersFetchRequest> dynamicFiltersFetchRequests = new ArrayList();
        Map<PlanNodeId, SplitAssignment> taskSplitAssignmentMap = new HashMap();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/trino/server/remotetask/TestHttpRemoteTask$TestingTaskResource$DynamicFiltersFetchRequest.class */
        public static class DynamicFiltersFetchRequest {
            private final String uriInfo;
            private final TaskId taskId;
            private final Long currentDynamicFiltersVersion;
            private final long storedDynamicFiltersVersion;

            private DynamicFiltersFetchRequest(String str, TaskId taskId, Long l, long j) {
                this.uriInfo = (String) Objects.requireNonNull(str, "uriInfo is null");
                this.taskId = (TaskId) Objects.requireNonNull(taskId, "taskId is null");
                this.currentDynamicFiltersVersion = (Long) Objects.requireNonNull(l, "currentDynamicFiltersVersion is null");
                this.storedDynamicFiltersVersion = j;
            }

            public String toString() {
                return MoreObjects.toStringHelper(this).add("uriInfo", this.uriInfo).add("taskId", this.taskId).add("currentDynamicFiltersVersion", this.currentDynamicFiltersVersion).add("storedDynamicFiltersVersion", this.storedDynamicFiltersVersion).toString();
            }
        }

        public TestingTaskResource(AtomicLong atomicLong, FailureScenario failureScenario) {
            this.lastActivityNanos = (AtomicLong) Objects.requireNonNull(atomicLong, "lastActivityNanos is null");
            this.failureScenario = (FailureScenario) Objects.requireNonNull(failureScenario, "failureScenario is null");
        }

        public void setHttpClient(TestingHttpClient testingHttpClient) {
            this.httpClient.set(testingHttpClient);
        }

        @Produces({"application/json"})
        @GET
        @Path("{taskId}")
        public synchronized TaskInfo getTaskInfo(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long l, @HeaderParam("X-Trino-Max-Wait") Duration duration, @Context UriInfo uriInfo) {
            this.lastActivityNanos.set(System.nanoTime());
            return buildTaskInfo();
        }

        @Produces({"application/json"})
        @POST
        @Path("{taskId}")
        @Consumes({"application/json"})
        public synchronized TaskInfo createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) {
            for (SplitAssignment splitAssignment : taskUpdateRequest.splitAssignments()) {
                this.taskSplitAssignmentMap.compute(splitAssignment.getPlanNodeId(), (planNodeId, splitAssignment2) -> {
                    return splitAssignment2 == null ? splitAssignment : splitAssignment2.update(splitAssignment);
                });
            }
            if (!taskUpdateRequest.dynamicFilterDomains().isEmpty()) {
                this.dynamicFiltersSentCounter++;
                this.latestDynamicFilterFromCoordinator = taskUpdateRequest.dynamicFilterDomains();
            }
            this.createOrUpdateCounter++;
            this.lastActivityNanos.set(System.nanoTime());
            return buildTaskInfo();
        }

        public synchronized SplitAssignment getTaskSplitAssignment(PlanNodeId planNodeId) {
            SplitAssignment splitAssignment = this.taskSplitAssignmentMap.get(planNodeId);
            if (splitAssignment == null) {
                return null;
            }
            return new SplitAssignment(splitAssignment.getPlanNodeId(), splitAssignment.getSplits(), splitAssignment.isNoMoreSplits());
        }

        @Produces({"application/json"})
        @GET
        @Path("{taskId}/status")
        public synchronized TaskStatus getTaskStatus(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long l, @HeaderParam("X-Trino-Max-Wait") Duration duration, @Context UriInfo uriInfo) throws InterruptedException {
            this.lastActivityNanos.set(System.nanoTime());
            wait(duration.roundTo(TimeUnit.MILLISECONDS));
            return buildTaskStatus();
        }

        @Produces({"application/json"})
        @GET
        @Path("{taskId}/dynamicfilters")
        public synchronized DynamicFiltersCollector.VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long l, @Context UriInfo uriInfo) throws Exception {
            this.dynamicFiltersFetchCounter++;
            this.dynamicFiltersFetchRequests.add(new DynamicFiltersFetchRequest(uriInfo.getRequestUri().toString(), taskId, l, ((Long) this.dynamicFilterDomains.map((v0) -> {
                return v0.getVersion();
            }).orElse(-1L)).longValue()));
            if (this.dynamicFilterFailureCount.orElse(TestHttpRemoteTask.TRACE_HTTP) <= 0) {
                return this.dynamicFilterDomains.orElse(null);
            }
            this.dynamicFilterFailureCount = OptionalInt.of(this.dynamicFilterFailureCount.getAsInt() - 1);
            throw this.dynamicFilterFailure.orElseThrow();
        }

        @Produces({"application/json"})
        @DELETE
        @Path("{taskId}")
        public synchronized TaskInfo deleteTask(@PathParam("taskId") TaskId taskId, @QueryParam("abort") @DefaultValue("true") boolean z, @Context UriInfo uriInfo) {
            this.lastActivityNanos.set(System.nanoTime());
            this.taskState = z ? TaskState.ABORTED : TaskState.CANCELED;
            return buildTaskInfo();
        }

        public void setInitialTaskInfo(TaskInfo taskInfo) {
            this.initialTaskInfo = taskInfo;
            this.initialTaskStatus = taskInfo.taskStatus();
            this.taskState = this.initialTaskStatus.getState();
            this.version = this.initialTaskStatus.getVersion();
            switch (this.failureScenario.ordinal()) {
                case TestHttpRemoteTask.TRACE_HTTP /* 0 */:
                case 1:
                case 3:
                    return;
                case BenchmarkWindowOperator.Context.NUMBER_OF_GROUP_COLUMNS /* 2 */:
                    this.version = 1000000L;
                    return;
                default:
                    throw new UnsupportedOperationException();
            }
        }

        public synchronized void setDynamicFilterDomains(DynamicFiltersCollector.VersionedDynamicFilterDomains versionedDynamicFilterDomains) {
            this.dynamicFilterDomains = Optional.of(versionedDynamicFilterDomains);
        }

        public synchronized void setDynamicFilterFailure(Exception exc, int i) {
            this.dynamicFilterFailure = Optional.of(exc);
            this.dynamicFilterFailureCount = OptionalInt.of(i);
        }

        public Map<DynamicFilterId, Domain> getLatestDynamicFilterFromCoordinator() {
            return this.latestDynamicFilterFromCoordinator;
        }

        public synchronized long getStatusFetchCounter() {
            return this.statusFetchCounter;
        }

        public synchronized long getCreateOrUpdateCounter() {
            return this.createOrUpdateCounter;
        }

        public synchronized long getDynamicFiltersFetchCounter() {
            return this.dynamicFiltersFetchCounter;
        }

        public synchronized long getDynamicFiltersSentCounter() {
            return this.dynamicFiltersSentCounter;
        }

        public synchronized List<DynamicFiltersFetchRequest> getDynamicFiltersFetchRequests() {
            return ImmutableList.copyOf(this.dynamicFiltersFetchRequests);
        }

        private TaskInfo buildTaskInfo() {
            return new TaskInfo(buildTaskStatus(), this.initialTaskInfo.lastHeartbeat(), this.initialTaskInfo.outputBuffers(), this.initialTaskInfo.noMoreSplits(), this.initialTaskInfo.stats(), this.initialTaskInfo.estimatedMemory(), this.initialTaskInfo.needsPlan());
        }

        private TaskStatus buildTaskStatus() {
            this.statusFetchCounter++;
            switch (this.failureScenario.ordinal()) {
                case TestHttpRemoteTask.TRACE_HTTP /* 0 */:
                    break;
                case 1:
                case BenchmarkWindowOperator.Context.NUMBER_OF_GROUP_COLUMNS /* 2 */:
                    if (this.statusFetchCounter == 10) {
                        this.taskInstanceId = NEW_TASK_INSTANCE_ID;
                        this.version = 0L;
                        break;
                    }
                    break;
                case 3:
                    if (this.statusFetchCounter >= 10) {
                        this.httpClient.get().close();
                        throw new RejectedExecutionException();
                    }
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
            TaskId taskId = this.initialTaskStatus.getTaskId();
            String str = this.taskInstanceId;
            long j = this.version + 1;
            this.version = j;
            return new TaskStatus(taskId, str, j, this.taskState, this.initialTaskStatus.getSelf(), "fake", false, this.initialTaskStatus.getFailures(), this.initialTaskStatus.getQueuedPartitionedDrivers(), this.initialTaskStatus.getRunningPartitionedDrivers(), this.initialTaskStatus.getOutputBufferStatus(), this.initialTaskStatus.getOutputDataSize(), this.initialTaskStatus.getWriterInputDataSize(), this.initialTaskStatus.getPhysicalWrittenDataSize(), this.initialTaskStatus.getMaxWriterCount(), this.initialTaskStatus.getMemoryReservation(), this.initialTaskStatus.getPeakMemoryReservation(), this.initialTaskStatus.getRevocableMemoryReservation(), this.initialTaskStatus.getFullGcCount(), this.initialTaskStatus.getFullGcTime(), ((Long) this.dynamicFilterDomains.map((v0) -> {
                return v0.getVersion();
            }).orElse(0L)).longValue(), this.initialTaskStatus.getQueuedPartitionedSplitsWeight(), this.initialTaskStatus.getRunningPartitionedSplitsWeight());
        }
    }

    @Timeout(30)
    @Test
    public void testRemoteTaskMismatch() throws Exception {
        runTest(FailureScenario.TASK_MISMATCH);
    }

    @Timeout(30)
    @Test
    public void testRejectedExecutionWhenVersionIsHigh() throws Exception {
        runTest(FailureScenario.TASK_MISMATCH_WHEN_VERSION_IS_HIGH);
    }

    @Timeout(30)
    @Test
    public void testRejectedExecution() throws Exception {
        runTest(FailureScenario.REJECTED_EXECUTION);
    }

    @Timeout(30)
    @Test
    public void testRegular() throws Exception {
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of());
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        createRemoteTask.start();
        createRemoteTask.addSplits(ImmutableMultimap.of(TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())));
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null;
        });
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 1;
        });
        createRemoteTask.noMoreSplits(TaskTestUtils.TABLE_SCAN_NODE_ID);
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).isNoMoreSplits();
        });
        createRemoteTask.cancel();
        poll(() -> {
            return createRemoteTask.getTaskStatus().getState().isDone();
        });
        poll(() -> {
            return createRemoteTask.getTaskInfo().taskStatus().getState().isDone();
        });
        createHttpRemoteTaskFactory.stop();
    }

    @Timeout(30)
    @Test
    public void testDynamicFilterFetcherFailure() throws Exception {
        Symbol newSymbol = new SymbolAllocator().newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        Reference symbolReference = newSymbol.toSymbolReference();
        DynamicFilterId dynamicFilterId = new DynamicFilterId("df1");
        ImmutableMap of = ImmutableMap.of(dynamicFilterId, Domain.singleValue(BigintType.BIGINT, 1L));
        TestingColumnHandle testingColumnHandle = new TestingColumnHandle("column1");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService, new QueryManagerConfig().setRemoteTaskMaxErrorDuration(new Duration(2.0d, TimeUnit.SECONDS)));
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of());
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, ImmutableSet.of(dynamicFilterId), ImmutableSet.of(dynamicFilterId), ImmutableSet.of());
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), TRACE_HTTP, 1);
        DynamicFilter createDynamicFilter = dynamicFilterService.createDynamicFilter(queryId, ImmutableList.of(new DynamicFilters.Descriptor(dynamicFilterId, symbolReference)), ImmutableMap.of(newSymbol, testingColumnHandle));
        createRemoteTask.start();
        testingTaskResource.setDynamicFilterFailure(new RuntimeException("DF fetch failed"), 1);
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(1L, of));
        createDynamicFilter.isBlocked().get();
        ((AbstractLongAssert) Assertions.assertThat(testingTaskResource.getDynamicFiltersFetchCounter()).describedAs(testingTaskResource.getDynamicFiltersFetchRequests().toString(), new Object[TRACE_HTTP])).isGreaterThanOrEqualTo(2L);
        Assertions.assertThat(createRemoteTask.getDynamicFiltersFetcher().isRunning()).isTrue();
        testingTaskResource.setDynamicFilterFailure(new RuntimeException("DF fetch failed"), Integer.MAX_VALUE);
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(2L, of));
        Assert.assertEventually(new Duration(30.0d, TimeUnit.SECONDS), () -> {
            Assertions.assertThat(createRemoteTask.getTaskStatus().getState()).isEqualTo(TaskState.FAILED);
        });
        Assertions.assertThat(createRemoteTask.getDynamicFiltersFetcher().isRunning()).isFalse();
        createHttpRemoteTaskFactory.stop();
    }

    @Timeout(30)
    @Test
    public void testDynamicFilterFetcherVersionMismatch() throws Exception {
        Symbol newSymbol = new SymbolAllocator().newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        Reference symbolReference = newSymbol.toSymbolReference();
        DynamicFilterId dynamicFilterId = new DynamicFilterId("df1");
        ImmutableMap of = ImmutableMap.of(dynamicFilterId, Domain.singleValue(BigintType.BIGINT, 1L));
        TestingColumnHandle testingColumnHandle = new TestingColumnHandle("column1");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of());
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, ImmutableSet.of(dynamicFilterId), ImmutableSet.of(dynamicFilterId), ImmutableSet.of());
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), TRACE_HTTP, 1);
        DynamicFilter createDynamicFilter = dynamicFilterService.createDynamicFilter(queryId, ImmutableList.of(new DynamicFilters.Descriptor(dynamicFilterId, symbolReference)), ImmutableMap.of(newSymbol, testingColumnHandle));
        createRemoteTask.start();
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(1L, of));
        createDynamicFilter.isBlocked().get();
        Assertions.assertThat(createRemoteTask.getDynamicFiltersFetcher().isRunning()).isTrue();
        createRemoteTask.getDynamicFiltersFetcher().updateDynamicFiltersVersionAndFetchIfNecessary(10L);
        Assert.assertEventually(new Duration(30.0d, TimeUnit.SECONDS), () -> {
            Assertions.assertThat(createRemoteTask.getTaskStatus().getState()).isEqualTo(TaskState.FAILED);
        });
        Assertions.assertThat(createRemoteTask.getDynamicFiltersFetcher().isRunning()).isFalse();
        createHttpRemoteTaskFactory.stop();
    }

    @Timeout(30)
    @Test
    public void testDynamicFilters() throws Exception {
        DynamicFilterId dynamicFilterId = new DynamicFilterId("df1");
        DynamicFilterId dynamicFilterId2 = new DynamicFilterId("df2");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol newSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        Symbol newSymbol2 = symbolAllocator.newSymbol("DF_SYMBOL2", BigintType.BIGINT);
        Reference symbolReference = newSymbol.toSymbolReference();
        Reference symbolReference2 = newSymbol2.toSymbolReference();
        TestingColumnHandle testingColumnHandle = new TestingColumnHandle("column1");
        TestingColumnHandle testingColumnHandle2 = new TestingColumnHandle("column2");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of());
        ImmutableMap of = ImmutableMap.of(dynamicFilterId, Domain.singleValue(BigintType.BIGINT, 1L));
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(1L, of));
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, ImmutableSet.of(dynamicFilterId, dynamicFilterId2), ImmutableSet.of(dynamicFilterId, dynamicFilterId2), ImmutableSet.of());
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), TRACE_HTTP, 1);
        DynamicFilter createDynamicFilter = dynamicFilterService.createDynamicFilter(queryId, ImmutableList.of(new DynamicFilters.Descriptor(dynamicFilterId, symbolReference), new DynamicFilters.Descriptor(dynamicFilterId2, symbolReference2)), ImmutableMap.of(newSymbol, testingColumnHandle, newSymbol2, testingColumnHandle2));
        CompletableFuture isBlocked = createDynamicFilter.isBlocked();
        createRemoteTask.start();
        isBlocked.get();
        Assertions.assertThat(createDynamicFilter.getCurrentPredicate()).isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(testingColumnHandle, Domain.singleValue(BigintType.BIGINT, 1L))));
        Assertions.assertThat(testingTaskResource.getDynamicFiltersFetchCounter()).isEqualTo(1L);
        Assert.assertEventually(new Duration(15.0d, TimeUnit.SECONDS), () -> {
            io.airlift.testing.Assertions.assertGreaterThanOrEqual(Long.valueOf(testingTaskResource.getStatusFetchCounter()), 3L);
        });
        ((AbstractLongAssert) Assertions.assertThat(testingTaskResource.getDynamicFiltersFetchCounter()).describedAs(testingTaskResource.getDynamicFiltersFetchRequests().toString(), new Object[TRACE_HTTP])).isEqualTo(1L);
        CompletableFuture isBlocked2 = createDynamicFilter.isBlocked();
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(2L, ImmutableMap.of(dynamicFilterId2, Domain.singleValue(BigintType.BIGINT, 2L))));
        isBlocked2.get();
        Assertions.assertThat(createDynamicFilter.getCurrentPredicate()).isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(testingColumnHandle, Domain.singleValue(BigintType.BIGINT, 1L), testingColumnHandle2, Domain.singleValue(BigintType.BIGINT, 2L))));
        ((AbstractLongAssert) Assertions.assertThat(testingTaskResource.getDynamicFiltersFetchCounter()).describedAs(testingTaskResource.getDynamicFiltersFetchRequests().toString(), new Object[TRACE_HTTP])).isEqualTo(2L);
        io.airlift.testing.Assertions.assertGreaterThanOrEqual(Long.valueOf(testingTaskResource.getStatusFetchCounter()), 4L);
        createHttpRemoteTaskFactory.stop();
    }

    @Timeout(30)
    @Test
    public void testOutboundDynamicFilters() throws Exception {
        DynamicFilterId dynamicFilterId = new DynamicFilterId("df1");
        DynamicFilterId dynamicFilterId2 = new DynamicFilterId("df2");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol newSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        Symbol newSymbol2 = symbolAllocator.newSymbol("DF_SYMBOL2", BigintType.BIGINT);
        Reference symbolReference = newSymbol.toSymbolReference();
        Reference symbolReference2 = newSymbol2.toSymbolReference();
        TestingColumnHandle testingColumnHandle = new TestingColumnHandle("column1");
        TestingColumnHandle testingColumnHandle2 = new TestingColumnHandle("column2");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, ImmutableSet.of(dynamicFilterId, dynamicFilterId2), ImmutableSet.of(dynamicFilterId, dynamicFilterId2), ImmutableSet.of());
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), TRACE_HTTP, 1);
        DynamicFilter createDynamicFilter = dynamicFilterService.createDynamicFilter(queryId, ImmutableList.of(new DynamicFilters.Descriptor(dynamicFilterId, symbolReference), new DynamicFilters.Descriptor(dynamicFilterId2, symbolReference2)), ImmutableMap.of(newSymbol, testingColumnHandle, newSymbol2, testingColumnHandle2));
        CompletableFuture isBlocked = createDynamicFilter.isBlocked();
        dynamicFilterService.addTaskDynamicFilters(new TaskId(new StageId(queryId.getId(), 1), 1, TRACE_HTTP), ImmutableMap.of(dynamicFilterId, Domain.singleValue(BigintType.BIGINT, 1L)));
        isBlocked.get();
        Assertions.assertThat(createDynamicFilter.getCurrentPredicate()).isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(testingColumnHandle, Domain.singleValue(BigintType.BIGINT, 1L))));
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of(dynamicFilterId, dynamicFilterId2));
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        createRemoteTask.start();
        Assert.assertEventually(new Duration(10.0d, TimeUnit.SECONDS), () -> {
            Assertions.assertThat(testingTaskResource.getDynamicFiltersSentCounter()).isEqualTo(1L);
        });
        Assertions.assertThat(testingTaskResource.getCreateOrUpdateCounter()).isEqualTo(1L);
        addSplit(createRemoteTask, testingTaskResource, 1);
        addSplit(createRemoteTask, testingTaskResource, 2);
        Assertions.assertThat(testingTaskResource.getDynamicFiltersSentCounter()).isEqualTo(1L);
        Assertions.assertThat(testingTaskResource.getCreateOrUpdateCounter()).isEqualTo(3L);
        Assertions.assertThat(testingTaskResource.getLatestDynamicFilterFromCoordinator()).isEqualTo(ImmutableMap.of(dynamicFilterId, Domain.singleValue(BigintType.BIGINT, 1L)));
        CompletableFuture isBlocked2 = createDynamicFilter.isBlocked();
        dynamicFilterService.addTaskDynamicFilters(new TaskId(new StageId(queryId.getId(), 1), 1, TRACE_HTTP), ImmutableMap.of(dynamicFilterId2, Domain.singleValue(BigintType.BIGINT, 2L)));
        isBlocked2.get();
        Assertions.assertThat(createDynamicFilter.getCurrentPredicate()).isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(testingColumnHandle, Domain.singleValue(BigintType.BIGINT, 1L), testingColumnHandle2, Domain.singleValue(BigintType.BIGINT, 2L))));
        Assert.assertEventually(new Duration(10.0d, TimeUnit.SECONDS), () -> {
            Assertions.assertThat(testingTaskResource.getDynamicFiltersSentCounter()).isEqualTo(2L);
        });
        Assertions.assertThat(testingTaskResource.getCreateOrUpdateCounter()).isEqualTo(4L);
        Assertions.assertThat(testingTaskResource.getLatestDynamicFilterFromCoordinator()).isEqualTo(ImmutableMap.of(dynamicFilterId2, Domain.singleValue(BigintType.BIGINT, 2L)));
        createHttpRemoteTaskFactory.stop();
    }

    @Timeout(300)
    @Test
    public void testAdaptiveRemoteTaskRequestSize() throws Exception {
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        Session build = TestingSession.testSessionBuilder().setCatalog("tpch").setSchema("tiny").setSystemProperty("remote_task_adaptive_update_request_size_enabled", "true").setSystemProperty("remote_task_max_request_size", "10kB").setSystemProperty("remote_task_request_size_headroom", "1kB").setSystemProperty("remote_task_guaranteed_splits_per_request", "1").build();
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of(), build);
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        createRemoteTask.start();
        HashMultimap create = HashMultimap.create();
        for (int i = TRACE_HTTP; i < 100; i++) {
            create.put(TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()));
        }
        createRemoteTask.addSplits(create);
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null;
        });
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 100;
        });
        Assertions.assertThat(testingTaskResource.getCreateOrUpdateCounter() > 1).isTrue();
        createRemoteTask.noMoreSplits(TaskTestUtils.TABLE_SCAN_NODE_ID);
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).isNoMoreSplits();
        });
        createRemoteTask.cancel();
        poll(() -> {
            return createRemoteTask.getTaskStatus().getState().isDone();
        });
        poll(() -> {
            return createRemoteTask.getTaskInfo().taskStatus().getState().isDone();
        });
        createHttpRemoteTaskFactory.stop();
    }

    @Test
    public void testAdjustSplitBatchSize() {
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory(testingTaskResource), ImmutableSet.of(), TestingSession.testSessionBuilder().setCatalog("tpch").setSchema("tiny").setSystemProperty("remote_task_adaptive_update_request_size_enabled", "true").setSystemProperty("remote_task_max_request_size", "100kB").setSystemProperty("remote_task_request_size_headroom", "10kB").setSystemProperty("remote_task_guaranteed_splits_per_request", "1").build());
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        HashSet hashSet = new HashSet();
        for (int i = TRACE_HTTP; i < 1000; i++) {
            hashSet.add(new ScheduledSplit(i, TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())));
        }
        Assertions.assertThat(createRemoteTask.adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, hashSet, true)), 1000000L, 500)).isTrue();
        io.airlift.testing.Assertions.assertLessThan(Integer.valueOf(createRemoteTask.splitBatchSize.get()), 250);
        Assertions.assertThat(createRemoteTask.adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, hashSet, true)), 1000L, 100)).isFalse();
        io.airlift.testing.Assertions.assertGreaterThan(Integer.valueOf(createRemoteTask.splitBatchSize.get()), 250);
    }

    private void runTest(FailureScenario failureScenario) throws Exception {
        AtomicLong atomicLong = new AtomicLong(System.nanoTime());
        TestingTaskResource testingTaskResource = new TestingTaskResource(atomicLong, failureScenario);
        HttpRemoteTaskFactory createHttpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource);
        HttpRemoteTask createRemoteTask = createRemoteTask(createHttpRemoteTaskFactory, ImmutableSet.of());
        testingTaskResource.setInitialTaskInfo(createRemoteTask.getTaskInfo());
        createRemoteTask.start();
        waitUntilIdle(atomicLong);
        createHttpRemoteTaskFactory.stop();
        ((AbstractBooleanAssert) Assertions.assertThat(createRemoteTask.getTaskStatus().getState().isDone()).describedAs(String.format("TaskStatus is not in a done state: %s", createRemoteTask.getTaskStatus()), new Object[TRACE_HTTP])).isTrue();
        ErrorCode errorCode = ((ExecutionFailureInfo) Iterables.getOnlyElement(createRemoteTask.getTaskStatus().getFailures())).getErrorCode();
        switch (failureScenario.ordinal()) {
            case 1:
            case BenchmarkWindowOperator.Context.NUMBER_OF_GROUP_COLUMNS /* 2 */:
                ((AbstractBooleanAssert) Assertions.assertThat(createRemoteTask.getTaskInfo().taskStatus().getState().isDone()).describedAs(String.format("TaskInfo is not in a done state: %s", createRemoteTask.getTaskInfo()), new Object[TRACE_HTTP])).isTrue();
                Assertions.assertThat(errorCode).isEqualTo(StandardErrorCode.REMOTE_TASK_MISMATCH.toErrorCode());
                return;
            case 3:
                Assertions.assertThat(errorCode).isEqualTo(StandardErrorCode.REMOTE_TASK_ERROR.toErrorCode());
                return;
            default:
                throw new UnsupportedOperationException();
        }
    }

    private void addSplit(RemoteTask remoteTask, TestingTaskResource testingTaskResource, int i) throws InterruptedException {
        remoteTask.addSplits(ImmutableMultimap.of(TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())));
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null;
        });
        poll(() -> {
            return testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == i;
        });
    }

    private HttpRemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set<DynamicFilterId> set) {
        return createRemoteTask(httpRemoteTaskFactory, set, SessionTestUtils.TEST_SESSION);
    }

    private HttpRemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set<DynamicFilterId> set, Session session) {
        return httpRemoteTaskFactory.createRemoteTask(session, Span.getInvalid(), new TaskId(new StageId("test", 1), 2, TRACE_HTTP), new InternalNode("node-id", URI.create("http://fake.invalid/"), new NodeVersion("version"), false), false, TaskTestUtils.PLAN_FRAGMENT, ImmutableMultimap.of(), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.BROADCAST), new NodeTaskMap.PartitionedSplitCountTracker(partitionedSplitsInfo -> {
        }), set, Optional.empty(), true);
    }

    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource) {
        return createHttpRemoteTaskFactory(testingTaskResource, new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig()));
    }

    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource, DynamicFilterService dynamicFilterService) {
        return createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService, new QueryManagerConfig());
    }

    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(final TestingTaskResource testingTaskResource, final DynamicFilterService dynamicFilterService, final QueryManagerConfig queryManagerConfig) {
        return (HttpRemoteTaskFactory) new Bootstrap(new Module[]{new JsonModule(), new HandleJsonModule(), new Module() { // from class: io.trino.server.remotetask.TestHttpRemoteTask.1
            public void configure(Binder binder) {
                binder.bind(JsonMapper.class).in(Scopes.SINGLETON);
                binder.bind(Metadata.class).toInstance(MetadataManager.createTestMetadataManager());
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
                JsonBinder.jsonBinder(binder).addDeserializerBinding(TypeSignature.class).to(TypeSignatureDeserializer.class);
                JsonBinder.jsonBinder(binder).addKeyDeserializerBinding(TypeSignature.class).to(TypeSignatureKeyDeserializer.class);
                JsonBinder.jsonBinder(binder).addKeyDeserializerBinding(Symbol.class).to(SymbolKeyDeserializer.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskStatus.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(DynamicFiltersCollector.VersionedDynamicFilterDomains.class);
                JsonBinder.jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Block.class).to(BlockJsonSerde.Deserializer.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(FailTaskRequest.class);
                binder.bind(TypeManager.class).toInstance(InternalTypeManager.TESTING_TYPE_MANAGER);
                binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
                binder.bind(BlockEncodingSerde.class).to(InternalBlockEncodingSerde.class).in(Scopes.SINGLETON);
                binder.bind(OpenTelemetry.class).toInstance(OpenTelemetry.noop());
                JsonBinder.jsonBinder(binder).addSerializerBinding(Span.class).to(SpanSerialization.SpanSerializer.class);
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Span.class).to(SpanSerialization.SpanDeserializer.class);
            }

            @Provides
            private HttpRemoteTaskFactory createHttpRemoteTaskFactory(JsonMapper jsonMapper, JsonCodec<TaskStatus> jsonCodec, JsonCodec<DynamicFiltersCollector.VersionedDynamicFilterDomains> jsonCodec2, JsonCodec<TaskInfo> jsonCodec3, JsonCodec<TaskUpdateRequest> jsonCodec4, JsonCodec<FailTaskRequest> jsonCodec5) {
                TestingHttpClient testingHttpClient = new TestingHttpClient(new JaxrsTestingHttpProcessor(URI.create("http://fake.invalid/"), new Object[]{TestingTaskResource.this, jsonMapper}).setTrace(false));
                TestingTaskResource.this.setHttpClient(testingHttpClient);
                return new HttpRemoteTaskFactory(queryManagerConfig, TestHttpRemoteTask.TASK_MANAGER_CONFIG, testingHttpClient, new BaseTestSqlTaskManager.MockLocationFactory(), jsonCodec, jsonCodec2, jsonCodec3, jsonCodec4, jsonCodec5, Tracing.noopTracer(), new RemoteTaskStats(), dynamicFilterService);
            }
        }}).doNotInitializeLogging().quiet().initialize().getInstance(HttpRemoteTaskFactory.class);
    }

    private static void poll(BooleanSupplier booleanSupplier) throws InterruptedException {
        long nanoTime = System.nanoTime() + FAIL_TIMEOUT.roundTo(TimeUnit.NANOSECONDS);
        while (!booleanSupplier.getAsBoolean()) {
            long nanoTime2 = (nanoTime - System.nanoTime()) / 1000000;
            if (nanoTime2 <= 0) {
                throw new AssertionError(String.format("Timeout of %s reached", FAIL_TIMEOUT));
            }
            Thread.sleep(Math.min(POLL_TIMEOUT.toMillis(), nanoTime2));
        }
    }

    private static void waitUntilIdle(AtomicLong atomicLong) throws InterruptedException {
        long nanoTime = System.nanoTime();
        while (true) {
            long nanoTime2 = (System.nanoTime() - atomicLong.get()) / 1000000;
            long nanoTime3 = (System.nanoTime() - nanoTime) / 1000000;
            long millis = IDLE_TIMEOUT.toMillis() - nanoTime2;
            if (FAIL_TIMEOUT.toMillis() - nanoTime3 < millis) {
                throw new AssertionError(String.format("Activity doesn't stop after %s", FAIL_TIMEOUT));
            }
            if (millis < 0) {
                return;
            } else {
                Thread.sleep(millis);
            }
        }
    }
}
