/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.styx;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.spotify.styx.Yaml;
import com.spotify.styx.YamlScheduleDefinition;
import com.spotify.styx.model.DataEndpoint;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.schedule.ScheduleSource;
import com.sun.nio.file.SensitivityWatchEventModifier;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ScheduleSource} that loads workflow definitions from YAML files in a local directory
 * and then watches that directory for changes.
 *
 * <p>The directory is read from the {@code styx.source.local.dir} configuration key. On
 * {@link #start()} every {@code .yaml}/{@code .yml} file directly inside it is parsed and reported
 * to the change listener; afterwards a background task submitted to the supplied executor reports
 * created/modified files to the change listener and deleted files to the remove listener.
 *
 * <p>The {@code workflows} map is populated by {@link #start()} before the polling task is
 * submitted and is only touched by the polling task afterwards.
 */
class LocalFileScheduleSource implements ScheduleSource {
    private static final WatchEvent.Kind<?>[] EVENTS = new WatchEvent.Kind<?>[]{
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_MODIFY,
        StandardWatchEventKinds.ENTRY_DELETE
    };
    private static final String LOCAL_DIR_CONFIG_KEY = "styx.source.local.dir";
    private static final long POLL_TIMEOUT_MILLIS = 100L;
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileScheduleSource.class);

    private final Config config;
    private final Closer closer;
    private final ScheduledExecutorService executor;
    private final Consumer<Workflow> changeListener;
    private final Consumer<Workflow> removeListener;

    /** Workflows last read from each definition file, keyed by component id (the file name). */
    private final Map<String, Set<Workflow>> workflows = Maps.newHashMap();

    /** Cleared through the {@link Closer} to make the polling loop exit. */
    private volatile boolean running;

    /**
     * @param config         configuration holding the {@code styx.source.local.dir} key
     * @param closer         receives a hook that stops the polling loop when closed
     * @param executor       executor on which the directory polling task is submitted
     * @param changeListener invoked for every workflow read from a created or modified file
     * @param removeListener invoked for every tracked workflow of a deleted file
     * @throws NullPointerException if any argument is {@code null}
     */
    LocalFileScheduleSource(Config config, Closer closer, ScheduledExecutorService executor, Consumer<Workflow> changeListener, Consumer<Workflow> removeListener) {
        this.config = Objects.requireNonNull(config);
        this.closer = Objects.requireNonNull(closer);
        this.executor = Objects.requireNonNull(executor);
        this.changeListener = Objects.requireNonNull(changeListener);
        this.removeListener = Objects.requireNonNull(removeListener);
    }

    /**
     * Reads all YAML definitions currently in the configured directory, then starts watching the
     * directory for changes on the executor.
     *
     * @throws RuntimeException if the directory is not configured, is not a valid path, cannot be
     *                          listed, or cannot be watched
     */
    public void start() {
        if (!config.hasPath(LOCAL_DIR_CONFIG_KEY)) {
            LOG.error("Configuration key '{}' not set", LOCAL_DIR_CONFIG_KEY);
            throw new RuntimeException("Can't load local file schedule source: not configured");
        }
        final String sourceDir = config.getString(LOCAL_DIR_CONFIG_KEY);

        final Path path;
        try {
            path = Paths.get(sourceDir);
        } catch (InvalidPathException e) {
            LOG.error("Invalid path: {}", sourceDir, e);
            throw new RuntimeException("Can't load local file schedule source: invalid path", e);
        }

        // Files.list holds an open directory handle until the stream is closed.
        try (Stream<Path> list = Files.list(path)) {
            list.filter(this::isYamlFile).forEach(this::readFile);
        } catch (IOException e) {
            LOG.error("Failed to List: {}", sourceDir, e);
            throw new RuntimeException("Can't load local file schedule source: initial listing failed", e);
        }

        final WatchService watcher;
        try {
            watcher = openWatcher(path);
        } catch (IOException e) {
            LOG.error("Could not watch: {}", path, e);
            throw new RuntimeException("Can't load local file schedule source", e);
        }

        running = true;
        closer.register(() -> {
            running = false;
        });
        try {
            executor.submit(() -> poll(path, watcher));
        } catch (RuntimeException e) {
            // The polling task owns the watcher; if it never gets to run, release it here.
            running = false;
            closeWatcher(path, watcher);
            throw e;
        }
    }

    /**
     * Creates a watch service and registers {@code path} with it, closing the service again if
     * registration fails so that it is not leaked.
     */
    private static WatchService openWatcher(Path path) throws IOException {
        final WatchService watcher = FileSystems.getDefault().newWatchService();
        try {
            path.register(watcher, EVENTS, SensitivityWatchEventModifier.HIGH);
            return watcher;
        } catch (IOException | RuntimeException e) {
            try {
                watcher.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Closes the watch service, logging (rather than propagating) a failure to do so. */
    private static void closeWatcher(Path watchPath, WatchService watchService) {
        try {
            watchService.close();
        } catch (IOException e) {
            LOG.warn("Failed to close watch service for {}", watchPath, e);
        }
    }

    /**
     * Polling loop run on the executor. Drains watch events until {@code running} is cleared, the
     * watch key becomes invalid, or the thread is interrupted, and closes the watch service on
     * the way out.
     */
    private void poll(Path watchPath, WatchService watchService) {
        LOG.info("Watching {} for schedule definitions", watchPath);
        boolean interrupted = false;
        try {
            while (running) {
                final WatchKey key = watchService.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (key == null) {
                    continue;
                }
                for (WatchEvent<?> event : key.pollEvents()) {
                    handleEvent(watchPath, event);
                }
                // reset() returns false once the key is no longer valid; nothing more will arrive.
                if (!key.reset()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("interrupted", e);
            interrupted = true;
        } finally {
            closeWatcher(watchPath, watchService);
        }
        LOG.info("Stopped watching {}", watchPath);
        if (interrupted) {
            // Restore the flag last so cleanup and logging above run uninterrupted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Applies a single watch event: re-reads the file on create/modify, and reports all tracked
     * workflows of the file as removed on delete. Overflow events and non-YAML files are ignored.
     */
    private void handleEvent(Path watchPath, WatchEvent<?> event) {
        final WatchEvent.Kind<?> kind = event.kind();
        if (kind == StandardWatchEventKinds.OVERFLOW) {
            return;
        }
        final WatchEvent<Path> pathEvent = cast(event);
        final Path file = watchPath.resolve(pathEvent.context());
        if (!isYamlFile(file)) {
            return;
        }

        final String componentId = componentId(file);
        LOG.debug("{} event for component {}, from file {}", kind, componentId, file);

        if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
            // NOTE(review): workflows that were dropped from a modified file stay tracked (and are
            // not reported as removed) until the file itself is deleted - confirm this is intended.
            readFile(file);
        } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
            final Set<Workflow> deleted = workflows.remove(componentId);
            if (deleted != null) {
                deleted.forEach(removeListener);
            }
        }
    }

    /** Returns whether the file name ends in {@code .yaml} or {@code .yml}. */
    private boolean isYamlFile(Path file) {
        final String fileName = file.getFileName().toString();
        return fileName.endsWith(".yaml") || fileName.endsWith(".yml");
    }

    /**
     * Parses the file, records every workflow in it and passes each to the change listener.
     * An {@link IOException} while reading is logged and otherwise ignored.
     */
    private void readFile(Path file) {
        try {
            for (Workflow workflow : readWorkflows(file)) {
                workflows.computeIfAbsent(workflow.componentId(), k -> Sets.newHashSet()).add(workflow);
                changeListener.accept(workflow);
            }
        } catch (IOException e) {
            LOG.warn("Failed to read schedule definition {}", file, e);
        }
    }

    /**
     * Reads and parses a schedule definition file into one workflow per schedule, using the file
     * name as component id and the file URI as component URI.
     *
     * @throws IOException if the file cannot be read (or if parsing reports one)
     */
    private List<Workflow> readWorkflows(Path path) throws IOException {
        final byte[] bytes = Files.readAllBytes(path);
        if (LOG.isDebugEnabled()) {
            // Only decode the whole file as UTF-8 when it will actually be logged.
            LOG.debug("Read yaml file \n{}", ByteString.of(bytes).utf8());
        }
        final YamlScheduleDefinition definitions = Yaml.parseScheduleDefinition(bytes);
        LOG.debug("Parsed schedule definitions: {}", definitions);

        final String componentId = componentId(path);
        final URI componentUri = path.toUri();
        return definitions.schedules().stream()
            .map(schedule -> Workflow.create(componentId, componentUri, (DataEndpoint) schedule))
            .collect(Collectors.toList());
    }

    /** The component id of a definition file is its file name, including the extension. */
    private String componentId(Path path) {
        return path.getFileName().toString();
    }

    /**
     * Narrows a wildcard watch event to a concrete context type. The caller is responsible for
     * only doing this for event kinds whose context really is a {@code T}.
     */
    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }
}

