/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the on-disk state directory of a streams application: the per-application
 * directory below the configured base directory, one sub-directory per task, and a
 * {@code global} sub-directory.
 *
 * <p>Directories are guarded by file locks taken on a {@code .lock} file inside each
 * directory. Task locks are additionally tracked in memory together with the name of the
 * thread that acquired them, so that a second thread of the same instance is refused and
 * the owning thread can re-enter.
 *
 * <p>All locking, unlocking and cleaning methods are {@code synchronized} on this
 * instance; {@link #directoryForTask(TaskId)} and {@link #listTaskDirectories()} are not.
 *
 * <p>When constructed with {@code createStateDirectory == false} no directory is ever
 * created and {@link #lock(TaskId)} / {@link #lockGlobalState()} report success without
 * touching the file system.
 */
public class StateDirectory {
    /** Task directory names have the form {@code <digits>_<digits>}. */
    private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
    static final String LOCK_FILE_NAME = ".lock";
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);

    private final File stateDir;
    private final boolean createStateDirectory;
    /** Open channels on task lock files, cached per task (also across failed lock attempts). */
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
    /** Task locks currently held through this instance, with the name of the owning thread. */
    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
    private final Time time;
    private FileChannel globalStateChannel;
    private FileLock globalStateLock;

    /**
     * Resolves the application's state directory and, if requested, creates it.
     *
     * @param config               configuration providing {@code state.dir} and {@code application.id}
     * @param time                 clock used to decide whether a task directory is obsolete
     * @param createStateDirectory whether missing directories should be created
     * @throws ProcessorStateException if a directory is missing and cannot be created
     */
    public StateDirectory(StreamsConfig config, Time time, boolean createStateDirectory) {
        this.time = time;
        this.createStateDirectory = createStateDirectory;

        String stateDirName = config.getString("state.dir");
        File baseDir = new File(stateDirName);
        if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) {
            throw new ProcessorStateException(String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
        }

        this.stateDir = new File(baseDir, config.getString("application.id"));
        if (this.createStateDirectory && !this.stateDir.exists() && !this.stateDir.mkdir()) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
        }
    }

    /**
     * Returns the directory of the given task, creating it when directory creation is enabled.
     *
     * @param taskId the task whose directory is requested
     * @return the task directory (it may not exist when directory creation is disabled)
     * @throws ProcessorStateException if the directory is missing and cannot be created
     */
    public File directoryForTask(TaskId taskId) {
        File taskDir = new File(stateDir, taskId.toString());
        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
            throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
        }
        return taskDir;
    }

    /**
     * Returns the {@code global} state directory, creating it when directory creation is enabled.
     *
     * @return the global state directory
     * @throws ProcessorStateException if the directory is missing and cannot be created
     */
    File globalStateDir() {
        File dir = new File(stateDir, "global");
        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
            throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
        }
        return dir;
    }

    /** Log prefix naming the calling thread. */
    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /**
     * Tries to acquire the lock on the given task's state directory for the calling thread.
     *
     * @param taskId the task to lock
     * @return {@code true} if the calling thread holds the lock after this call (or directory
     *         creation is disabled); {@code false} if another thread owns it, the task
     *         directory or lock file could not be created, or the file lock is unavailable
     * @throws IOException if opening the lock file or locking it fails for another reason
     */
    synchronized boolean lock(TaskId taskId) throws IOException {
        if (!createStateDirectory) {
            return true;
        }

        LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner != null) {
            // Ownership is tracked by thread name: re-entrant for the owner, refused for others.
            if (lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
                return true;
            }
            return false;
        }

        final File lockFile;
        try {
            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
        } catch (ProcessorStateException ignored) {
            // The task directory could not be created; report "not locked" instead of failing.
            return false;
        }

        final FileChannel channel;
        try {
            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
        } catch (NoSuchFileException ignored) {
            // Presumably the task directory was removed concurrently; treat as "not locked".
            return false;
        }

        // NOTE(review): when tryLock fails the channel stays cached in `channels` and is reused
        // by the next attempt. It is intentionally not closed here, because closing a channel
        // may release locks held on the same file elsewhere in this JVM on some platforms.
        FileLock lock = tryLock(channel);
        if (lock == null) {
            return false;
        }
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
        log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
        return true;
    }

    /**
     * Tries to acquire the lock on the global state directory.
     *
     * @return {@code true} if the lock is held after this call (or directory creation is
     *         disabled); {@code false} if the lock file could not be found or is locked
     * @throws IOException if opening or locking the lock file fails for another reason
     */
    synchronized boolean lockGlobalState() throws IOException {
        if (!createStateDirectory) {
            return true;
        }
        if (globalStateLock != null) {
            log.trace("{} Found cached state dir lock for the global task", logPrefix());
            return true;
        }

        File lockFile = new File(globalStateDir(), LOCK_FILE_NAME);
        final FileChannel channel;
        try {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (NoSuchFileException ignored) {
            return false;
        }

        FileLock fileLock = tryLock(channel);
        if (fileLock == null) {
            channel.close();
            return false;
        }
        globalStateChannel = channel;
        globalStateLock = fileLock;
        log.debug("{} Acquired global state dir lock", logPrefix());
        return true;
    }

    /**
     * Releases the global state directory lock if it is held; otherwise does nothing.
     *
     * @throws IOException if releasing the lock or closing its channel fails
     */
    synchronized void unlockGlobalState() throws IOException {
        if (globalStateLock == null) {
            return;
        }
        FileLock heldLock = globalStateLock;
        FileChannel heldChannel = globalStateChannel;
        // Forget the lock first so a failure below cannot leave stale references behind.
        globalStateLock = null;
        globalStateChannel = null;
        try {
            heldLock.release();
        } finally {
            // Always close the channel, even if release() failed, so it is not leaked.
            heldChannel.close();
        }
        log.debug("{} Released global state dir lock", logPrefix());
    }

    /**
     * Releases the lock on the given task's state directory if the calling thread owns it;
     * otherwise does nothing.
     *
     * @param taskId the task to unlock
     * @throws IOException if releasing the lock or closing its channel fails
     */
    synchronized void unlock(TaskId taskId) throws IOException {
        LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner == null || !lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            return;
        }

        locks.remove(taskId);
        try {
            lockAndOwner.lock.release();
            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
        } finally {
            // Drop and close the cached channel even if release() failed; otherwise the stale
            // channel would be leaked and reused by later lock attempts.
            FileChannel fileChannel = channels.remove(taskId);
            if (fileChannel != null) {
                fileChannel.close();
            }
        }
    }

    /**
     * Deletes every task directory that is not locked through this instance and can be
     * locked, followed by the global state directory.
     *
     * @throws StreamsException if any step fails
     */
    public synchronized void clean() {
        try {
            cleanRemovedTasks(0L, true);
        } catch (Exception e) {
            throw new StreamsException(e);
        }

        try {
            if (stateDir.exists()) {
                Utils.delete(globalStateDir().getAbsoluteFile());
            }
        } catch (IOException e) {
            log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
            throw new StreamsException(e);
        }
    }

    /**
     * Deletes task directories that are not locked and were last modified more than
     * {@code cleanupDelayMs} milliseconds ago. Failures are logged and skipped.
     *
     * @param cleanupDelayMs minimum age, in milliseconds, before a directory is deleted
     */
    public synchronized void cleanRemovedTasks(long cleanupDelayMs) {
        try {
            cleanRemovedTasks(cleanupDelayMs, false);
        } catch (Exception cannotHappen) {
            throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
        }
    }

    /**
     * Shared implementation of {@link #clean()} and {@link #cleanRemovedTasks(long)}.
     *
     * @param cleanupDelayMs minimum age before deletion; ignored when {@code manualUserCall}
     * @param manualUserCall {@code true} when triggered by {@link #clean()}: every lockable
     *                       directory is deleted regardless of age and failures are rethrown
     *                       instead of being skipped
     * @throws Exception only when {@code manualUserCall} is {@code true} and locking,
     *                   deleting or unlocking a directory fails
     */
    private synchronized void cleanRemovedTasks(long cleanupDelayMs, boolean manualUserCall) throws Exception {
        File[] taskDirs = listTaskDirectories();
        if (taskDirs == null || taskDirs.length == 0) {
            return;
        }

        for (File taskDir : taskDirs) {
            String dirName = taskDir.getName();
            TaskId id = TaskId.parse(dirName);
            // Never touch a directory whose lock is already tracked by this instance; this also
            // guarantees the unlock in the finally block only releases a lock taken below.
            if (locks.containsKey(id)) {
                continue;
            }

            try {
                if (!lock(id)) {
                    continue;
                }
                long now = time.milliseconds();
                long lastModifiedMs = taskDir.lastModified();
                // Compare the elapsed time rather than "lastModified + delay": the sum overflows
                // for very large delays and would make the directory look obsolete.
                long elapsedMs = now - lastModifiedMs;

                if (manualUserCall) {
                    log.info("{} Deleting state directory {} for task {} as user calling cleanup.", logPrefix(), dirName, id);
                } else if (elapsedMs > cleanupDelayMs) {
                    log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, elapsedMs, cleanupDelayMs);
                } else {
                    continue;
                }
                Utils.delete(taskDir);
            } catch (OverlappingFileLockException e) {
                if (manualUserCall) {
                    log.error("{} Failed to get the state directory lock.", logPrefix(), e);
                    throw e;
                }
            } catch (IOException e) {
                log.error("{} Failed to delete the state directory.", logPrefix(), e);
                if (manualUserCall) {
                    throw e;
                }
            } finally {
                try {
                    unlock(id);
                } catch (IOException e) {
                    log.error("{} Failed to release the state directory lock.", logPrefix(), e);
                    if (manualUserCall) {
                        throw e;
                    }
                }
            }
        }
    }

    /**
     * Lists the sub-directories of the application state directory whose names look like
     * task ids.
     *
     * @return the matching directories; an empty array if the state directory does not exist
     *         or cannot be listed
     */
    File[] listTaskDirectories() {
        if (!stateDir.exists()) {
            return new File[0];
        }
        File[] taskDirs = stateDir.listFiles(pathname ->
                pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
        // File.listFiles returns null on I/O error or if stateDir is not a directory.
        return taskDirs == null ? new File[0] : taskDirs;
    }

    /**
     * Returns the cached channel for the task's lock file, opening (and creating) the file
     * on first use.
     */
    private FileChannel getOrCreateFileChannel(TaskId taskId, Path lockPath) throws IOException {
        FileChannel channel = channels.get(taskId);
        if (channel == null) {
            channel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            channels.put(taskId, channel);
        }
        return channel;
    }

    /**
     * Attempts a non-blocking file lock.
     *
     * @return the lock, or {@code null} if it is unavailable, including when an overlapping
     *         lock is already held within this JVM
     */
    private FileLock tryLock(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException ignored) {
            return null;
        }
    }

    /** A held file lock paired with the name of the thread that acquired it. */
    private static class LockAndOwner {
        final FileLock lock;
        final String owningThread;

        LockAndOwner(String owningThread, FileLock lock) {
            this.owningThread = owningThread;
            this.lock = lock;
        }
    }
}

