package io.trino.execution.executor.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.trino.execution.executor.scheduler.TaskControl;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Scheduler that runs each submitted {@link Schedulable} on its own thread of an internal
 * executor while limiting, through a {@link Reservation}, how many of those tasks may hold a
 * concurrency slot at the same time.
 *
 * <p>A single scheduler loop ({@link #runScheduler()}) repeatedly reserves a slot, dequeues the
 * next task from the {@link BlockingSchedulingQueue} and marks it ready; the task thread, which
 * was parked in {@link #makeRunnableAndAwait(TaskControl, long)}, then proceeds to run. Tasks
 * hand their slot back by calling {@link #yield(TaskControl)} or
 * {@link #block(TaskControl, ListenableFuture)}, or when they finish.
 *
 * <p>Lifecycle operations ({@link #close()}, {@link #createGroup(String)},
 * {@link #removeGroup(Group)}, {@link #submit(Group, int, Schedulable)}) are serialized on the
 * monitor of this instance.
 */
@ThreadSafe
public final class FairScheduler implements AutoCloseable {
    private static final Logger LOG = Logger.get(FairScheduler.class);

    /**
     * One second expressed in nanoseconds. Used as the minimum run time before
     * {@link #yield(TaskControl)} actually gives up the slot, and passed to the queue on dequeue.
     */
    public static final long QUANTUM_NANOS = TimeUnit.MILLISECONDS.toNanos(1000);

    private final ListeningExecutorService taskExecutor;
    // Kept alongside taskExecutor (which decorates it) so diagnostics() can read pool statistics.
    private final ThreadPoolExecutor executor;
    private final Reservation<TaskControl> concurrencyControl;
    private final Ticker ticker;

    @GuardedBy("this")
    private boolean closed;
    private final BlockingSchedulingQueue<Group, TaskControl> queue = new BlockingSchedulingQueue<>();
    // The scheduler loop waits on this gate before every dispatch; see pause()/resume().
    private final Gate paused = new Gate(true);
    private final ExecutorService schedulerExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("fair-scheduler-%d"));

    /**
     * Creates a scheduler that is not yet dispatching; call {@link #start()} to launch the
     * scheduler loop (or use {@link #newInstance(int)}).
     *
     * @param i number of slots handed to the {@link Reservation} that bounds concurrency
     * @param str thread name format for the daemon threads that run tasks
     * @param ticker time source passed to every {@link TaskControl}; must not be null
     * @throws NullPointerException if {@code ticker} is null
     */
    public FairScheduler(int i, String str, Ticker ticker) {
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
        this.concurrencyControl = new Reservation<>(i);
        // Zero core threads, unbounded maximum and a SynchronousQueue: every submitted task is
        // handed directly to a thread (idle or newly created) instead of being queued.
        this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.daemonThreadsNamed(str));
        this.taskExecutor = MoreExecutors.listeningDecorator(this.executor);
    }

    /**
     * Creates and starts a scheduler that uses the system ticker.
     *
     * @param i number of concurrency slots
     * @return a started scheduler
     */
    public static FairScheduler newInstance(int i) {
        return newInstance(i, Ticker.systemTicker());
    }

    /**
     * Creates and starts a scheduler whose task threads are named {@code fair-scheduler-runner-%d}.
     *
     * @param i number of concurrency slots
     * @param ticker time source for task accounting
     * @return a started scheduler
     */
    public static FairScheduler newInstance(int i, Ticker ticker) {
        FairScheduler scheduler = new FairScheduler(i, "fair-scheduler-runner-%d", ticker);
        scheduler.start();
        return scheduler;
    }

    /** Launches the scheduler loop on the scheduler executor. */
    public void start() {
        schedulerExecutor.submit(this::runScheduler);
    }

    /** Closes the gate that the scheduler loop waits on before each dispatch. */
    public void pause() {
        paused.close();
    }

    /** Opens the gate that the scheduler loop waits on before each dispatch. */
    public void resume() {
        paused.open();
    }

    /**
     * Cancels every task still known to the queue and shuts down both executors. Calling this
     * more than once is a no-op.
     */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (TaskControl task : queue.finishAll()) {
            task.cancel();
        }
        taskExecutor.shutdownNow();
        schedulerExecutor.shutdownNow();
    }

    /**
     * Creates a new group and registers it with the scheduling queue.
     *
     * @param str name given to the new {@link Group}
     * @return the newly created group
     * @throws IllegalArgumentException if the scheduler has been closed
     */
    public synchronized Group createGroup(String str) {
        Preconditions.checkArgument(!closed, "Already closed");
        Group group = new Group(str);
        queue.startGroup(group);
        return group;
    }

    /**
     * Removes a group from the scheduling queue and cancels the tasks the queue returns for it.
     *
     * @param group the group to remove
     * @throws IllegalArgumentException if the scheduler has been closed
     */
    public synchronized void removeGroup(Group group) {
        Preconditions.checkArgument(!closed, "Already closed");
        for (TaskControl task : queue.finishGroup(group)) {
            task.cancel();
        }
    }

    /**
     * Returns the ids of the tasks the queue currently reports for the given group.
     *
     * @param group the group to inspect
     * @return an immutable set of task ids
     */
    public Set<Integer> getTasks(Group group) {
        return queue.getTasks(group).stream()
                .map(TaskControl::id)
                .collect(ImmutableSet.toImmutableSet());
    }

    /**
     * Submits a task for execution on its own thread. The task does not start running until the
     * scheduler loop marks it ready.
     *
     * @param group group the task belongs to
     * @param i task id recorded in the task's {@link TaskControl}
     * @param schedulable the work to run
     * @return a future for the task thread's execution
     * @throws IllegalArgumentException if the scheduler has been closed
     */
    public synchronized ListenableFuture<Void> submit(Group group, int i, Schedulable schedulable) {
        Preconditions.checkArgument(!closed, "Already closed");
        TaskControl task = new TaskControl(group, i, ticker);
        // A plain null result lets the type parameter infer to Void, matching the return type.
        return taskExecutor.submit(() -> runTask(schedulable, task), null);
    }

    /**
     * Body of a task thread: waits for a slot, runs the schedulable, then performs cleanup
     * exactly once regardless of how the schedulable exits.
     */
    private void runTask(Schedulable schedulable, TaskControl taskControl) {
        taskControl.setThread(Thread.currentThread());
        if (!makeRunnableAndAwait(taskControl, 0L)) {
            return;
        }
        try {
            schedulable.run(new SchedulerContext(this, taskControl));
        } catch (Exception e) {
            LOG.error(e);
        } finally {
            // yield() and block() release the reservation before re-queueing, so only release
            // here when the task is still in the RUNNING state.
            if (taskControl.getState() == TaskControl.State.RUNNING) {
                concurrencyControl.release(taskControl);
            }
            queue.finish(taskControl.group(), taskControl);
            taskControl.transitionToFinished();
        }
    }

    /**
     * Moves the task to the waiting state, enqueues it and waits until it is running.
     *
     * @param taskControl the task to enqueue
     * @param j value forwarded as the third argument of {@code queue.enqueue}; callers pass the
     *          nanoseconds the task just ran for, or 0
     * @return true if the task is now running; false if any step was refused
     */
    private boolean makeRunnableAndAwait(TaskControl taskControl, long j) {
        if (!taskControl.transitionToWaiting()) {
            return false;
        }
        if (!queue.enqueue(taskControl.group(), taskControl, j)) {
            return false;
        }
        return awaitReadyAndTransitionToRunning(taskControl);
    }

    /**
     * Waits for the scheduler loop to mark the task ready and transitions it to running.
     * Whenever the task ends up not running although it was marked ready, the reservation that
     * the scheduler loop registered for it is released.
     *
     * @return true if the task is now running
     */
    private boolean awaitReadyAndTransitionToRunning(TaskControl taskControl) {
        if (!taskControl.awaitReady()) {
            // The wait failed, but the scheduler loop may already have registered a
            // reservation for this task; give it back in that case.
            if (taskControl.isReady()) {
                concurrencyControl.release(taskControl);
            }
            return false;
        }
        if (!taskControl.transitionToRunning()) {
            concurrencyControl.release(taskControl);
            return false;
        }
        return true;
    }

    /**
     * Gives up the task's slot if it has run for at least {@link #QUANTUM_NANOS}, then waits to
     * be scheduled again.
     *
     * @param taskControl the calling task
     * @return true if the task may continue running (either it has not used up its quantum, or
     *         it was re-scheduled); false if it could not be made running again
     * @throws IllegalStateException if called from a thread other than the task's own
     */
    public boolean yield(TaskControl taskControl) {
        Preconditions.checkState(taskControl.getThread() == Thread.currentThread(), "yield() may only be called from the task thread");
        long elapsed = taskControl.elapsed();
        if (elapsed < QUANTUM_NANOS) {
            return true;
        }
        concurrencyControl.release(taskControl);
        return makeRunnableAndAwait(taskControl, elapsed);
    }

    /**
     * Releases the task's slot, waits for the task to be unblocked via the given future's
     * listener, and then waits to be scheduled again.
     *
     * @param taskControl the calling task
     * @param listenableFuture future whose listener marks the task unblocked
     * @return true if the task is running again; false if any state transition or queue
     *         operation was refused
     * @throws IllegalStateException if called from a thread other than the task's own
     */
    public boolean block(TaskControl taskControl, ListenableFuture<?> listenableFuture) {
        Preconditions.checkState(taskControl.getThread() == Thread.currentThread(), "block() may only be called from the task thread");
        long elapsed = taskControl.elapsed();
        concurrencyControl.release(taskControl);
        if (!taskControl.transitionToBlocked()) {
            return false;
        }
        if (!queue.block(taskControl.group(), taskControl, elapsed)) {
            return false;
        }
        listenableFuture.addListener(taskControl::markUnblocked, MoreExecutors.directExecutor());
        taskControl.awaitUnblock();
        return makeRunnableAndAwait(taskControl, 0L);
    }

    /**
     * Scheduler loop: waits for the pause gate, reserves a slot, dequeues a task, registers the
     * reservation to it and marks it ready. Exits when interrupted; any other exception is
     * logged and the loop continues.
     */
    private void runScheduler() {
        while (true) {
            try {
                paused.awaitOpen();
                concurrencyControl.reserve();
                TaskControl task = queue.dequeue(QUANTUM_NANOS);
                concurrencyControl.register(task);
                if (!task.markReady()) {
                    // The task refused to become ready, so hand the slot back.
                    concurrencyControl.release(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LOG.error(e);
            }
        }
    }

    /** Returns {@code taskControl.getStartNanos()}. */
    public long getStartNanos(TaskControl taskControl) {
        return taskControl.getStartNanos();
    }

    /** Returns {@code taskControl.getScheduledNanos()}. */
    public long getScheduledNanos(TaskControl taskControl) {
        return taskControl.getScheduledNanos();
    }

    /** Returns {@code taskControl.getWaitNanos()}. */
    public long getWaitNanos(TaskControl taskControl) {
        return taskControl.getWaitNanos();
    }

    /** Returns {@code taskControl.getBlockedNanos()}. */
    public long getBlockedNanos(TaskControl taskControl) {
        return taskControl.getBlockedNanos();
    }

    /**
     * Builds a multi-line, human-readable snapshot of the queue, the task thread pool and the
     * concurrency reservations. The values are read without a common lock, so they may not be
     * mutually consistent.
     *
     * @return the diagnostic text
     */
    public String diagnostics() {
        StringBuilder builder = new StringBuilder();
        builder.append(queue);
        builder.append("Task executor: pool=%s, active=%s, queue=%s\n".formatted(executor.getPoolSize(), executor.getActiveCount(), executor.getQueue().size()));
        builder.append("Concurrency control: slots=%s, available=%s\n".formatted(concurrencyControl.totalSlots(), concurrencyControl.availableSlots()));
        builder.append("Reservations:\n");
        // NOTE(review): "(pending)" is printed whenever exactly one slot is in use; presumably
        // this stands for the slot the scheduler loop holds before registering it to a task.
        // Confirm against Reservation.
        if (concurrencyControl.totalSlots() - concurrencyControl.availableSlots() == 1) {
            builder.append("    (pending)\n");
        }
        concurrencyControl.reservations().forEach(task -> builder.append("    ").append(task).append("\n"));
        return builder.toString();
    }

    @Override
    public String toString() {
        // closed is guarded by this instance's monitor; take a snapshot under the lock and
        // build the string outside it so the lock is held only briefly.
        boolean closedSnapshot;
        synchronized (this) {
            closedSnapshot = closed;
        }
        return new StringJoiner(", ", FairScheduler.class.getSimpleName() + "[", "]")
                .add("queue=" + queue)
                .add("concurrencyControl=" + concurrencyControl)
                .add("closed=" + closedSnapshot)
                .toString();
    }
}
