/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.helios.agent;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.spotify.helios.agent.AgentModel;
import com.spotify.helios.agent.Execution;
import com.spotify.helios.agent.PortAllocator;
import com.spotify.helios.agent.Reaper;
import com.spotify.helios.agent.Supervisor;
import com.spotify.helios.agent.SupervisorFactory;
import com.spotify.helios.common.descriptors.Goal;
import com.spotify.helios.common.descriptors.Job;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.Reactor;
import com.spotify.helios.servicescommon.ReactorFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Agent
extends AbstractIdleService {
    public static final Map<JobId, Execution> EMPTY_EXECUTIONS = Collections.emptyMap();
    private static final Logger log = LoggerFactory.getLogger(Agent.class);
    private static final long UPDATE_INTERVAL = TimeUnit.SECONDS.toMillis(30L);
    private static final Predicate<Execution> PORT_ALLOCATION_PENDING = new Predicate<Execution>(){

        public boolean apply(Execution execution) {
            assert (execution != null);
            return execution.getGoal() != Goal.UNDEPLOY && execution.getPorts() == null;
        }
    };
    private static final Predicate<Execution> PORTS_ALLOCATED = new Predicate<Execution>(){

        public boolean apply(Execution execution) {
            assert (execution != null);
            return execution.getPorts() != null;
        }
    };
    private final AgentModel model;
    private final SupervisorFactory supervisorFactory;
    private final ModelListener modelListener = new ModelListener();
    private final Supervisor.Listener supervisorListener = new SupervisorListener();
    private final Map<JobId, Supervisor> supervisors = Maps.newHashMap();
    private final Reactor reactor;
    private final PersistentAtomicReference<Map<JobId, Execution>> executions;
    private final PortAllocator portAllocator;
    private final Reaper reaper;

    public Agent(AgentModel model, SupervisorFactory supervisorFactory, ReactorFactory reactorFactory, PersistentAtomicReference<Map<JobId, Execution>> executions, PortAllocator portAllocator, Reaper reaper) {
        this.model = (AgentModel)Preconditions.checkNotNull((Object)model, (Object)"model");
        this.supervisorFactory = (SupervisorFactory)Preconditions.checkNotNull((Object)supervisorFactory, (Object)"supervisorFactory");
        this.executions = (PersistentAtomicReference)Preconditions.checkNotNull(executions, (Object)"executions");
        this.portAllocator = (PortAllocator)Preconditions.checkNotNull((Object)portAllocator, (Object)"portAllocator");
        this.reactor = (Reactor)Preconditions.checkNotNull((Object)reactorFactory.create("agent", new Update(), UPDATE_INTERVAL), (Object)"reactor");
        this.reaper = (Reaper)Preconditions.checkNotNull((Object)reaper, (Object)"reaper");
    }

    protected void startUp() throws Exception {
        for (Map.Entry<JobId, Execution> entry : this.executions.get().entrySet()) {
            Execution execution = entry.getValue();
            Job job = execution.getJob();
            if (execution.getPorts() == null) continue;
            this.createSupervisor(job, execution.getPorts());
        }
        this.model.addListener(this.modelListener);
        this.reactor.startAsync().awaitRunning();
        this.reactor.signal();
    }

    protected void shutDown() throws Exception {
        this.reactor.stopAsync().awaitTerminated();
        for (Supervisor supervisor : this.supervisors.values()) {
            supervisor.close();
            supervisor.join();
        }
    }

    private Supervisor createSupervisor(Job job, Map<String, Integer> portAllocation) {
        log.debug("creating job supervisor: {}", (Object)job);
        TaskStatus taskStatus = this.model.getTaskStatus(job.getId());
        String containerId = taskStatus == null ? null : taskStatus.getContainerId();
        Supervisor supervisor = this.supervisorFactory.create(job, containerId, portAllocation, this.supervisorListener);
        this.supervisors.put(job.getId(), supervisor);
        return supervisor;
    }

    private class Update
    implements Reactor.Callback {
        private Update() {
        }

        @Override
        public void run(boolean timeout) throws InterruptedException {
            Supervisor supervisor;
            Execution execution;
            Agent.this.reaper.reap(new Supplier<Set<String>>(){

                public Set<String> get() {
                    HashSet active = Sets.newHashSet();
                    for (Supervisor supervisor : Agent.this.supervisors.values()) {
                        String containerId = supervisor.containerId();
                        if (containerId == null) continue;
                        active.add(containerId);
                    }
                    return active;
                }
            });
            Map<JobId, Task> tasks = Agent.this.model.getTasks();
            log.debug("tasks: {}", tasks);
            log.debug("executions: {}", Agent.this.executions.get());
            log.debug("supervisors: {}", (Object)Agent.this.supervisors);
            HashMap newExecutions = Maps.newHashMap((Map)((Map)Agent.this.executions.get()));
            for (Map.Entry<JobId, Task> entry : tasks.entrySet()) {
                JobId jobId = entry.getKey();
                Task task = entry.getValue();
                Execution existing = (Execution)newExecutions.get(jobId);
                if (existing != null) {
                    if (existing.getGoal() == task.getGoal()) continue;
                    execution = existing.withGoal(task.getGoal());
                    newExecutions.put(jobId, execution);
                    continue;
                }
                newExecutions.put(jobId, Execution.of(task.getJob()).withGoal(task.getGoal()));
            }
            for (Map.Entry<Object, Object> entry : newExecutions.entrySet()) {
                JobId jobId = (JobId)entry.getKey();
                Execution execution2 = (Execution)entry.getValue();
                if (tasks.containsKey(jobId)) continue;
                log.debug("Setting UNDEPLOY goal for removed job: {}", (Object)execution2.getJob());
                entry.setValue(execution2.withGoal(Goal.UNDEPLOY));
            }
            ImmutableMap pending = ImmutableMap.copyOf((Map)Maps.filterValues((Map)newExecutions, (Predicate)PORT_ALLOCATION_PENDING));
            if (!pending.isEmpty()) {
                ImmutableSet.Builder builder = ImmutableSet.builder();
                Map map = Maps.filterValues((Map)newExecutions, (Predicate)PORTS_ALLOCATED);
                for (Map.Entry entry : map.entrySet()) {
                    builder.addAll(((Execution)entry.getValue()).getPorts().values());
                }
                for (Map.Entry entry : pending.entrySet()) {
                    JobId jobId3 = (JobId)entry.getKey();
                    Execution execution3 = (Execution)entry.getValue();
                    Job job = execution3.getJob();
                    Map<String, Integer> ports = Agent.this.portAllocator.allocate(job.getPorts(), (Set<Integer>)builder.build());
                    log.debug("Allocated ports for job {}: {}", (Object)jobId3, ports);
                    if (ports != null) {
                        newExecutions.put(jobId3, execution3.withPorts(ports));
                        builder.addAll(ports.values());
                        continue;
                    }
                    log.warn("Unable to allocate ports for job: {}", (Object)job);
                }
            }
            if (!newExecutions.equals(Agent.this.executions.get())) {
                Agent.this.executions.setUnchecked(ImmutableMap.copyOf((Map)newExecutions));
            }
            for (Map.Entry entry : ImmutableSet.copyOf(Agent.this.supervisors.entrySet())) {
                JobId jobId = (JobId)entry.getKey();
                supervisor = (Supervisor)entry.getValue();
                if (!supervisor.isStopping() || !supervisor.isDone()) continue;
                log.debug("releasing stopped supervisor: {}", (Object)jobId);
                Agent.this.supervisors.remove(jobId);
                supervisor.close();
                Agent.this.reactor.signal();
            }
            for (Map.Entry entry : ((Map)Agent.this.executions.get()).entrySet()) {
                JobId jobId = (JobId)entry.getKey();
                Execution execution4 = (Execution)entry.getValue();
                Supervisor supervisor2 = (Supervisor)Agent.this.supervisors.get(jobId);
                if (supervisor2 != null || execution4.getGoal() != Goal.START || execution4.getPorts() == null) continue;
                Agent.this.createSupervisor(execution4.getJob(), execution4.getPorts());
            }
            for (Map.Entry entry : Agent.this.supervisors.entrySet()) {
                JobId jobId = (JobId)entry.getKey();
                supervisor = (Supervisor)entry.getValue();
                execution = (Execution)((Map)Agent.this.executions.get()).get(jobId);
                supervisor.setGoal(execution.getGoal());
            }
            HashSet hashSet = Sets.newHashSet();
            for (Map.Entry entry : ((Map)Agent.this.executions.get()).entrySet()) {
                Supervisor supervisor3;
                JobId jobId4 = (JobId)entry.getKey();
                execution = (Execution)entry.getValue();
                if (execution.getGoal() != Goal.UNDEPLOY || (supervisor3 = (Supervisor)Agent.this.supervisors.get(jobId4)) != null) continue;
                hashSet.add(jobId4);
                log.debug("Removing task: {}", (Object)jobId4);
                Agent.this.model.removeTaskStatus(jobId4);
            }
            if (!hashSet.isEmpty()) {
                Map map = Maps.filterKeys((Map)((Map)Agent.this.executions.get()), (Predicate)Predicates.not((Predicate)Predicates.in((Collection)hashSet)));
                Agent.this.executions.setUnchecked(ImmutableMap.copyOf((Map)map));
            }
        }
    }

    private class SupervisorListener
    implements Supervisor.Listener {
        private SupervisorListener() {
        }

        @Override
        public void stateChanged(Supervisor supervisor) {
            Agent.this.reactor.signal();
        }
    }

    private class ModelListener
    implements AgentModel.Listener {
        private ModelListener() {
        }

        @Override
        public void tasksChanged(AgentModel model) {
            Agent.this.reactor.signal();
        }
    }
}

