/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.helios.agent;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.spotify.helios.agent.AgentModel;
import com.spotify.helios.agent.QueueingHistoryWriter;
import com.spotify.helios.common.Json;
import com.spotify.helios.common.descriptors.Descriptor;
import com.spotify.helios.common.descriptors.Goal;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.servicescommon.coordination.Node;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
import com.spotify.helios.servicescommon.coordination.ZooKeeperPersistentNodeRemover;
import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AgentModel} implementation whose state is held in ZooKeeper-backed helpers.
 *
 * <p>The model is composed of four sub-services that are started and stopped together
 * with this service:
 * <ul>
 *   <li>{@code tasks} - a children cache over {@code Paths.configHostJobs(host)},</li>
 *   <li>{@code taskStatuses} - a persistent directory over {@code Paths.statusHostJobs(host)},</li>
 *   <li>{@code taskRemover} - a node remover guarded by {@link TaskGoalIsUndeployPredicate},</li>
 *   <li>{@code historyWriter} - a queueing writer for task status history.</li>
 * </ul>
 * Each helper is given a file under the supplied state directory.
 */
public class ZooKeeperAgentModel extends AbstractIdleService implements AgentModel {
    private static final Logger log = LoggerFactory.getLogger(ZooKeeperAgentModel.class);

    private static final String TASK_CONFIG_FILENAME = "task-config.json";
    private static final String TASK_HISTORY_FILENAME = "task-history.json";
    private static final String TASK_STATUS_FILENAME = "task-status.json";
    private static final String TASK_REMOVER_FILENAME = "remove.json";

    private static final Predicate<Node> TASK_GOAL_IS_UNDEPLOY = new TaskGoalIsUndeployPredicate();

    private final PersistentPathChildrenCache<Task> tasks;
    private final ZooKeeperUpdatingPersistentDirectory taskStatuses;
    private final ZooKeeperPersistentNodeRemover taskRemover;
    private final QueueingHistoryWriter historyWriter;
    private final String agent;
    private final CopyOnWriteArrayList<AgentModel.Listener> listeners =
            new CopyOnWriteArrayList<AgentModel.Listener>();

