package io.deephaven.engine.updategraph.impl;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.SleepUtil;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.updategraph.NotificationAdapter;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.impl.BaseUpdateGraph;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
import io.deephaven.util.function.ThrowingRunnable;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadInitializationFactory;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.class */
public class PeriodicUpdateGraph extends BaseUpdateGraph {
    /** Value of the {@code PeriodicUpdateGraph.updateThreads} configuration property; {@code -1} when the property is unset. */
    public static final int NUM_THREADS_DEFAULT_UPDATE_GRAPH = Configuration.getInstance().getIntegerWithDefault("PeriodicUpdateGraph.updateThreads", -1);
    private static final Logger log = LoggerFactory.getLogger(PeriodicUpdateGraph.class);
    // Set to true by requestRefresh(); requestRefresh() also uses this object as the monitor it notifies.
    private final AtomicBoolean refreshRequested;
    // Created and started lazily by start(); enableUnitTestMode() refuses to proceed once this is non-null.
    private Thread refreshThread;
    // Created lazily by start(). NOTE(review): how the scheduler is used and shut down is not visible in this part of the file.
    private ScheduledExecutorService watchdogScheduler;
    // Clamped to a non-negative value by setWatchDogMillis(int).
    private volatile int watchDogMillis;
    // Installed via setWatchDogTimeoutProcedure(LongConsumer). NOTE(review): the argument passed to the consumer is not visible here.
    private volatile LongConsumer watchDogTimeoutProcedure;
    /** Name of the configuration property that the {@link Builder} reads to decide whether unit test mode is allowed. */
    public static final String ALLOW_UNIT_TEST_MODE_PROP = "PeriodicUpdateGraph.allowUnitTestMode";
    // When true, start() asserts and addSource() does not register the source or start the graph.
    private final boolean allowUnitTestMode;
    // Exclusive upper bound handed to notificationRandomizer.nextInt(...) before sleeping in addNotification/maybeAddNotification;
    // no sleep happens unless it is positive. Assigned by resetForUnitTests.
    private int notificationAdditionDelay;
    // Seeded with 0 by the constructor and re-seeded by resetForUnitTests.
    private Random notificationRandomizer;
    // Turned on by enableUnitTestMode(); the *ForUnitTests methods assert it.
    private boolean unitTestMode;
    // Executor on which the *ForUnitTests methods run their work; (re)created by makeUnitTestRefreshExecutor().
    private ExecutorService unitTestRefreshThreadPool;
    /** Name of the configuration property that the {@link Builder} reads for its initial target cycle duration. */
    public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP = "PeriodicUpdateGraph.targetCycleDurationMillis";
    // Construction-time target; restored by resetTargetCycleDuration().
    private final long defaultTargetCycleDurationMillis;
    // Current target; isCycleOnBudget(long) compares cycle durations against it.
    private volatile long targetCycleDurationMillis;
    // Wraps the runnables of the threads this class creates (refresh thread, watchdog thread, update executors).
    private final ThreadInitializationFactory threadInitializationFactory;
    // Stored by the constructor. NOTE(review): no use is visible in this part of the file.
    private final OperationInitializer operationInitializer;
    // Always positive: the constructor substitutes Runtime.availableProcessors() for non-positive requests.
    private final int updateThreads;
    // Read from the "PeriodicUpdateGraph.minimumInterCycleSleep" property (default 0). NOTE(review): units/usage not visible here.
    private final long minimumInterCycleSleep;
    // Read from the "PeriodicUpdateGraph.interCycleYield" property (default false). NOTE(review): usage not visible here.
    private final boolean interCycleYield;

    /* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph$Builder.class */
    public static final class Builder {
        /** Name under which the graph is built or looked up. */
        private String name;
        /** Snapshot of the {@code PeriodicUpdateGraph.allowUnitTestMode} property (default {@code false}). */
        private final boolean allowUnitTestMode =
                Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
        /** Initially the {@code PeriodicUpdateGraph.targetCycleDurationMillis} property (default 1000). */
        private long targetCycleDurationMillis =
                Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
        private long minimumCycleDurationToLogNanos = BaseUpdateGraph.DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;
        /** Non-positive values make the graph use {@code Runtime.availableProcessors()} threads. */
        private int numUpdateThreads = -1;
        /** Identity factory by default: runnables are handed back unwrapped. */
        private ThreadInitializationFactory threadInitializationFactory = toWrap -> toWrap;
        /** Defaults to the operation initializer of the execution context current at builder creation. */
        private OperationInitializer operationInitializer = ExecutionContext.getContext().getOperationInitializer();

        /**
         * Creates a builder with default settings.
         *
         * @param str the name of the update graph to build
         */
        public Builder(String str) {
            name = str;
        }

        /**
         * Overrides the target cycle duration.
         *
         * @param j the target cycle duration, in milliseconds
         * @return this builder
         */
        public Builder targetCycleDurationMillis(long j) {
            targetCycleDurationMillis = j;
            return this;
        }

        /**
         * Overrides the minimum cycle duration passed to the graph for logging purposes.
         *
         * @param j the minimum duration, in nanoseconds
         * @return this builder
         */
        public Builder minimumCycleDurationToLogNanos(long j) {
            minimumCycleDurationToLogNanos = j;
            return this;
        }

        /**
         * Overrides the number of update threads.
         *
         * @param i the requested thread count; non-positive means "use the number of available processors"
         * @return this builder
         */
        public Builder numUpdateThreads(int i) {
            numUpdateThreads = i;
            return this;
        }

        /**
         * Overrides the factory used to wrap the runnables of threads created by the graph.
         *
         * @param threadInitializationFactory the factory to use
         * @return this builder
         */
        public Builder threadInitializationFactory(ThreadInitializationFactory threadInitializationFactory) {
            this.threadInitializationFactory = threadInitializationFactory;
            return this;
        }

        /**
         * Overrides the operation initializer handed to the graph.
         *
         * @param operationInitializer the initializer to use
         * @return this builder
         */
        public Builder operationInitializer(OperationInitializer operationInitializer) {
            this.operationInitializer = operationInitializer;
            return this;
        }

        /**
         * Builds the graph through {@code BaseUpdateGraph.buildOrThrow}.
         *
         * @return the newly constructed graph
         */
        public PeriodicUpdateGraph build() {
            return (PeriodicUpdateGraph) BaseUpdateGraph.buildOrThrow(name, this::construct);
        }

