/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.system.executor;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.system.executor.ScheduledCommand;
import io.zeebe.broker.system.executor.ScheduledExecutor;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.time.ClockUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.slf4j.Logger;

public class ScheduledExecutorImpl
implements Actor,
ScheduledExecutor {
    public static final Logger LOG = Loggers.SYSTEM_LOGGER;
    protected static final String NAME = "scheduled-executor";
    protected final List<ScheduledCommandImpl> scheduledCommands = new ArrayList<ScheduledCommandImpl>();
    protected final ManyToOneConcurrentArrayQueue<Runnable> cmdQueue = new ManyToOneConcurrentArrayQueue(100);
    protected final Consumer<Runnable> cmdConsumer = Runnable::run;
    protected final AtomicBoolean isRunning = new AtomicBoolean(false);
    protected final ActorScheduler actorScheduler;
    protected ActorReference actorRef;

    public ScheduledExecutorImpl(ActorScheduler actorScheduler) {
        this.actorScheduler = actorScheduler;
    }

    @Override
    public ScheduledCommand schedule(Runnable command, Duration delay) {
        long dueDate = ClockUtil.getCurrentTimeInMillis() + delay.toMillis();
        ScheduledCommandImpl scheduledCommand = new ScheduledCommandImpl(command, dueDate);
        this.cmdQueue.add(() -> this.scheduledCommands.add(scheduledCommand));
        return scheduledCommand;
    }

    @Override
    public ScheduledCommand scheduleAtFixedRate(Runnable command, Duration period) {
        long dueDate = ClockUtil.getCurrentTimeInMillis();
        ScheduledCommandImpl scheduledCommand = new ScheduledCommandImpl(command, dueDate, period.toMillis());
        this.cmdQueue.add(() -> this.scheduledCommands.add(scheduledCommand));
        return scheduledCommand;
    }

    @Override
    public ScheduledCommand scheduleAtFixedRate(Runnable command, Duration initialDelay, Duration period) {
        long dueDate = ClockUtil.getCurrentTimeInMillis() + initialDelay.toMillis();
        ScheduledCommandImpl scheduledCommand = new ScheduledCommandImpl(command, dueDate, period.toMillis());
        this.cmdQueue.add(() -> this.scheduledCommands.add(scheduledCommand));
        return scheduledCommand;
    }

    public int doWork() throws Exception {
        int workCount = 0;
        workCount += this.cmdQueue.drain(this.cmdConsumer);
        long now = ClockUtil.getCurrentTimeInMillis();
        int i = 0;
        while (i < this.scheduledCommands.size() && this.isRunning.get()) {
            ScheduledCommandImpl scheduledCommand = this.scheduledCommands.get(i);
            if (scheduledCommand.getDueDate() <= now) {
                ++workCount;
                boolean reSchedule = this.executeCommand(scheduledCommand);
                if (reSchedule) {
                    ++i;
                    continue;
                }
                this.scheduledCommands.remove(i);
                continue;
            }
            ++i;
        }
        return workCount;
    }

    protected boolean executeCommand(ScheduledCommandImpl scheduledCommand) {
        boolean reSchedule = false;
        if (!scheduledCommand.isCancelled()) {
            try {
                scheduledCommand.getCommand().run();
                long period = scheduledCommand.getPeriod();
                if (period >= 0L) {
                    long nextDueDate = ClockUtil.getCurrentTimeInMillis() + period;
                    scheduledCommand.setDueDateInMillis(nextDueDate);
                    reSchedule = true;
                }
            }
            catch (Exception e) {
                LOG.error("Failed to execute scheduled command", (Throwable)e);
            }
        }
        return reSchedule;
    }

    public void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            this.actorRef = this.actorScheduler.schedule((Actor)this);
        }
    }

    public void stop() {
        this.stopAsync().join();
    }

    public CompletableFuture<Void> stopAsync() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.isRunning.compareAndSet(true, false)) {
            this.cmdQueue.add(() -> {
                this.actorRef.close();
                future.complete(null);
            });
        } else {
            future.complete(null);
        }
        return future;
    }

    public int getPriority(long now) {
        return 1;
    }

    public String name() {
        return NAME;
    }

    static class ScheduledCommandImpl
    implements ScheduledCommand {
        protected final Runnable command;
        protected final long periodInMillis;
        protected long dueDateInMillis;
        protected boolean isCancelled = false;

        ScheduledCommandImpl(Runnable command, long dueDateInMillis) {
            this(command, dueDateInMillis, -1L);
        }

        ScheduledCommandImpl(Runnable command, long dueDateInMillis, long periodInMillis) {
            this.command = command;
            this.periodInMillis = periodInMillis;
            this.dueDateInMillis = dueDateInMillis;
        }

        public Runnable getCommand() {
            return this.command;
        }

        @Override
        public long getPeriod() {
            return this.periodInMillis;
        }

        @Override
        public void cancel() {
            this.isCancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return this.isCancelled;
        }

        @Override
        public long getDueDate() {
            return this.dueDateInMillis;
        }

        public void setDueDateInMillis(long dueDateInMillis) {
            this.dueDateInMillis = dueDateInMillis;
        }
    }
}

