package io.pravega.common.io.filesystem;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Thread;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/common/io/filesystem/FileModificationEventWatcher.class */
/**
 * Watches a single file for modification/creation events and notifies a callback.
 *
 * <p>The watch is implemented with a {@link WatchService} registered on the parent directory of
 * the watched file for {@code ENTRY_MODIFY} and {@code ENTRY_CREATE}; events whose context name
 * contains the watched file's name are forwarded to the callback. The watching loop runs on this
 * object's own thread (this class extends {@link Thread}).
 */
public class FileModificationEventWatcher extends Thread implements FileModificationMonitor {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FileModificationEventWatcher.class);

    /** Source of the numeric suffix used in the watcher thread names. */
    private static final AtomicInteger THREAD_NUM = new AtomicInteger();

    /** Path of the file being watched. */
    private final Path watchedFilePath;

    /** Invoked with the matching event whenever a change to the watched file is detected. */
    private final Consumer<WatchEvent<?>> callback;

    /** Logs any exception that escapes {@link #run()}. */
    private final Thread.UncaughtExceptionHandler uncaughtExceptionalHandler = (thread, cause) -> logException(cause);

    /** When {@code false}, {@link #run()} performs a single iteration of the watch loop and exits. */
    private final boolean loopContinuously;

    /**
     * Set to {@code true} by {@link #run()} once the directory watch has been registered.
     * Volatile because it is written on the watcher thread and read through
     * {@link #isWatchRegistered()} from other threads.
     */
    private volatile boolean isWatchRegistered = false;

    /**
     * Creates a watcher that loops continuously and requires the file to exist.
     *
     * @param fileToWatch path of the file to watch
     * @param callback    action to invoke when a change to the file is detected
     * @throws NullPointerException     if either argument is {@code null}
     * @throws FileNotFoundException    if {@code fileToWatch} does not exist
     */
    public FileModificationEventWatcher(@NonNull Path fileToWatch, @NonNull Consumer<WatchEvent<?>> callback) throws FileNotFoundException {
        // Argument validation is performed by the delegated constructor.
        this(fileToWatch, callback, true, true);
    }

    /**
     * Creates a watcher with explicit control over looping and the file-existence check.
     *
     * @param fileToWatch           path of the file to watch
     * @param callback              action to invoke when a change to the file is detected
     * @param loopContinuously      whether {@link #run()} keeps looping after the first iteration
     * @param checkForFileExistence whether to fail if {@code fileToWatch} does not exist
     * @throws NullPointerException  if {@code fileToWatch} or {@code callback} is {@code null}
     * @throws FileNotFoundException if {@code checkForFileExistence} is set and the file does not exist
     */
    @VisibleForTesting
    FileModificationEventWatcher(@NonNull Path fileToWatch, @NonNull Consumer<WatchEvent<?>> callback,
                                 boolean loopContinuously, boolean checkForFileExistence) throws FileNotFoundException {
        super("pravega-file-watcher-" + THREAD_NUM.incrementAndGet());
        if (fileToWatch == null) {
            throw new NullPointerException("fileToWatch is marked non-null but is null");
        }
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        Exceptions.checkNotNullOrEmpty(fileToWatch.toString(), "fileToWatch");
        if (checkForFileExistence && !fileToWatch.toFile().exists()) {
            throw new FileNotFoundException(String.format("File [%s] does not exist.", fileToWatch));
        }
        this.watchedFilePath = fileToWatch;
        this.callback = callback;
        this.loopContinuously = loopContinuously;
        setUncaughtExceptionHandler(this.uncaughtExceptionalHandler);
    }

    /**
     * Returns the last name element of the watched path.
     *
     * @return the watched file's name
     * @throws IllegalStateException if the watched path has no file name element
     */
    @VisibleForTesting
    String getWatchedFileName() {
        Path fileName = this.watchedFilePath.getFileName();
        if (fileName == null) {
            throw new IllegalStateException("File name is null");
        }
        return fileName.toString();
    }

    /**
     * Returns the parent of the watched path, i.e. the directory the watch is registered on.
     *
     * @return the parent path, or {@code null} if the watched path has no parent
     */
    @VisibleForTesting
    Path getWatchedDirectory() {
        return this.watchedFilePath.getParent();
    }

    /**
     * Registers a watch on the parent directory and dispatches matching events to the callback
     * until the thread is interrupted, the watch key becomes invalid, or (when not looping
     * continuously) one iteration has completed. The watch key and watch service are always
     * released, including when an exception is thrown.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while setting up the watch
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        WatchKey watchKey = null;
        WatchService watchService = null;
        try {
            watchService = FileSystems.getDefault().newWatchService();
            log.debug("Done creating watch service for watching file at path: {}", this.watchedFilePath);

            String fileName = getWatchedFileName();
            Path directoryPath = getWatchedDirectory();
            log.debug("Directory being watched is {}", directoryPath);

            assert directoryPath != null;
            directoryPath.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
            log.debug("Registered the watch for the file: {}", this.watchedFilePath);
            this.isWatchRegistered = true;

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    watchKey = retrieveWatchKeyFrom(watchService);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the loop.
                    Thread.currentThread().interrupt();
                    logException(e);
                }

                if (watchKey != null) {
                    // Only one matching event is needed: the callback is told that the file changed,
                    // not how many times.
                    Optional<WatchEvent<?>> modification = watchKey.pollEvents().stream()
                            .filter(event -> concernsFile(event, fileName))
                            .findAny();
                    if (modification.isPresent()) {
                        log.info("Detected that the file [{}] has modified", this.watchedFilePath);
                        this.callback.accept(modification.get());
                    }

                    boolean isKeyValid = watchKey.reset();
                    log.debug("Done resetting watch key.");
                    if (!isKeyValid) {
                        log.info("No longer watching file [{}]", this.watchedFilePath);
                        break;
                    }
                }
                if (!this.loopContinuously) {
                    break;
                }
            }
        } catch (IOException e) {
            logException(e);
            throw new RuntimeException(e);
        } finally {
            // Release the watch resources on every exit path, including exceptions from
            // registration or from the callback.
            if (watchKey != null) {
                watchKey.cancel();
            }
            if (watchService != null) {
                try {
                    watchService.close();
                } catch (IOException e) {
                    log.warn("Error closing watch service", e);
                }
            }
        }
        log.info("Thread [{}], watching for modifications in file [{}] exiting,", getName(), this.watchedFilePath);
    }

    /**
     * Tells whether an event refers to the watched file. Events without a context (the JDK allows a
     * {@code null} context for {@code OVERFLOW} events) never match.
     */
    private static boolean concernsFile(WatchEvent<?> event, String fileName) {
        Object context = event.context();
        return context != null && context.toString().contains(fileName);
    }

    /**
     * Blocks until a watch key is available, then pauses for 200 ms before returning it.
     *
     * @param watchService the service to take the key from
     * @return the retrieved key
     * @throws InterruptedException if interrupted while waiting for the key or during the pause
     */
    private WatchKey retrieveWatchKeyFrom(WatchService watchService) throws InterruptedException {
        WatchKey key = watchService.take();
        log.info("Retrieved and removed watch key for watching file at path: {}", this.watchedFilePath);
        // NOTE(review): presumably this pause lets bursts of events for one change accumulate on
        // the key before they are polled - confirm.
        Thread.sleep(200L);
        return key;
    }

    /**
     * Marks this thread as a daemon and starts it.
     */
    @Override // io.pravega.common.io.filesystem.FileModificationMonitor
    public void startMonitoring() {
        setDaemon(true);
        start();
        log.info("Completed setting up monitor for watching modifications to file: {}", this.watchedFilePath);
    }

    /**
     * Indicates whether {@link #run()} has registered the directory watch.
     *
     * @return {@code true} once the watch has been registered
     */
    @VisibleForTesting
    boolean isWatchRegistered() {
        return this.isWatchRegistered;
    }

    /**
     * Interrupts the watcher thread so that the watch loop in {@link #run()} can exit.
     */
    @Override // io.pravega.common.io.filesystem.FileModificationMonitor
    public void stopMonitoring() {
        interrupt();
        log.info("Stopped the monitor that was watching modifications to file {}", this.watchedFilePath);
    }

    /**
     * Logs a warning containing the thread name, the watched path and the exception message.
     */
    private void logException(Throwable th) {
        log.warn("Thread [{}], watching for modifications in file [{}], encountered exception with cause [{}]",
                getName(), this.watchedFilePath, th.getMessage());
    }
}
