package io.mantisrx.server.worker.jobmaster.rules;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import io.mantisrx.runtime.descriptor.JobScalingRule;
import io.mantisrx.server.worker.jobmaster.JobScalerContext;
import io.mantisrx.server.worker.jobmaster.rules.CoordinatorActor;
import io.mantisrx.shaded.com.google.common.base.Strings;
import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.TimeZone;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/server/worker/jobmaster/rules/ScheduleRuleActor.class */
public class ScheduleRuleActor extends AbstractActorWithTimers {
    private static final Logger log = LoggerFactory.getLogger(ScheduleRuleActor.class);
    final JobScalerContext jobScalerContext;
    final JobScalingRule rule;
    CronExpression currentCron;
    Duration currentDuration;
    private static final String TICK_TIMER_KEY = "TickTimer";
    private static final String STOP_TIMER_KEY = "StopTimer";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/worker/jobmaster/rules/ScheduleRuleActor$Stop.class */
    public static class Stop {
        static final Stop INSTANCE = new Stop();

        private Stop() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/worker/jobmaster/rules/ScheduleRuleActor$Tick.class */
    public static class Tick {
        static final Tick INSTANCE = new Tick();

        private Tick() {
        }
    }

    public static Props Props(JobScalerContext jobScalerContext, JobScalingRule jobScalingRule) {
        return Props.create(ScheduleRuleActor.class, new Object[]{jobScalerContext, jobScalingRule});
    }

    public ScheduleRuleActor(JobScalerContext jobScalerContext, JobScalingRule jobScalingRule) {
        this.jobScalerContext = jobScalerContext;
        this.rule = jobScalingRule;
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(Tick.class, this::handleTick).match(Stop.class, this::handleStop).build();
    }

    public void preStart() throws Exception {
        super.preStart();
        log.info("ScheduleRuleActor started");
        handleScheduleMessage();
    }

    public void postStop() throws Exception {
        log.info("ScheduleRuleActor stopped, cancelling timers");
        getTimers().cancel(TICK_TIMER_KEY);
        getTimers().cancel(STOP_TIMER_KEY);
        super.postStop();
    }

    private void handleScheduleMessage() {
        log.info("setup schedule for user rule: {}", this.rule);
        getTimers().cancel(TICK_TIMER_KEY);
        getTimers().cancel(STOP_TIMER_KEY);
        String scheduleCron = this.rule.getTriggerConfig().getScheduleCron();
        log.info("Parsing cron expression: {}", scheduleCron);
        try {
            this.currentCron = new CronExpression(scheduleCron);
            this.currentCron.setTimeZone(TimeZone.getDefault());
            Date date = new Date();
            Date nextValidTimeAfter = this.currentCron.getNextValidTimeAfter(date);
            if (nextValidTimeAfter == null) {
                log.error("No next valid time found for cron expression '{}'", scheduleCron);
                throw new IllegalArgumentException("invalid cron expression: " + scheduleCron);
            }
            long time = nextValidTimeAfter.getTime() - date.getTime();
            if (time < 0) {
                log.error("Computed delay is negative: {} ms from {}", Long.valueOf(time), nextValidTimeAfter);
                throw new IllegalArgumentException("invalid cron expression compared to current time: " + scheduleCron);
            }
            log.info("Scheduling Tick to trigger in {} ms at {}", Long.valueOf(time), nextValidTimeAfter);
            getTimers().startSingleTimer(TICK_TIMER_KEY, Tick.INSTANCE, scala.concurrent.duration.Duration.create(time, "millis"));
            if (Strings.isNullOrEmpty(this.rule.getTriggerConfig().getScheduleDuration())) {
                return;
            }
            this.currentDuration = Duration.parse(this.rule.getTriggerConfig().getScheduleDuration());
            long millis = this.currentDuration.toMillis();
            long j = time + millis;
            log.info("Scheduling Stop to trigger in {} ms at {}", Long.valueOf(j), new Date(nextValidTimeAfter.getTime() + millis));
            getTimers().startSingleTimer(STOP_TIMER_KEY, Stop.INSTANCE, scala.concurrent.duration.Duration.create(j, "millis"));
        } catch (ParseException e) {
            log.error("Invalid cron expression '{}': {}", scheduleCron, e.getMessage());
        }
    }

    private void handleTick(Tick tick) {
        log.info("Tick triggered at {}", new Date());
        if (this.currentCron == null) {
            log.error("No active schedule found to handle Tick.");
            return;
        }
        log.info("Schedule actor to activate rule: {}", this.rule.getRuleId());
        getContext().getParent().tell(CoordinatorActor.ActivateRuleRequest.of(this.jobScalerContext.getJobId(), this.rule), getSelf());
        Date date = new Date();
        Date nextValidTimeAfter = this.currentCron.getNextValidTimeAfter(date);
        if (nextValidTimeAfter == null) {
            log.warn("No subsequent valid time found for cron expression '{}'", this.currentCron.getCronExpression());
            return;
        }
        long time = nextValidTimeAfter.getTime() - date.getTime();
        if (time < 0) {
            log.error("Computed delay for next Tick is negative: {} ms", Long.valueOf(time));
        } else {
            log.info("Rescheduling Tick to trigger in {} ms at {}", Long.valueOf(time), nextValidTimeAfter);
            getTimers().startSingleTimer(TICK_TIMER_KEY, Tick.INSTANCE, scala.concurrent.duration.Duration.create(time, "millis"));
        }
    }

    private void handleStop(Stop stop) {
        log.info("Deactivate rule as schedule duration finish: {}", this.rule.getRuleId());
        getContext().getParent().tell(CoordinatorActor.DeactivateRuleRequest.of(this.jobScalerContext.getJobId(), this.rule.getRuleId()), getSelf());
        Date date = new Date();
        Date nextValidTimeAfter = this.currentCron.getNextValidTimeAfter(date);
        if (nextValidTimeAfter == null) {
            log.info("No more scheduling from cron spec after stop '{}'", this.currentCron.getCronExpression());
            return;
        }
        long time = (nextValidTimeAfter.getTime() + this.currentDuration.toMillis()) - date.getTime();
        if (time < 0) {
            log.warn("Computed delay for next Stop is negative: {} ms, ignore", Long.valueOf(time));
            return;
        }
        log.info("Scheduling Stop to trigger in {} ms at {}", Long.valueOf(time), new Date(nextValidTimeAfter.getTime() + this.currentDuration.toMillis()));
        getTimers().startSingleTimer(STOP_TIMER_KEY, Stop.INSTANCE, scala.concurrent.duration.Duration.create(time, "millis"));
    }
}
