package com.spotify.helios.agent;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.spotify.helios.agent.AgentModel;
import com.spotify.helios.common.Json;
import com.spotify.helios.common.descriptors.Descriptor;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaRecord;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/helios/agent/ZooKeeperAgentModel.class */
/**
 * {@link AgentModel} implementation for a single agent.
 *
 * <p>Task configuration is read through a {@link PersistentPathChildrenCache} rooted at
 * {@code Paths.configHostJobs(agent)}; task statuses are stored in a
 * {@link ZooKeeperUpdatingPersistentDirectory} rooted at {@code Paths.statusHostJobs(agent)}.
 * Every status update is additionally handed to the optional {@link TaskHistoryWriter} and sent
 * to Kafka as a {@link TaskStatusEvent}.
 *
 * <p>The backing services are started and stopped together with this service.
 */
public class ZooKeeperAgentModel extends AbstractIdleService implements AgentModel {
    private static final Logger log = LoggerFactory.getLogger(ZooKeeperAgentModel.class);

    /** File name, resolved against the constructor's {@code path}, handed to the task cache. */
    private static final String TASK_CONFIG_FILENAME = "task-config.json";

    /** File name, resolved against the constructor's {@code path}, handed to the status directory. */
    private static final String TASK_STATUS_FILENAME = "task-status.json";

    /** Kafka topic that receives one record per call to {@link #setTaskStatus}. */
    private static final String TASK_STATUS_EVENT_TOPIC = "HeliosTaskStatusEvents";

    private final PersistentPathChildrenCache<Task> tasks;
    private final ZooKeeperUpdatingPersistentDirectory taskStatuses;

    /** May be {@code null}, in which case no history is written. */
    private final TaskHistoryWriter historyWriter;

    private final KafkaSender kafkaSender;
    private final String agent;

    /** Copy-on-write so that listeners can be notified while others are added or removed. */
    private final CopyOnWriteArrayList<AgentModel.Listener> listeners = new CopyOnWriteArrayList<>();

    /** Forwards change notifications from the task cache to the registered model listeners. */
    private class JobsListener implements PersistentPathChildrenCache.Listener {
        private JobsListener() {
        }

        @Override
        public void nodesChanged(PersistentPathChildrenCache<?> persistentPathChildrenCache) {
            ZooKeeperAgentModel.this.fireTasksUpdated();
        }

        @Override
        public void connectionStateChanged(ConnectionState connectionState) {
            // Connection state changes are deliberately ignored here.
        }
    }

    /**
     * Creates the model and its backing stores; nothing is started until the service is started.
     *
     * @param zooKeeperClientProvider source of the ZooKeeper client used by the task cache and the
     *     status directory
     * @param kafkaClientProvider source of the producer used to publish task status events
     * @param str name of the agent; must not be {@code null}
     * @param path directory against which the task config and task status file names are resolved
     * @param taskHistoryWriter optional history writer; may be {@code null}
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws IOException declared by the underlying store factories
     * @throws InterruptedException declared by the underlying store factories
     */
    public ZooKeeperAgentModel(ZooKeeperClientProvider zooKeeperClientProvider, KafkaClientProvider kafkaClientProvider, String str, Path path, TaskHistoryWriter taskHistoryWriter) throws IOException, InterruptedException {
        // Validate before touching any provider so that a bad argument fails fast.
        this.agent = Preconditions.checkNotNull(str);

        ZooKeeperClient zooKeeperClient = zooKeeperClientProvider.get("ZooKeeperAgentModel_ctor");
        this.tasks = zooKeeperClient.pathChildrenCache(
                Paths.configHostJobs(str),
                path.resolve(TASK_CONFIG_FILENAME),
                Json.type(Task.class));
        this.tasks.addListener(new JobsListener());

        this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create(
                "agent-model-task-statuses",
                zooKeeperClientProvider,
                path.resolve(TASK_STATUS_FILENAME),
                Paths.statusHostJobs(str));

        this.historyWriter = taskHistoryWriter;
        this.kafkaSender = new KafkaSender(kafkaClientProvider.getDefaultProducer());
    }

    /** Starts the task cache, the status directory and, if present, the history writer. */
    @Override
    protected void startUp() throws Exception {
        this.tasks.startAsync().awaitRunning();
        this.taskStatuses.startAsync().awaitRunning();
        if (this.historyWriter != null) {
            this.historyWriter.startAsync().awaitRunning();
        }
    }

    /** Stops the task cache, the status directory and, if present, the history writer. */
    @Override
    protected void shutDown() throws Exception {
        this.tasks.stopAsync().awaitTerminated();
        this.taskStatuses.stopAsync().awaitTerminated();
        if (this.historyWriter != null) {
            this.historyWriter.stopAsync().awaitTerminated();
        }
    }

