package io.mantisrx.master.scheduler;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.job.worker.WorkerState;
import io.mantisrx.server.master.scheduler.MantisScheduler;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Action1;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor.class */
public class AgentsErrorMonitorActor extends AbstractActorWithTimers implements IAgentsErrorMonitor {
    private final Logger logger;
    private static final long ERROR_CHECK_WINDOW_MILLIS = 120000;
    private static final int ERROR_CHECK_WINDOW_COUNT = 3;
    private static final long TOO_OLD_MILLIS = 3600000;
    private static final long DISABLE_DURATION_MILLIS = 60000;
    private Action1<String> slaveEnabler;
    private Action1<String> slaveDisabler;
    private long too_old_mills;
    private int error_check_window_count;
    private long error_check_window_millis;
    private long disableDurationMillis;
    private final Map<String, HostErrors> hostErrorMap;
    private static final String CHECK_HOST_TIMER_KEY = "CHECK_HOST";
    private Optional<MantisScheduler> mantisSchedulerOptional;
    AbstractActor.Receive initializedBehavior;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor$CheckHostHealthMessage.class */
    public static class CheckHostHealthMessage {
        long now;

        public CheckHostHealthMessage() {
            this.now = -1L;
        }

        public CheckHostHealthMessage(long j) {
            this.now = -1L;
            this.now = j;
        }

        public long getCurrentTime() {
            return this.now == -1 ? System.currentTimeMillis() : this.now;
        }
    }

    /* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor$HostErrorMapRequest.class */
    static class HostErrorMapRequest {
        HostErrorMapRequest() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor$HostErrorMapResponse.class */
    public static class HostErrorMapResponse {
        private final Map<String, HostErrors> errorMap;

        public HostErrorMapResponse(Map<String, HostErrors> map) {
            this.errorMap = map;
        }

