package io.streamthoughts.kafka.connect.filepulse.scanner.local;

import io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecHandler;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecManager;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.CompositeFileListFilter;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/scanner/local/LocalFSDirectoryWalker.class */
public class LocalFSDirectoryWalker implements FSDirectoryWalker {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFSDirectoryWalker.class);
    private FileListFilter filter;
    private CodecManager codecs;
    private LocalFSDirectoryWalkerConfig config;

    public LocalFSDirectoryWalker() {
        this(Collections.emptyList());
    }

    public LocalFSDirectoryWalker(List<FileListFilter> list) {
        Objects.requireNonNull(list, "filters can't be null");
        this.filter = new CompositeFileListFilter(list);
        this.codecs = new CodecManager();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker
    public void configure(Map<String, ?> map) {
        this.config = new LocalFSDirectoryWalkerConfig(map);
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker
    public Collection<File> listFiles(File file) throws IllegalArgumentException {
        List<File> listEligibleFiles = listEligibleFiles(file);
        return this.filter != null ? this.filter.filterFiles(listEligibleFiles) : listEligibleFiles;
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker
    public void setFilter(FileListFilter fileListFilter) {
        this.filter = fileListFilter;
    }

    private List<File> listEligibleFiles(File file) {
        LinkedList linkedList = new LinkedList();
        if (!isReadableAndNotHidden(file)) {
            if (!file.isHidden()) {
                LOG.warn("File doesn't exist or can't be read: {}", file.getAbsolutePath());
            }
            return linkedList;
        }
        LinkedList linkedList2 = new LinkedList();
        LinkedList linkedList3 = new LinkedList();
        try {
            DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(file.toPath());
            try {
                newDirectoryStream.forEach(path -> {
                    File file2 = path.toFile();
                    try {
                        if (file2.isFile()) {
                            CodecHandler codecIfCompressedOrNull = this.codecs.getCodecIfCompressedOrNull(file2);
                            if (codecIfCompressedOrNull != null) {
                                LOG.debug("Detecting compressed file : {}", file2.getCanonicalPath());
                                File decompress = codecIfCompressedOrNull.decompress(file2);
                                linkedList.addAll(listEligibleFiles(decompress));
                                linkedList2.add(decompress);
                            } else {
                                linkedList.add(file2);
                            }
                        } else {
                            linkedList3.add(file2);
                        }
                    } catch (IOException e) {
                        LOG.error("Skip input file {} - error while decompressing", file2.getName(), e);
                    }
                });
                if (newDirectoryStream != null) {
                    newDirectoryStream.close();
                }
                if (this.config.isRecursiveScanEnable()) {
                    linkedList.addAll((Collection) linkedList3.stream().filter(file2 -> {
                        return !linkedList2.contains(file2);
                    }).flatMap(file3 -> {
                        return listEligibleFiles(file3).stream();
                    }).collect(Collectors.toList()));
                }
                return linkedList;
            } finally {
            }
        } catch (IOException e) {
            LOG.warn("Error while listing directory {}: {}", file.getAbsolutePath(), e.getLocalizedMessage());
            throw new ConnectException(e);
        }
    }

    private boolean isReadableAndNotHidden(File file) {
        return file.exists() && file.canRead() && !file.isHidden();
    }
}