    /**
     * Wires up the helpers backing this model. Nothing is started here; see {@link #startUp()}.
     *
     * @param provider       source of ZooKeeper clients
     * @param host           name of the agent host; must not be {@code null}
     * @param stateDirectory directory against which the helpers' file names are resolved
     * @throws IOException          declared by the helper factories used here
     * @throws InterruptedException declared by the helper factories used here
     */
    public ZooKeeperAgentModel(ZooKeeperClientProvider provider, String host, Path stateDirectory)
            throws IOException, InterruptedException {
        ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
        this.agent = Preconditions.checkNotNull(host);

        Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
        this.tasks = client.pathChildrenCache(
                Paths.configHostJobs(host), taskConfigFile, Json.type(Task.class));
        this.tasks.addListener(new JobsListener());

        Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
        this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create(
                "agent-model-task-statuses", provider, taskStatusFile, Paths.statusHostJobs(host));

        Path removerFile = stateDirectory.resolve(TASK_REMOVER_FILENAME);
        this.taskRemover = ZooKeeperPersistentNodeRemover.create(
                "agent-model-task-remover", provider, removerFile, TASK_GOAL_IS_UNDEPLOY, true);

        this.historyWriter = new QueueingHistoryWriter(
                host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
    }

    /**
     * Starts each helper in turn, waiting for it to be running before starting the next.
     */
    @Override
    protected void startUp() throws Exception {
        this.tasks.startAsync().awaitRunning();
        this.taskStatuses.startAsync().awaitRunning();
        this.taskRemover.startAsync().awaitRunning();
        this.historyWriter.startAsync().awaitRunning();
    }

    /**
     * Stops each helper in turn, waiting for it to terminate before stopping the next.
     */
    @Override
    protected void shutDown() throws Exception {
        this.tasks.stopAsync().awaitTerminated();
        this.taskStatuses.stopAsync().awaitTerminated();
        this.taskRemover.stopAsync().awaitTerminated();
        this.historyWriter.stopAsync().awaitTerminated();
    }

    /**
     * Derives a {@link JobId} from a task node path by stripping this agent's jobs prefix.
     *
     * <p>The prefix is matched literally. It must not be handed to
     * {@link String#replaceFirst(String, String)}: that method interprets its first argument
     * as a regular expression, so characters such as {@code .} in the host name would be
     * treated as regex syntax. A path that does not start with the prefix is parsed as-is.
     *
     * @param path node path as reported by the tasks cache
     * @return the job id encoded in the last part of the path
     */
    private JobId jobIdFromTaskPath(String path) {
        String prefix = Paths.configHostJobs(this.agent) + "/";
        String jobIdPart = path.startsWith(prefix) ? path.substring(prefix.length()) : path;
        return JobId.fromString(jobIdPart);
    }

    /**
     * Returns a fresh map of the tasks currently held by the tasks cache, keyed by job id.
     */
    @Override
    public Map<JobId, Task> getTasks() {
        Map<JobId, Task> result = Maps.newHashMap();
        for (Map.Entry<String, Task> entry : this.tasks.getNodes().entrySet()) {
            JobId id = this.jobIdFromTaskPath(entry.getKey());
            result.put(id, entry.getValue());
        }
        return result;
    }

    /**
     * Returns a fresh map of all stored task statuses, keyed by job id.
     *
     * <p>An {@link IOException} raised while decoding a status is rethrown via
     * {@link Throwables#propagate(Throwable)}.
     */
    @Override
    public Map<JobId, TaskStatus> getTaskStatuses() {
        Map<JobId, TaskStatus> statuses = Maps.newHashMap();
        for (Map.Entry<String, byte[]> entry : this.taskStatuses.entrySet()) {
            try {
                JobId id = JobId.fromString(entry.getKey());
                TaskStatus status = (TaskStatus) Json.read(entry.getValue(), TaskStatus.class);
                statuses.put(id, status);
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
        return statuses;
    }

    /**
     * Stores the status for a job and hands it to the history writer.
     *
     * @param jobId  job whose status is being set
     * @param status new status
     */
    @Override
    public void setTaskStatus(JobId jobId, TaskStatus status) throws InterruptedException {
        log.debug("setting task status: {}", status);
        this.taskStatuses.put(jobId.toString(), status.toJsonBytes());
        this.historyWriter.saveHistoryItem(jobId, status);
    }

    /**
     * Looks up the stored status for a job.
     *
     * @param jobId job to look up
     * @return the parsed status, or {@code null} if nothing is stored for {@code jobId}
     */
    @Override
    public TaskStatus getTaskStatus(JobId jobId) {
        byte[] data = this.taskStatuses.get(jobId.toString());
        if (data == null) {
            return null;
        }
        try {
            return (TaskStatus) Descriptor.parse(data, TaskStatus.class);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Removes the stored status for a job.
     *
     * @param jobId job whose status should be removed
     */
    @Override
    public void removeTaskStatus(JobId jobId) throws InterruptedException {
        // NOTE(review): the (Object) cast is carried over from the decompiled source to keep
        // the same overload selected; drop it once the directory's remove signature is confirmed.
        this.taskStatuses.remove((Object) jobId.toString());
    }

    /**
     * Asks the task remover to remove this agent's config node for the given job.
     *
     * @param jobId job whose node should be removed
     */
    @Override
    public void removeUndeployTombstone(JobId jobId) throws InterruptedException {
        String path = Paths.configHostJob(this.agent, jobId);
        this.taskRemover.remove(path);
    }

    /**
     * Registers a listener and immediately notifies it once with the current model.
     */
    @Override
    public void addListener(AgentModel.Listener listener) {
        this.listeners.add(listener);
        listener.tasksChanged(this);
    }

    /**
     * Unregisters a previously added listener.
     */
    @Override
    public void removeListener(AgentModel.Listener listener) {
        this.listeners.remove(listener);
    }

    /**
     * Notifies every registered listener that the tasks changed. An exception thrown by one
     * listener is logged and does not prevent the remaining listeners from being notified.
     */
    protected void fireTasksUpdated() {
        for (AgentModel.Listener listener : this.listeners) {
            try {
                listener.tasksChanged(this);
            } catch (Exception e) {
                log.error("listener threw exception", e);
            }
        }
    }

    /**
     * Matches nodes whose payload parses as a {@link Task} with goal {@link Goal#UNDEPLOY}.
     */
    private static class TaskGoalIsUndeployPredicate implements Predicate<Node> {
        private TaskGoalIsUndeployPredicate() {
        }

        @Override
        public boolean apply(Node node) {
            assert (node != null);
            try {
                Task task = (Task) Descriptor.parse(node.getBytes(), Task.class);
                return task.getGoal() == Goal.UNDEPLOY;
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Forwards node changes from the tasks cache to this model's listeners.
     */
    private class JobsListener implements PersistentPathChildrenCache.Listener {
        private JobsListener() {
        }

        @Override
        public void nodesChanged(PersistentPathChildrenCache<?> cache) {
            ZooKeeperAgentModel.this.fireTasksUpdated();
        }

        @Override
        public void connectionStateChanged(ConnectionState state) {
            // Connection state changes are intentionally ignored.
        }
    }
}