        public Map<String, HostErrors> getMap() {
            return this.errorMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor$HostErrors.class */
    public static class HostErrors {
        private static final Logger logger = LoggerFactory.getLogger(HostErrors.class);
        private final String hostname;
        private final Action1<String> slaveEnabler;
        private final long error_check_window_millis;
        private final int windowCount;
        private long lastActivityAt = System.currentTimeMillis();
        private final List<Long> errors = new ArrayList();

        HostErrors(String str, Action1<String> action1, long j, int i) {
            this.hostname = str;
            this.slaveEnabler = action1;
            this.error_check_window_millis = j;
            this.windowCount = i;
        }

        long getLastActivityAt() {
            return this.lastActivityAt;
        }

        boolean addAndGetIsTooManyErrors(LifecycleEventsProto.WorkerStatusEvent workerStatusEvent) {
            logger.info("InaddGetisTooManyErrors for host {}", this.hostname);
            this.lastActivityAt = workerStatusEvent.getTimestamp();
            if (WorkerState.isErrorState(workerStatusEvent.getWorkerState())) {
                this.errors.add(Long.valueOf(this.lastActivityAt));
                logger.info("Registering error {}", this.errors);
            } else if (workerStatusEvent.getWorkerState() == WorkerState.Started && !this.errors.isEmpty()) {
                this.errors.clear();
                logger.info("{} cleared of errors, reenabling host ", this.hostname);
                this.slaveEnabler.call(this.hostname);
            }
            Iterator<Long> it = this.errors.iterator();
            while (it.hasNext()) {
                if (this.lastActivityAt - it.next().longValue() > this.error_check_window_millis) {
                    it.remove();
                }
            }
            logger.info("No of errors in window is {} ", Integer.valueOf(this.errors.size()));
            return this.errors.size() > this.windowCount;
        }

        List<Long> getErrorTimestampList() {
            return Collections.unmodifiableList(this.errors);
        }
    }

    /* loaded from: input_file:io/mantisrx/master/scheduler/AgentsErrorMonitorActor$InitializeAgentsErrorMonitor.class */
    public static class InitializeAgentsErrorMonitor {
        private final MantisScheduler scheduler;

        public InitializeAgentsErrorMonitor(MantisScheduler mantisScheduler) {
            Preconditions.checkNotNull(mantisScheduler, "MantisScheduler cannot be null");
            this.scheduler = mantisScheduler;
        }

        public MantisScheduler getScheduler() {
            return this.scheduler;
        }
    }

    public static Props props(long j, int i, long j2, long j3) {
        return Props.create(AgentsErrorMonitorActor.class, new Object[]{Long.valueOf(j), Integer.valueOf(i), Long.valueOf(j2), Long.valueOf(j3)});
    }

    public static Props props() {
        return Props.create(AgentsErrorMonitorActor.class, new Object[]{Long.valueOf(TOO_OLD_MILLIS), Integer.valueOf(ERROR_CHECK_WINDOW_COUNT), Long.valueOf(ERROR_CHECK_WINDOW_MILLIS), Long.valueOf(DISABLE_DURATION_MILLIS)});
    }

    public AgentsErrorMonitorActor() {
        this(TOO_OLD_MILLIS, ERROR_CHECK_WINDOW_COUNT, ERROR_CHECK_WINDOW_MILLIS, DISABLE_DURATION_MILLIS);
    }

    public AgentsErrorMonitorActor(long j, int i, long j2, long j3) {
        this.logger = LoggerFactory.getLogger(AgentsErrorMonitorActor.class);
        this.slaveEnabler = str -> {
            this.logger.warn("SlaveEnabler not initialized yet!");
        };
        this.slaveDisabler = str2 -> {
            this.logger.warn("SlaveDisabler not initialized yet!");
        };
        this.hostErrorMap = new HashMap();
        this.mantisSchedulerOptional = Optional.empty();
        this.too_old_mills = j > 0 ? j : TOO_OLD_MILLIS;
        this.error_check_window_count = i > 0 ? i : ERROR_CHECK_WINDOW_COUNT;
        this.error_check_window_millis = j2 > 1000 ? j2 : ERROR_CHECK_WINDOW_MILLIS;
        this.disableDurationMillis = j3 > -1 ? j3 : DISABLE_DURATION_MILLIS;
        this.initializedBehavior = receiveBuilder().match(LifecycleEventsProto.WorkerStatusEvent.class, workerStatusEvent -> {
            onWorkerEvent(workerStatusEvent);
        }).match(CheckHostHealthMessage.class, checkHostHealthMessage -> {
            onCheckHostHealth();
        }).match(HostErrorMapRequest.class, hostErrorMapRequest -> {
            onHostErrorMapRequest();
        }).matchAny(obj -> {
            this.logger.warn("unexpected message '{}' received by AgentsErrorMonitorActor actor ", obj);
        }).build();
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(InitializeAgentsErrorMonitor.class, initializeAgentsErrorMonitor -> {
            onInitialize(initializeAgentsErrorMonitor);
        }).matchAny(obj -> {
            this.logger.warn("unexpected message '{}' received by AgentsErrorMonitorActor actor ", obj);
        }).build();
    }

    public void onInitialize(InitializeAgentsErrorMonitor initializeAgentsErrorMonitor) {
        this.mantisSchedulerOptional = Optional.of(initializeAgentsErrorMonitor.getScheduler());
        this.slaveDisabler = str -> {
            this.mantisSchedulerOptional.get().disableVM(str, this.disableDurationMillis);
        };
        this.slaveEnabler = str2 -> {
            this.mantisSchedulerOptional.get().enableVM(str2);
        };
        getContext().become(this.initializedBehavior);
        getTimers().startPeriodicTimer(CHECK_HOST_TIMER_KEY, new CheckHostHealthMessage(), Duration.create(this.error_check_window_millis, TimeUnit.MILLISECONDS));
    }

    @Override // io.mantisrx.master.scheduler.IAgentsErrorMonitor
    public void onCheckHostHealth() {
        Instant now = Instant.now();
        Iterator<HostErrors> it = this.hostErrorMap.values().iterator();
        while (it.hasNext()) {
            long epochMilli = now.toEpochMilli() - it.next().getLastActivityAt();
            if (epochMilli > this.too_old_mills) {
                this.logger.debug("No Events from host since {} evicting", Long.valueOf(epochMilli));
                it.remove();
            }
        }
    }

    @Override // io.mantisrx.master.scheduler.IAgentsErrorMonitor
    public void onWorkerEvent(LifecycleEventsProto.WorkerStatusEvent workerStatusEvent) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("onWorkerEvent " + workerStatusEvent + " is error state " + WorkerState.isErrorState(workerStatusEvent.getWorkerState()));
        }
        if (workerStatusEvent.getHostName().isPresent() && WorkerState.isErrorState(workerStatusEvent.getWorkerState())) {
            String str = workerStatusEvent.getHostName().get();
            this.logger.info("Registering worker error on host {}", str);
            if (this.hostErrorMap.computeIfAbsent(str, str2 -> {
                return new HostErrors(str2, this.slaveEnabler, this.error_check_window_millis, this.error_check_window_count);
            }).addAndGetIsTooManyErrors(workerStatusEvent)) {
                this.logger.warn("Host {} has too many errors in a short duration, disabling..", str);
                this.slaveDisabler.call(str);
            }
        }
    }

    @Override // io.mantisrx.master.scheduler.IAgentsErrorMonitor
    public void onHostErrorMapRequest() {
        getSender().tell(new HostErrorMapResponse(Collections.unmodifiableMap(this.hostErrorMap)), getSelf());
    }
}
