package water;

import java.lang.Thread;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.apache.log4j.Logger;
import water.fvec.Frame;
import water.fvec.NewChunk;
import water.parser.FVecParseWriter;

/* loaded from: input_file:water/FrameSizeMonitor.class */
public class FrameSizeMonitor implements Runnable, Thread.UncaughtExceptionHandler {
    private static final int SLEEP_MS = 100;
    private static final int MB = 1048576;
    private static final float FIRST_CHECK_PROGRESS = 0.02f;
    private final Key<Job> jobKey;
    private final Set<FVecParseWriter> writers = new HashSet();
    private final long totalMemory = getTotalMemory();
    private long committedMemory = 0;
    private static final Logger LOG = Logger.getLogger((Class<?>) FrameSizeMonitor.class);
    private static final ConcurrentMap<Key<Job>, FrameSizeMonitor> registry = new ConcurrentHashMap();
    private static final String ENABLED_PROP = "util.frameSizeMonitor.enabled";
    private static final boolean ENABLED = H2O.getSysBoolProperty(ENABLED_PROP, false);
    private static final String SAFE_COEF_PROP = "util.frameSizeMonitor.safetyCoefficient";
    private static final String SAFE_FREE_MEM_DEFAULT_COEF = "0.2";
    private static final float SAFE_FREE_MEM_COEF = Float.parseFloat(H2O.getSysProperty(SAFE_COEF_PROP, SAFE_FREE_MEM_DEFAULT_COEF));

    FrameSizeMonitor(Key<Job> key) {
        this.jobKey = key;
    }

    public static void get(Key<Job> key, Consumer<FrameSizeMonitor> consumer) {
        if (ENABLED) {
            consumer.accept(registry.computeIfAbsent(key, key2 -> {
                if (((Job) key.get()).stop_requested()) {
                    throw new IllegalStateException("Memory is running low. Forcefully terminating.");
                }
                FrameSizeMonitor frameSizeMonitor = new FrameSizeMonitor(key);
                Thread thread = new Thread(frameSizeMonitor, "FrameSizeMonitor-" + ((Job) key.get())._result);
                thread.setUncaughtExceptionHandler(frameSizeMonitor);
                thread.start();
                return frameSizeMonitor;
            }));
        }
    }

    private static void finish(Key<Job> key) {
        synchronized (registry) {
            registry.remove(key);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        float f = 0.02f;
        Job job = this.jobKey.get();
        while (true) {
            if (!job.isRunning() || f >= 1.0f) {
                break;
            }
            if (!MemoryManager.canAlloc()) {
                LOG.info("FrameSizeMonitor: MemoryManager is running low on memory, stopping job " + this.jobKey + " writing frame " + job._result);
                job.fail(new RuntimeException("Aborting due to critically low memory."));
                break;
            }
            float progress = job.progress();
            if (progress >= f) {
                if (isMemoryUsageOverLimit() && isFrameSizeOverLimit(progress, job)) {
                    job.fail(new RuntimeException("Aborting due to projected memory usage too high."));
                    break;
                }
                f = f < 0.1f ? progress + 0.01f : progress + 0.1f;
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("FrameSizeMonitor: waiting for progress " + progress + " to jump over " + f);
            }
            synchronized (this) {
                try {
                    wait(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            if (!job.isStopped()) {
                job.get();
            }
            if (job.isDone()) {
                LOG.debug("FrameSizeMonitor: finished monitoring job " + this.jobKey + ", final frame size is " + (((Frame) job._result.get()).byteSize() / 1048576) + " MB");
            }
        }
        finish(this.jobKey);
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        LOG.error(th);
        finish(this.jobKey);
    }

    private boolean isMemoryUsageOverLimit() {
        long availableMemory = getAvailableMemory();
        long j = ((float) (this.totalMemory * 2)) * SAFE_FREE_MEM_COEF;
        if (availableMemory < j) {
            LOG.debug("FrameSizeMonitor: Checking output of job " + this.jobKey + " because the available memory " + (availableMemory / 1048576) + " MB is lower than threshold " + (j / 1048576) + " MB (" + SAFE_FREE_MEM_COEF + " of " + (this.totalMemory / 1048576) + " MB total memory)");
            return true;
        }
        LOG.debug("FrameSizeMonitor: Overall memory usage is ok, still have " + (availableMemory / 1048576) + " MB available of " + (j / 1048576) + " MB required.");
        return false;
    }

    private boolean isFrameSizeOverLimit(float f, Job job) {
        long j = this.committedMemory;
        long inProgressMemory = getInProgressMemory();
        long j2 = ((float) inProgressMemory) + (((float) j) / f);
        long j3 = (j2 - j) - inProgressMemory;
        long availableMemory = getAvailableMemory();
        long j4 = ((float) availableMemory) - (((float) this.totalMemory) * SAFE_FREE_MEM_COEF);
        if (LOG.isDebugEnabled()) {
            LOG.debug("FrameSizeMonitor: Frame " + job._result + ": \n committed: " + (j / 1048576) + " MB\n loading: " + (inProgressMemory / 1048576) + " MB\n progress: " + f + "\n projected additional: " + (j3 / 1048576) + " MB\n projected total: " + (j2 / 1048576) + " MB\n availableMemory: " + (availableMemory / 1048576) + " MB\n totalMemory: " + (this.totalMemory / 1048576) + " MB\n usableMemory: " + (j4 / 1048576) + " MB\n enough: " + (j3 <= j4));
        }
        if (j3 > j4) {
            LOG.error("FrameSizeMonitor: Stopping job " + this.jobKey + " writing frame " + job._result + " because the projected size of " + (j3 / 1048576) + " MB  does not safely fit in " + (availableMemory / 1048576) + " MB of available memory.");
            return true;
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("FrameSizeMonitor: Projected memory " + (j3 / 1048576) + "MB for frame " + job._result + " fits safely into " + (availableMemory / 1048576) + " MB of available memory.");
        return false;
    }

    private long getInProgressMemory() {
        long j = 0;
        synchronized (this.writers) {
            Iterator<FVecParseWriter> it = this.writers.iterator();
            while (it.hasNext()) {
                NewChunk[] nvs = it.next().getNvs();
                if (nvs != null) {
                    j += getUsedMemory(nvs);
                }
            }
        }
        return j;
    }

    private long getUsedMemory(NewChunk[] newChunkArr) {
        long j = 0;
        for (NewChunk newChunk : newChunkArr) {
            if (newChunk != null) {
                j += newChunk.byteSize();
            }
        }
        return j;
    }

    private long getTotalMemory() {
        return H2O.SELF._heartbeat.get_kv_mem() + H2O.SELF._heartbeat.get_pojo_mem() + H2O.SELF._heartbeat.get_free_mem();
    }

    private long getAvailableMemory() {
        return H2O.SELF._heartbeat.get_free_mem();
    }

    public static void register(Key<Job> key, FVecParseWriter fVecParseWriter) {
        get(key, frameSizeMonitor -> {
            frameSizeMonitor.register(fVecParseWriter);
        });
    }

    public void register(FVecParseWriter fVecParseWriter) {
        synchronized (this.writers) {
            this.writers.add(fVecParseWriter);
        }
    }

    public static void closed(Key<Job> key, FVecParseWriter fVecParseWriter, long j) {
        get(key, frameSizeMonitor -> {
            frameSizeMonitor.closed(fVecParseWriter, j);
        });
    }

    public void closed(FVecParseWriter fVecParseWriter, long j) {
        synchronized (this.writers) {
            this.writers.remove(fVecParseWriter);
            this.committedMemory += j;
        }
    }
}