    /**
     * Extracts the job id from a task node path by stripping this agent's jobs path prefix.
     *
     * <p>The prefix is matched literally. It must not be treated as a regular expression because
     * the agent name is part of it and may contain regex metacharacters such as {@code '.'}.
     *
     * @param taskPath node path as reported by the task cache
     * @return the job id parsed from the remainder of the path; if the path does not start with
     *     the expected prefix it is parsed unchanged
     */
    private JobId jobIdFromTaskPath(String taskPath) {
        String prefix = Paths.configHostJobs(this.agent) + "/";
        String id = taskPath.startsWith(prefix) ? taskPath.substring(prefix.length()) : taskPath;
        return JobId.fromString(id);
    }

    /**
     * Returns the tasks currently held by the task cache, keyed by job id.
     *
     * @return a new map that is not backed by the cache
     */
    @Override
    public Map<JobId, Task> getTasks() {
        Map<JobId, Task> result = Maps.newHashMap();
        for (Map.Entry<String, Task> entry : this.tasks.getNodes().entrySet()) {
            result.put(jobIdFromTaskPath(entry.getKey()), entry.getValue());
        }
        return result;
    }

    /**
     * Returns all stored task statuses, keyed by job id.
     *
     * @return a new map with one deserialized status per stored entry
     * @throws RuntimeException wrapping the {@link IOException} if a stored status cannot be read
     */
    @Override
    public Map<JobId, TaskStatus> getTaskStatuses() {
        Map<JobId, TaskStatus> result = Maps.newHashMap();
        for (Map.Entry<String, byte[]> entry : this.taskStatuses.entrySet()) {
            try {
                result.put(JobId.fromString(entry.getKey()), (TaskStatus) Json.read(entry.getValue(), TaskStatus.class));
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
        return result;
    }

    /**
     * Stores the status for a job, records it in the history (if a writer is configured) and
     * publishes a {@link TaskStatusEvent} to Kafka.
     *
     * <p>A failure to write the history item is logged and otherwise ignored; the Kafka event is
     * still sent.
     *
     * @param jobId job whose status is being set
     * @param taskStatus the new status
     * @throws InterruptedException declared by this method; presumably raised by the status
     *     directory update (not verifiable from this file)
     */
    @Override
    public void setTaskStatus(JobId jobId, TaskStatus taskStatus) throws InterruptedException {
        log.debug("setting task status: {}", taskStatus);
        this.taskStatuses.put(jobId.toString(), taskStatus.toJsonBytes());

        if (this.historyWriter != null) {
            try {
                this.historyWriter.saveHistoryItem(taskStatus);
            } catch (Exception e) {
                // No placeholder for the exception: passing it as the trailing argument makes
                // SLF4J log it as the throwable, including the stack trace.
                log.error("Error saving task status {} to ZooKeeper", taskStatus, e);
            }
        }

        TaskStatusEvent event = new TaskStatusEvent(taskStatus, System.currentTimeMillis(), this.agent);
        this.kafkaSender.send(KafkaRecord.of(TASK_STATUS_EVENT_TOPIC, event.toJsonBytes()));
    }

    /**
     * Looks up the stored status of a single job.
     *
     * @param jobId job to look up
     * @return the stored status, or {@code null} if none is stored for the job
     * @throws RuntimeException wrapping the {@link IOException} if the stored status cannot be
     *     parsed
     */
    @Override
    public TaskStatus getTaskStatus(JobId jobId) {
        byte[] data = this.taskStatuses.get(jobId.toString());
        if (data == null) {
            return null;
        }
        try {
            return Descriptor.parse(data, TaskStatus.class);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Removes the stored status of a job.
     *
     * @param jobId job whose status should be removed
     * @throws InterruptedException declared by the status directory's remove operation
     */
    @Override
    public void removeTaskStatus(JobId jobId) throws InterruptedException {
        // NOTE(review): the explicit Object cast is kept from the original in case remove() is
        // overloaded in ZooKeeperUpdatingPersistentDirectory - confirm before dropping it.
        this.taskStatuses.remove((Object) jobId.toString());
    }

    /**
     * Registers a listener and immediately notifies it once with the current model.
     *
     * @param listener listener to register
     */
    @Override
    public void addListener(AgentModel.Listener listener) {
        this.listeners.add(listener);
        listener.tasksChanged(this);
    }

    /**
     * Unregisters a previously registered listener.
     *
     * @param listener listener to remove
     */
    @Override
    public void removeListener(AgentModel.Listener listener) {
        this.listeners.remove(listener);
    }

    /**
     * Notifies every registered listener that the tasks changed. An exception thrown by one
     * listener is logged and does not prevent the remaining listeners from being notified.
     */
    protected void fireTasksUpdated() {
        for (AgentModel.Listener listener : this.listeners) {
            try {
                listener.tasksChanged(this);
            } catch (Exception e) {
                log.error("listener threw exception", e);
            }
        }
    }
}