        /**
         * Obtains the graph through {@code BaseUpdateGraph.existingOrBuild}, constructing it with this builder's
         * settings if needed.
         *
         * @return the existing or newly constructed graph
         */
        public PeriodicUpdateGraph existingOrBuild() {
            return (PeriodicUpdateGraph) ((PeriodicUpdateGraph) BaseUpdateGraph.existingOrBuild(name, this::construct)).cast();
        }

        /** Instantiates the graph from the builder's current settings. */
        private PeriodicUpdateGraph construct() {
            return new PeriodicUpdateGraph(
                    name,
                    allowUnitTestMode,
                    targetCycleDurationMillis,
                    minimumCycleDurationToLogNanos,
                    numUpdateThreads,
                    threadInitializationFactory,
                    operationInitializer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph$ConcurrentNotificationProcessor.class */
    public class ConcurrentNotificationProcessor implements BaseUpdateGraph.NotificationProcessor {
        /** Worker threads, all running {@link #processSatisfiedNotifications()}. */
        private final Thread[] updateThreads;
        /** Notifications waiting for a worker; also the monitor the workers wait on. */
        private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfiedNotifications = new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
        /** Number of notifications submitted but not yet finished (successfully or not). */
        private final AtomicInteger outstandingNotifications = new AtomicInteger(0);
        /** Released when a notification finishes or is added; {@link #doWork()} blocks on it. */
        private final Semaphore pendingNormalNotificationsCheckNeeded = new Semaphore(0, false);
        /** Cleared by {@link #shutdown()} to make the workers exit. */
        private volatile boolean running = true;
        /** Cleared when a worker is thrown out of its loop while running a notification. */
        private volatile boolean isHealthy = true;

        /**
         * Creates the processor and immediately starts its worker threads.
         *
         * @param threadFactory factory used to create each worker thread
         * @param i the number of worker threads to start
         */
        public ConcurrentNotificationProcessor(@NotNull ThreadFactory threadFactory, int i) {
            this.updateThreads = new Thread[i];
            for (int i2 = 0; i2 < i; i2++) {
                this.updateThreads[i2] = threadFactory.newThread(this::processSatisfiedNotifications);
                this.updateThreads[i2].start();
            }
        }

        /**
         * Worker loop: repeatedly takes a satisfied notification from the queue and runs it until {@link #shutdown()}
         * is called.
         *
         * <p>
         * The try/finally wraps the whole loop, so the failure bookkeeping and the "terminating" log entry happen
         * exactly once, when the thread leaves the loop, rather than after every notification.
         */
        private void processSatisfiedNotifications() {
            PeriodicUpdateGraph.log.info().append(Thread.currentThread().getName()).append(": starting to poll for satisfied notifications").endl();
            NotificationQueue.Notification notification = null;
            try {
                while (this.running) {
                    synchronized (this.satisfiedNotifications) {
                        while (this.running && (notification = this.satisfiedNotifications.poll()) == null) {
                            try {
                                this.satisfiedNotifications.wait();
                            } catch (InterruptedException ignored) {
                                // Deliberately ignored: the loop condition re-checks `running` and the queue.
                            }
                        }
                    }
                    if (notification == null) {
                        // Only reachable when we were asked to stop while the queue was empty.
                        break;
                    }
                    PeriodicUpdateGraph.this.runNotification(notification);
                    notification = null;
                    this.outstandingNotifications.decrementAndGet();
                    this.pendingNormalNotificationsCheckNeeded.release();
                }
            } finally {
                if (notification != null) {
                    // runNotification threw: flag the processor unhealthy first, then account for the failed
                    // notification so that doWork()/doAllWork() wake up and observe the flag.
                    this.isHealthy = false;
                    this.outstandingNotifications.decrementAndGet();
                    this.pendingNormalNotificationsCheckNeeded.release();
                }
                PeriodicUpdateGraph.log.info().append(Thread.currentThread().getName()).append(": terminating").endl();
            }
        }

        /**
         * Appends a notification to the queue and wakes one waiting worker.
         *
         * @param notification the notification to run
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void submit(@NotNull NotificationQueue.Notification notification) {
            this.outstandingNotifications.incrementAndGet();
            synchronized (this.satisfiedNotifications) {
                this.satisfiedNotifications.offer(notification);
                this.satisfiedNotifications.notify();
            }
        }

        /**
         * Moves every notification from the supplied queue to the tail of the work queue and wakes all workers.
         *
         * @param intrusiveDoublyLinkedQueue the notifications to run; emptied by the transfer
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void submitAll(@NotNull IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> intrusiveDoublyLinkedQueue) {
            // Count before transferring: the source queue's size changes once its contents are moved.
            this.outstandingNotifications.addAndGet(intrusiveDoublyLinkedQueue.size());
            synchronized (this.satisfiedNotifications) {
                this.satisfiedNotifications.transferAfterTailFrom(intrusiveDoublyLinkedQueue);
                this.satisfiedNotifications.notifyAll();
            }
        }

        /**
         * Inserts a notification at a specific queue position (clamped to the current queue size) and wakes one
         * waiting worker.
         *
         * @param notification the notification to run
         * @param i the requested insertion offset
         */
        @TestUseOnly
        protected void submitAt(@NotNull NotificationQueue.Notification notification, int i) {
            this.outstandingNotifications.incrementAndGet();
            synchronized (this.satisfiedNotifications) {
                this.satisfiedNotifications.insert(notification, Math.min(i, this.satisfiedNotifications.size()));
                this.satisfiedNotifications.notify();
            }
        }

        /**
         * @return the number of submitted notifications that have not finished yet
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public int outstandingNotificationsCount() {
            return this.outstandingNotifications.get();
        }

        /**
         * Blocks until a permit is available (a notification finished or was added), then asserts that no worker has
         * failed. An interrupt while waiting ends the wait without asserting.
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void doWork() {
            try {
                this.pendingNormalNotificationsCheckNeeded.acquire();
                Assert.eqTrue(this.isHealthy, "isHealthy");
            } catch (InterruptedException ignored) {
                // Deliberately ignored: callers loop and re-check their own conditions.
            }
        }

        /**
         * Waits until no notifications are outstanding, then asserts that no worker has failed.
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void doAllWork() {
            while (outstandingNotificationsCount() > 0) {
                doWork();
            }
            Assert.eqTrue(this.isHealthy, "isHealthy");
        }

        /**
         * Stops the workers: clears the queue, wakes every waiting worker, and joins each worker thread. An interrupt
         * while joining a thread abandons the wait for that thread only.
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void shutdown() {
            this.running = false;
            synchronized (this.satisfiedNotifications) {
                this.satisfiedNotifications.clear();
                this.satisfiedNotifications.notifyAll();
            }
            for (Thread thread : this.updateThreads) {
                try {
                    thread.join();
                } catch (InterruptedException ignored) {
                    // Best effort: move on to the next worker.
                }
            }
        }

        /** Releases a permit so that a caller blocked in {@link #doWork()} wakes up. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void onNotificationAdded() {
            this.pendingNormalNotificationsCheckNeeded.release();
        }

        /** Discards all accumulated permits. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void beforeNotificationsDrained() {
            this.pendingNormalNotificationsCheckNeeded.drainPermits();
        }

        /**
         * @return the number of worker threads owned by this processor
         */
        int threadCount() {
            return this.updateThreads.length;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @TestUseOnly
    /* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph$ControlledNotificationProcessor.class */
    public class ControlledNotificationProcessor implements BaseUpdateGraph.NotificationProcessor {
        /** Released whenever a notification is added; drained before notifications are drained. */
        private final Semaphore pendingNormalNotificationsCheckNeeded = new Semaphore(0, false);

        private ControlledNotificationProcessor() {
        }

        /**
         * Runs the notification immediately on the calling thread.
         *
         * @param notification the notification to run
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void submit(@NotNull NotificationQueue.Notification notification) {
            PeriodicUpdateGraph.this.runNotification(notification);
        }

        /**
         * Polls the supplied queue until it is empty, running each notification on the calling thread.
         *
         * @param intrusiveDoublyLinkedQueue the notifications to run; emptied by this call
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void submitAll(@NotNull IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> intrusiveDoublyLinkedQueue) {
            for (NotificationQueue.Notification next = intrusiveDoublyLinkedQueue.poll(); next != null; next = intrusiveDoublyLinkedQueue.poll()) {
                PeriodicUpdateGraph.this.runNotification(next);
            }
        }

        /**
         * @return always {@code 0}; submissions run synchronously, so nothing is ever left outstanding
         */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public int outstandingNotificationsCount() {
            return 0;
        }

        /** Not expected to be called on this processor; asserts if it is. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void doWork() {
            Assert.statementNeverExecuted();
        }

        /** Not expected to be called on this processor; asserts if it is. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void doAllWork() {
            Assert.statementNeverExecuted();
        }

        /** Not expected to be called on this processor; asserts if it is. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void shutdown() {
            Assert.statementNeverExecuted();
        }

        /** Releases one permit for {@link #blockUntilNotificationAdded(long)}. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void onNotificationAdded() {
            pendingNormalNotificationsCheckNeeded.release();
        }

        /** Discards all accumulated permits. */
        @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
        public void beforeNotificationsDrained() {
            pendingNormalNotificationsCheckNeeded.drainPermits();
        }

        /**
         * Waits for a notification to be added.
         *
         * @param timeoutNanos the maximum time to wait, in nanoseconds
         * @return {@code true} if a permit was acquired within the timeout, {@code false} otherwise
         */
        private boolean blockUntilNotificationAdded(long timeoutNanos) {
            final boolean acquired;
            try {
                acquired = pendingNormalNotificationsCheckNeeded.tryAcquire(timeoutNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException unexpected) {
                // Interruption is treated as an invariant violation here.
                Assert.statementNeverExecuted();
                return false;
            }
            return acquired;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph$NotificationProcessorThreadFactory.class */
    public class NotificationProcessorThreadFactory extends NamingThreadFactory {
        private NotificationProcessorThreadFactory(@NotNull ThreadGroup threadGroup, @NotNull String str) {
            super(threadGroup, PeriodicUpdateGraph.class, str, true);
        }

        public Thread newThread(@NotNull Runnable runnable) {
            return super.newThread(PeriodicUpdateGraph.this.threadInitializationFactory.createInitializer(() -> {
                PeriodicUpdateGraph.this.configureRefreshThread();
                runnable.run();
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @TestUseOnly
    /* loaded from: input_file:io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph$UnitTestThreadFactory.class */
    public class UnitTestThreadFactory extends NamingThreadFactory {
        private UnitTestThreadFactory() {
            super(PeriodicUpdateGraph.class, "unitTestRefresh");
        }

        public Thread newThread(@NotNull Runnable runnable) {
            return super.newThread(() -> {
                PeriodicUpdateGraph.this.configureUnitTestRefreshThread();
                runnable.run();
            });
        }
    }

    /**
     * Creates a {@link Builder} for a graph with the supplied name.
     *
     * @param str the name handed to the {@link Builder} constructor
     * @return a new builder with default settings
     */
    public static Builder newBuilder(String str) {
        final Builder builder = new Builder(str);
        return builder;
    }

    /**
     * Constructs the graph; prefer {@link #newBuilder(String)}.
     *
     * @param str the graph name, forwarded to the superclass
     * @param z whether unit test mode is allowed
     * @param j the default (and initial) target cycle duration, in milliseconds
     * @param j2 forwarded to the superclass as the minimum cycle duration to log
     * @param i the requested number of update threads; non-positive means "use the number of available processors"
     * @param threadInitializationFactory wraps the runnables of threads created by this graph
     * @param operationInitializer the operation initializer to store
     */
    public PeriodicUpdateGraph(String str, boolean z, long j, long j2, int i, ThreadInitializationFactory threadInitializationFactory, OperationInitializer operationInitializer) {
        super(str, z, log, j2);
        refreshRequested = new AtomicBoolean();
        watchDogMillis = 0;
        notificationRandomizer = new Random(0L);
        minimumInterCycleSleep = Configuration.getInstance().getIntegerWithDefault("PeriodicUpdateGraph.minimumInterCycleSleep", 0);
        interCycleYield = Configuration.getInstance().getBooleanWithDefault("PeriodicUpdateGraph.interCycleYield", false);
        allowUnitTestMode = z;
        defaultTargetCycleDurationMillis = j;
        targetCycleDurationMillis = j;
        this.threadInitializationFactory = threadInitializationFactory;
        this.operationInitializer = operationInitializer;
        // A non-positive request means "size to the machine".
        updateThreads = i > 0 ? i : Runtime.getRuntime().availableProcessors();
    }

    /**
     * Appends {@code "PeriodicUpdateGraph-"} followed by this graph's name to the supplied output.
     *
     * @param logOutput the output to append to
     * @return the result of the final append call
     */
    public LogOutput append(@NotNull LogOutput logOutput) {
        final LogOutput prefixed = logOutput.append("PeriodicUpdateGraph-");
        return prefixed.append(getName());
    }

    /**
     * Creates the notification processor matching the configured thread count: a concurrent processor when more than
     * one update thread is configured, otherwise a {@code QueueNotificationProcessor}.
     *
     * @return the new processor
     */
    @NotNull
    private BaseUpdateGraph.NotificationProcessor makeNotificationProcessor() {
        if (updateThreads <= 1) {
            return new BaseUpdateGraph.QueueNotificationProcessor();
        }
        final ThreadGroup executorGroup = new ThreadGroup("PeriodicUpdateGraph-updateExecutors");
        final NotificationProcessorThreadFactory executorFactory = new NotificationProcessorThreadFactory(executorGroup, "updateExecutor");
        return new ConcurrentNotificationProcessor(executorFactory, updateThreads);
    }

    /**
     * Creates a concurrent processor that randomizes where notifications are queued and, optionally, delays each one
     * by a random amount before it runs.
     *
     * @param random source of randomness for insertion positions and delays
     * @param threadCount the number of worker threads
     * @param maxStartDelayMillis exclusive upper bound of the random delay in milliseconds; no delay is added when
     *        non-positive
     * @return the randomized processor
     */
    @TestUseOnly
    private BaseUpdateGraph.NotificationProcessor makeRandomizedNotificationProcessor(final Random random, int threadCount, final int maxStartDelayMillis) {
        final ThreadGroup executorGroup = new ThreadGroup("PeriodicUpdateGraph-randomizedUpdatedExecutors");
        final NotificationProcessorThreadFactory executorFactory = new NotificationProcessorThreadFactory(executorGroup, "randomizedUpdateExecutor");
        return new ConcurrentNotificationProcessor(executorFactory, threadCount) {
            /** Wraps the notification so that it sleeps for a random time before running. */
            private NotificationQueue.Notification addRandomDelay(@NotNull NotificationQueue.Notification notification) {
                if (maxStartDelayMillis <= 0) {
                    return notification;
                }
                return new NotificationAdapter(notification) {
                    public void run() {
                        final int sleepMillis = random.nextInt(maxStartDelayMillis);
                        PeriodicUpdateGraph.this.logDependencies().append(Thread.currentThread().getName()).append(": Sleeping for  ").append(sleepMillis).append("ms").endl();
                        SleepUtil.sleep(sleepMillis);
                        super.run();
                    }
                };
            }

            @Override // io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph.ConcurrentNotificationProcessor, io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
            public void submit(@NotNull NotificationQueue.Notification notification) {
                if (notification instanceof BaseUpdateGraph.UpdateSourceRefreshNotification) {
                    // Source refreshes keep their normal queue order.
                    super.submit(notification);
                    return;
                }
                if (notification instanceof NotificationQueue.ErrorNotification) {
                    // Error notifications jump to the front of the queue.
                    submitAt(notification, 0);
                    return;
                }
                // Everything else lands at a random offset, possibly with a random start delay.
                submitAt(addRandomDelay(notification), random.nextInt(outstandingNotificationsCount() + 1));
            }

            @Override // io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph.ConcurrentNotificationProcessor, io.deephaven.engine.updategraph.impl.BaseUpdateGraph.NotificationProcessor
            public void submitAll(@NotNull IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> intrusiveDoublyLinkedQueue) {
                intrusiveDoublyLinkedQueue.forEach(this::submit);
            }
        };
    }

    /**
     * Reports how many threads process notifications.
     *
     * @return the configured thread count when no processor is installed, the processor's thread count when it is a
     *         {@link ConcurrentNotificationProcessor}, and {@code 1} for any other processor
     */
    public int parallelismFactor() {
        // A null processor is never an instance of anything, so checking the type first is safe.
        if (notificationProcessor instanceof ConcurrentNotificationProcessor) {
            return ((ConcurrentNotificationProcessor) notificationProcessor).threadCount();
        }
        return notificationProcessor == null ? updateThreads : 1;
    }

    /**
     * Sets the target cycle duration; negative values are treated as {@code 0}.
     *
     * @param j the target cycle duration, in milliseconds
     */
    public void setTargetCycleDurationMillis(long j) {
        targetCycleDurationMillis = j < 0L ? 0L : j;
    }

    /**
     * @return the current target cycle duration, in milliseconds
     */
    public long getTargetCycleDurationMillis() {
        return targetCycleDurationMillis;
    }

    /**
     * Checks a cycle duration against the current target.
     *
     * @param j the cycle duration, in nanoseconds
     * @return {@code true} if the duration does not exceed the target cycle duration
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public boolean isCycleOnBudget(long j) {
        final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(targetCycleDurationMillis);
        return j <= budgetNanos;
    }

    /** Restores the target cycle duration to the value supplied at construction. */
    public void resetTargetCycleDuration() {
        targetCycleDurationMillis = defaultTargetCycleDurationMillis;
    }

    /**
     * Switches the graph into unit test mode. Calling it again once enabled is a no-op.
     *
     * @throws IllegalStateException if unit test mode is not allowed, or if the refresh thread has already been
     *         created
     */
    public void enableUnitTestMode() {
        if (!unitTestMode) {
            if (!allowUnitTestMode) {
                throw new IllegalStateException("PeriodicUpdateGraph.allowUnitTestMode=false");
            }
            if (refreshThread != null) {
                throw new IllegalStateException("PeriodicUpdateGraph.refreshThread is executing!");
            }
            resetLock();
            unitTestMode = true;
            unitTestRefreshThreadPool = makeUnitTestRefreshExecutor();
            updatePerformanceTracker.enableUnitTestMode();
        }
    }

    /**
     * @return whether this graph was constructed with unit test mode allowed
     */
    public boolean isUnitTestModeAllowed() {
        return allowUnitTestMode;
    }

    /**
     * Sets the watchdog interval; negative values are treated as {@code 0}.
     *
     * @param i the watchdog interval, in milliseconds
     */
    public void setWatchDogMillis(int i) {
        watchDogMillis = i < 0 ? 0 : i;
    }

    /**
     * @return the current watchdog interval, in milliseconds
     */
    public int getWatchDogMillis() {
        return watchDogMillis;
    }

    /**
     * Installs the watchdog timeout callback.
     *
     * @param longConsumer the callback to store
     */
    public void setWatchDogTimeoutProcedure(LongConsumer longConsumer) {
        watchDogTimeoutProcedure = longConsumer;
    }

    /**
     * Starts the graph's background machinery if it is not already running: the watchdog scheduler, the notification
     * processor (replacing a poisoned one), and the refresh thread. Asserts that the graph is running and that unit
     * test mode is neither enabled nor allowed.
     */
    public void start() {
        Assert.eqTrue(running, "running");
        Assert.eqFalse(unitTestMode, "unitTestMode");
        Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode");
        synchronized (this) {
            if (watchdogScheduler == null) {
                final ThreadFactory watchdogThreadFactory = new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) {
                    public Thread newThread(@NotNull Runnable runnable) {
                        return super.newThread(PeriodicUpdateGraph.this.threadInitializationFactory.createInitializer(runnable));
                    }
                };
                watchdogScheduler = Executors.newSingleThreadScheduledExecutor(watchdogThreadFactory);
            }
            if (notificationProcessor instanceof PoisonedNotificationProcessor) {
                notificationProcessor = makeNotificationProcessor();
            }
            if (refreshThread != null) {
                // Already started by an earlier call.
                return;
            }
            final Runnable refreshLoop = () -> {
                configureRefreshThread();
                while (running) {
                    Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode");
                    refreshTablesAndFlushNotifications();
                }
            };
            refreshThread = new Thread(threadInitializationFactory.createInitializer(refreshLoop), "PeriodicUpdateGraph." + getName() + ".refreshThread");
            refreshThread.setDaemon(true);
            log.info().append("PeriodicUpdateGraph starting with ").append(updateThreads).append(" notification processing threads").endl();
            updatePerformanceTracker.start();
            refreshThread.start();
        }
    }

    /**
     * Stops the graph: clears the running flag, shuts down the notification processor, and then acquires and
     * releases the exclusive lock once.
     */
    public void stop() {
        running = false;
        notificationProcessor.shutdown();
        // Taking the exclusive lock with an empty body acts as a barrier: it cannot be acquired until any current
        // holder has released it.
        exclusiveLock().doLocked(() -> {
        });
    }

    /**
     * Adds an update source. When unit test mode is allowed, the source is not registered and the graph is not
     * started; a {@link DynamicNode} source is only marked as refreshing. Otherwise the source is registered with the
     * superclass and the graph is started.
     *
     * @param runnable the update source
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public void addSource(@NotNull Runnable runnable) {
        if (allowUnitTestMode) {
            if (runnable instanceof DynamicNode) {
                ((DynamicNode) runnable).setRefreshing(true);
            }
            return;
        }
        super.addSource(runnable);
        start();
    }

    /**
     * Adds a notification, first sleeping for a random time when a positive notification addition delay is
     * configured.
     *
     * @param notification the notification to add
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public void addNotification(@NotNull NotificationQueue.Notification notification) {
        if (notificationAdditionDelay > 0) {
            final int sleepMillis = notificationRandomizer.nextInt(notificationAdditionDelay);
            SleepUtil.sleep(sleepMillis);
        }
        super.addNotification(notification);
    }

    /**
     * Delegates to the superclass, first sleeping for a random time when a positive notification addition delay is
     * configured.
     *
     * @param notification the notification to add
     * @param j forwarded unchanged to the superclass
     * @return the superclass result
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public boolean maybeAddNotification(@NotNull NotificationQueue.Notification notification, long j) {
        if (notificationAdditionDelay > 0) {
            final int sleepMillis = notificationRandomizer.nextInt(notificationAdditionDelay);
            SleepUtil.sleep(sleepMillis);
        }
        return super.maybeAddNotification(notification, j);
    }

    /**
     * Flags that a refresh has been requested and wakes one thread waiting on the request flag's monitor.
     *
     * @throws IllegalStateException if the graph is no longer running
     */
    public void requestRefresh() {
        if (running) {
            refreshRequested.set(true);
            synchronized (refreshRequested) {
                refreshRequested.notify();
            }
            return;
        }
        throw new IllegalStateException("Cannot request refresh when UpdateGraph is no longer running.");
    }

    /**
     * Resets the graph for unit tests without randomized notifications and with seed, thread count and delays all
     * set to {@code 0}.
     *
     * @param z whether reported reset errors should be thrown as an {@link IllegalStateException}
     */
    @TestUseOnly
    public void resetForUnitTests(boolean z) {
        final boolean randomizedNotifications = false;
        resetForUnitTests(z, randomizedNotifications, 0, 0, 0, 0);
    }

    /**
     * Resets the graph's state for unit tests, collecting any problems found along the way.
     *
     * @param z whether collected errors should be thrown as an {@link IllegalStateException} (they are always
     *        printed to {@code System.err})
     * @param z2 whether to install a randomized notification processor instead of the regular one
     * @param i the seed for the notification randomizer
     * @param i2 the thread count for the randomized notification processor
     * @param i3 the start-delay bound for the randomized notification processor
     * @param i4 the notification addition delay bound
     */
    @TestUseOnly
    public void resetForUnitTests(boolean z, boolean z2, int i, int i2, int i3, int i4) {
        final ArrayList<String> errors = new ArrayList<>();
        notificationRandomizer = new Random(i);
        notificationAdditionDelay = i4;
        Assert.assertion(unitTestMode, "unitTestMode");
        resetForUnitTests(z, errors);
        notificationProcessor = z2
                ? makeRandomizedNotificationProcessor(notificationRandomizer, i2, i3)
                : makeNotificationProcessor();
        if (refreshThread != null) {
            errors.add("UpdateGraph refreshThread isAlive");
        }

        // Verify from a pool thread that the lock is free.
        final Runnable unlockedCheck = () -> ensureUnlocked("unit test run pool thread", errors);
        try {
            unitTestRefreshThreadPool.submit(unlockedCheck).get();
        } catch (InterruptedException | ExecutionException e) {
            errors.add("Failed to ensure UpdateGraph unlocked from unit test run thread pool: " + e);
        }

        // Discard the old pool and replace it with a fresh one.
        unitTestRefreshThreadPool.shutdownNow();
        try {
            final boolean terminated = unitTestRefreshThreadPool.awaitTermination(1L, TimeUnit.SECONDS);
            if (!terminated) {
                errors.add("Failed to cleanup jobs in unit test run thread pool");
            }
        } catch (InterruptedException e2) {
            errors.add("Interrupted while trying to cleanup jobs in unit test run thread pool");
        }
        unitTestRefreshThreadPool = makeUnitTestRefreshExecutor();

        if (!errors.isEmpty()) {
            final String report = "UpdateGraph reset for unit tests reported errors:\n\t" + String.join("\n\t", errors);
            System.err.println(report);
            if (z) {
                throw new IllegalStateException(report);
            }
        }
        resetLock();
    }

    /** Starts a unit test cycle and marks the sources as refreshed. */
    @TestUseOnly
    public void startCycleForUnitTests() {
        final boolean sourcesSatisfied = true;
        startCycleForUnitTests(sourcesSatisfied);
    }

    /**
     * Starts a unit test cycle on the unit test refresh pool and waits for the start to finish.
     *
     * @param z whether the sources should be marked as refreshed once the cycle has started
     * @throws UncheckedDeephavenException if the wait is interrupted or the pooled task fails
     */
    @TestUseOnly
    public void startCycleForUnitTests(boolean z) {
        Assert.assertion(unitTestMode, "unitTestMode");
        final Runnable cycleStart = () -> startCycleForUnitTestsInternal(z);
        final Future<?> pending = unitTestRefreshThreadPool.submit(cycleStart);
        try {
            pending.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new UncheckedDeephavenException(e);
        }
    }

    /**
     * Pool-thread half of {@link #startCycleForUnitTests(boolean)}: marks the thread as an update thread, takes the
     * exclusive lock, opens a fresh refresh scope, and starts an update cycle on the logical clock. The matching
     * cleanup happens in {@link #completeCycleForUnitTestsInternal()}.
     *
     * @param z whether to mark the sources as refreshed after the cycle has started
     */
    @TestUseOnly
    private void startCycleForUnitTestsInternal(boolean z) {
        isUpdateThread.set(true);
        exclusiveLock().lock();

        // A leftover scope would mean the previous cycle was never completed.
        Assert.eqNull(refreshScope, "refreshScope");
        refreshScope = new LivenessScope();
        LivenessScopeStack.push(refreshScope);

        logicalClock.startUpdateCycle();
        if (!z) {
            return;
        }
        markSourcesRefreshedForUnitTests();
    }

    /** Marks the sources as satisfied for the current step; only valid in unit test mode. */
    @TestUseOnly
    public void markSourcesRefreshedForUnitTests() {
        Assert.assertion(unitTestMode, "unitTestMode");
        final boolean satisfied = true;
        updateSourcesLastSatisfiedStep(satisfied);
    }

    /** Completes the current unit test cycle, treating it as error-free. */
    @TestUseOnly
    public void completeCycleForUnitTests() {
        final boolean errorCaught = false;
        completeCycleForUnitTests(errorCaught);
    }

    /**
     * Completes the current unit test cycle on the unit test refresh pool.
     *
     * @param z {@code true} when the cycle body already failed: the "satisfied" assertion is skipped and failures
     *        while completing are swallowed
     * @throws UncheckedDeephavenException if completion fails and {@code z} is {@code false}
     */
    private void completeCycleForUnitTests(boolean z) {
        Assert.assertion(unitTestMode, "unitTestMode");
        if (!z) {
            final long step = logicalClock.currentStep();
            Assert.assertion(satisfied(step), "satisfied()", Long.valueOf(step), "currentStep");
        }
        try {
            unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get();
        } catch (InterruptedException | ExecutionException e) {
            if (z) {
                // An earlier error is already being reported; do not replace it with this one.
                return;
            }
            throw new UncheckedDeephavenException(e);
        }
    }

    /**
     * Pool-thread half of cycle completion: flushes notifications and completes the cycle, then undoes what
     * {@link #startCycleForUnitTestsInternal(boolean)} set up, whether or not the flush succeeded.
     */
    @TestUseOnly
    private void completeCycleForUnitTestsInternal() {
        // The resource's close() is the cleanup: pop and release the refresh scope (if any), release the exclusive
        // lock, and clear the update-thread marker. A failure during close is attached as a suppressed exception
        // to any failure from the flush.
        try (final SafeCloseable ignored = () -> {
            if (refreshScope != null) {
                LivenessScopeStack.pop(refreshScope);
                refreshScope.release();
                refreshScope = null;
            }
            exclusiveLock().unlock();
            isUpdateThread.remove();
        }) {
            flushNotificationsAndCompleteCycle(false);
        }
    }

    /**
     * Runs the supplied task inside a unit test cycle whose sources are marked as refreshed.
     *
     * @param throwingRunnable the task to run within the cycle
     * @throws Exception whatever the two-argument overload throws
     */
    @TestUseOnly
    public <T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> throwingRunnable) throws Exception {
        final boolean sourcesSatisfied = true;
        runWithinUnitTestCycle(throwingRunnable, sourcesSatisfied);
    }

    /**
     * Runs the supplied task inside a unit test cycle: starts a cycle, runs the task, and always completes the cycle
     * exactly once.
     *
     * <p>
     * If the task throws, the cycle is completed in "error caught" mode, which skips the satisfied-assertion and
     * swallows completion failures, so the task's own failure is what reaches the caller.
     *
     * @param throwingRunnable the task to run within the cycle
     * @param z whether the sources should be marked as refreshed when the cycle starts
     * @throws Exception if the task throws, or if completing a successful cycle fails
     */
    @TestUseOnly
    public <T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> throwingRunnable, boolean z) throws Exception {
        startCycleForUnitTests(z);
        boolean errorCaught = false;
        try {
            throwingRunnable.run();
        } catch (Throwable err) {
            // Remember the failure so that completion neither asserts nor throws over it.
            errorCaught = true;
            throw err;
        } finally {
            completeCycleForUnitTests(errorCaught);
        }
    }

    /**
     * Runs the supplied update source on the unit test refresh pool and waits for it to finish.
     *
     * @param runnable the update source to run
     * @throws UncheckedDeephavenException if the wait is interrupted or the source fails
     */
    @TestUseOnly
    public void refreshUpdateSourceForUnitTests(@NotNull Runnable runnable) {
        Assert.assertion(unitTestMode, "unitTestMode");
        final Future<?> pending = unitTestRefreshThreadPool.submit(runnable);
        try {
            pending.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new UncheckedDeephavenException(e);
        }
    }

    /**
     * Calls {@link #flushOneNotificationForUnitTests(boolean)} with {@code false}.
     *
     * @return the result of the delegated call
     */
    @TestUseOnly
    public boolean flushOneNotificationForUnitTests() {
        final boolean flag = false;
        return flushOneNotificationForUnitTests(flag);
    }

    /**
     * Runs {@code flushOneNotificationForUnitTestsInternal} on the unit test refresh pool with a
     * {@link ControlledNotificationProcessor} temporarily installed; the previous processor is always restored.
     *
     * @param z forwarded unchanged to the internal flush
     * @return the result of the internal flush
     * @throws UncheckedDeephavenException if the wait is interrupted or the flush fails
     */
    @TestUseOnly
    public boolean flushOneNotificationForUnitTests(boolean z) {
        Assert.assertion(unitTestMode, "unitTestMode");
        final BaseUpdateGraph.NotificationProcessor previousProcessor = notificationProcessor;
        try {
            notificationProcessor = new ControlledNotificationProcessor();
            final Future<Boolean> pending = unitTestRefreshThreadPool.submit(() -> {
                return Boolean.valueOf(flushOneNotificationForUnitTestsInternal(z));
            });
            return pending.get().booleanValue();
        } catch (InterruptedException | ExecutionException e) {
            throw new UncheckedDeephavenException(e);
        } finally {
            notificationProcessor = previousProcessor;
        }
    }

    /**
     * Drains the pending normal notifications into a private queue, removes the first one that can execute at the
     * current logical clock step, returns the rest to the head of the pending queue, and submits the chosen
     * notification (if any) to the current notification processor.
     *
     * @param z when {@code true}, flushing a notification is treated as an assertion failure; when {@code false},
     *        finding pending notifications but flushing none is treated as an assertion failure
     * @return {@code true} if a notification was submitted
     */
    @TestUseOnly
    private boolean flushOneNotificationForUnitTestsInternal(boolean z) {
        final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> candidates =
                new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
        this.notificationProcessor.beforeNotificationsDrained();
        synchronized (this.pendingNormalNotifications) {
            candidates.transferAfterTailFrom(this.pendingNormalNotifications);
        }
        final boolean hadPending = !candidates.isEmpty();

        NotificationQueue.Notification runnable = null;
        for (final Iterator<NotificationQueue.Notification> it = candidates.iterator(); runnable == null && it.hasNext();) {
            final NotificationQueue.Notification candidate = it.next();
            Assert.eqFalse(candidate.isTerminal(), "notification.isTerminal()");
            Assert.eqFalse(candidate.mustExecuteWithUpdateGraphLock(), "notification.mustExecuteWithUpdateGraphLock()");
            if (candidate.canExecute(this.logicalClock.currentStep())) {
                // Take the first executable notification out of the queue; the loop condition ends the scan.
                it.remove();
                runnable = candidate;
            }
        }

        // Everything not selected goes back in front of whatever was enqueued in the meantime.
        synchronized (this.pendingNormalNotifications) {
            this.pendingNormalNotifications.transferBeforeHeadFrom(candidates);
        }

        if (runnable == null) {
            if (hadPending && !z) {
                Assert.statementNeverExecuted("Did not flush any notifications in unit test mode, yet there were outstanding notifications");
            }
            return false;
        }
        this.notificationProcessor.submit(runnable);
        if (z) {
            Assert.statementNeverExecuted("Flushed a notification in unit test mode, but expected only unsatisfied pending notifications");
        }
        return true;
    }

    /**
     * Flushes all normal notifications synchronously: starts the flush with an always-{@code true} completion
     * predicate and a timeout of zero milliseconds, then immediately runs the returned completion handle.
     */
    @TestUseOnly
    public void flushAllNormalNotificationsForUnitTests() {
        final Runnable awaitCompletion = flushAllNormalNotificationsForUnitTests(() -> true, 0L);
        awaitCompletion.run();
    }

    /**
     * Installs a {@code ControlledNotificationProcessor} and starts a task on the unit-test refresh pool that keeps
     * flushing notifications one at a time. When nothing could be flushed, the task returns if
     * {@code booleanSupplier} reports {@code true}; otherwise it blocks until a notification is added, raising an
     * assertion failure if that wait reports {@code false} (treated here as a timeout).
     *
     * @param booleanSupplier consulted only after a flush attempt that flushed nothing; {@code true} ends the task
     * @param j timeout in milliseconds, must be non-negative; the deadline is computed once when the task starts
     * @return a handle that waits for the task to finish and then restores the previous notification processor;
     *         it throws {@link UncheckedDeephavenException} if the wait is interrupted or the task failed
     */
    @TestUseOnly
    public Runnable flushAllNormalNotificationsForUnitTests(@NotNull BooleanSupplier booleanSupplier, long j) {
        Assert.assertion(this.unitTestMode, "unitTestMode");
        Assert.geqZero(j, "timeoutMillis");
        final BaseUpdateGraph.NotificationProcessor previousProcessor = this.notificationProcessor;
        final ControlledNotificationProcessor controlled = new ControlledNotificationProcessor();
        this.notificationProcessor = controlled;
        final Future<?> flushJob = this.unitTestRefreshThreadPool.submit(() -> {
            final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
            for (;;) {
                if (flushOneNotificationForUnitTestsInternal(false)) {
                    // Made progress; try to flush the next one straight away.
                    continue;
                }
                if (booleanSupplier.getAsBoolean()) {
                    return;
                }
                if (!controlled.blockUntilNotificationAdded(deadlineNanos - System.nanoTime())) {
                    Assert.statementNeverExecuted("Unit test failure due to timeout after " + j + " ms");
                }
            }
        });
        return () -> {
            try {
                flushJob.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new UncheckedDeephavenException(e);
            } finally {
                // Restore the original processor regardless of how the wait ended.
                this.notificationProcessor = previousProcessor;
            }
        };
    }

    /**
     * Invokes {@code onNotificationAdded()} on the currently installed notification processor.
     * NOTE(review): judging by the method name this is meant to wake a refresh thread blocked waiting for a
     * notification; confirm against the processor implementation.
     */
    @TestUseOnly
    public void wakeRefreshThreadForUnitTests() {
        this.notificationProcessor.onNotificationAdded();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs one refresh cycle via the superclass, optionally guarded by a watchdog, then waits for the next cycle.
     *
     * <p>
     * If a positive watchdog timeout and a timeout procedure are both configured, the procedure is scheduled to run
     * after the timeout. The scheduled task is cancelled once the superclass cycle ends, including when it ends by
     * throwing, so the watchdog cannot fire for a cycle that has already terminated.
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public void refreshTablesAndFlushNotifications() {
        final long cycleStartNanos = System.nanoTime();
        // Read both settings once so the check and the scheduled task use the same values.
        final long watchdogTimeoutMillis = this.watchDogMillis;
        final LongConsumer timeoutProcedure = this.watchDogTimeoutProcedure;
        ScheduledFuture<?> watchdogFuture = null;
        if (watchdogTimeoutMillis > 0 && timeoutProcedure != null) {
            watchdogFuture = this.watchdogScheduler.schedule(
                    () -> timeoutProcedure.accept(watchdogTimeoutMillis),
                    watchdogTimeoutMillis,
                    TimeUnit.MILLISECONDS);
        }
        try {
            super.refreshTablesAndFlushNotifications();
        } finally {
            // Cancel on the failure path too; otherwise the watchdog would fire after the cycle already aborted.
            if (watchdogFuture != null) {
                watchdogFuture.cancel(true);
            }
        }
        if (this.interCycleYield) {
            Thread.yield();
        }
        waitForNextCycle(cycleStartNanos);
    }

    /**
     * Computes when the next cycle should begin and waits until then.
     *
     * <p>
     * The end time is the cycle start plus the target cycle duration. When a positive minimum inter-cycle sleep is
     * configured, the end time is pushed out to at least "now plus that minimum".
     *
     * @param j the {@link System#nanoTime()} reading taken when the cycle started
     */
    private void waitForNextCycle(long j) {
        final long nowNanos = System.nanoTime();
        final long targetEndNanos = j + TimeUnit.MILLISECONDS.toNanos(this.targetCycleDurationMillis);
        final long cycleEndNanos = this.minimumInterCycleSleep > 0
                ? Math.max(targetEndNanos, nowNanos + TimeUnit.MILLISECONDS.toNanos(this.minimumInterCycleSleep))
                : targetEndNanos;
        maybeFlushUpdatePerformance(nowNanos, cycleEndNanos);
        waitForEndTime(cycleEndNanos);
    }

    /**
     * Blocks until {@link System#nanoTime()} reaches {@code j} or {@code refreshRequested} becomes {@code true},
     * whichever happens first. Waits on the {@code refreshRequested} monitor; an interrupt is logged and the wait
     * resumes.
     *
     * @param j the {@link System#nanoTime()} value at which to stop waiting
     */
    private void waitForEndTime(long j) {
        final long nanosPerMilli = 1000000L;
        long remainingNanos;
        while ((remainingNanos = j - System.nanoTime()) > 0 && !this.refreshRequested.get()) {
            synchronized (this.refreshRequested) {
                // Re-check under the monitor so a request made just before we acquired it is not missed.
                if (this.refreshRequested.get()) {
                    return;
                }
                final long waitMillis = remainingNanos / nanosPerMilli;
                final int waitExtraNanos = (int) (remainingNanos % nanosPerMilli);
                try {
                    this.refreshRequested.wait(waitMillis, waitExtraNanos);
                } catch (InterruptedException e) {
                    log.warn().append("Interrupted while waiting on refreshRequested. Ignoring: ").append(e).endl();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Clears the {@code refreshRequested} flag and then delegates to the superclass implementation.
     */
    @Override // io.deephaven.engine.updategraph.impl.BaseUpdateGraph
    public void refreshAllTables() {
        // Reset the flag before refreshing, so a request that arrives during the refresh is not cleared afterwards.
        this.refreshRequested.set(false);
        super.refreshAllTables();
    }

    /**
     * Creates the executor used for unit-test refresh work: a fixed pool with exactly one thread, whose threads
     * come from a new {@code UnitTestThreadFactory}.
     *
     * @return the newly created executor
     */
    private ExecutorService makeUnitTestRefreshExecutor() {
        final ThreadFactory unitTestThreads = new UnitTestThreadFactory();
        return Executors.newFixedThreadPool(1, unitTestThreads);
    }

    /**
     * Prepares the calling thread to act as the refresh thread: marks it systemic, enables a dedicated chunk pool
     * for it, flags it as the update thread, and opens an execution context bound to this update graph and its
     * operation initializer. The handle returned by opening the context is not retained.
     */
    private void configureRefreshThread() {
        SystemicObjectTracker.markThreadSystemic();
        MultiChunkPool.enableDedicatedPoolForThisThread();
        this.isUpdateThread.set(true);
        final ExecutionContext refreshContext = ExecutionContext.newBuilder()
                .setUpdateGraph(this)
                .setOperationInitializer(this.operationInitializer)
                .build();
        refreshContext.open();
    }

    /**
     * Prepares the calling thread to act as the unit-test refresh thread. Wraps the thread's existing
     * uncaught-exception handler so that {@code ensureUnlocked} runs before the existing handler, flags the
     * thread as the update thread, and opens an execution context bound to this update graph and its operation
     * initializer.
     */
    private void configureUnitTestRefreshThread() {
        final Thread refreshThread = Thread.currentThread();
        final Thread.UncaughtExceptionHandler previousHandler = refreshThread.getUncaughtExceptionHandler();
        refreshThread.setUncaughtExceptionHandler((failedThread, failure) -> {
            ensureUnlocked("unit test run pool thread exception handler", null);
            previousHandler.uncaughtException(failedThread, failure);
        });
        this.isUpdateThread.set(true);
        final ExecutionContext refreshContext = ExecutionContext.newBuilder()
                .setUpdateGraph(this)
                .setOperationInitializer(this.operationInitializer)
                .build();
        refreshContext.open();
    }

    /**
     * Looks up an update graph by name through {@code BaseUpdateGraph.getInstance} and returns it as a
     * {@code PeriodicUpdateGraph}.
     *
     * @param str the name passed to {@code BaseUpdateGraph.getInstance}
     * @return the result of the lookup, cast to {@code PeriodicUpdateGraph}
     */
    public static PeriodicUpdateGraph getInstance(String str) {
        return (PeriodicUpdateGraph) BaseUpdateGraph.getInstance(str).cast();
    }
}
