package io.streamthoughts.kafka.connect.filepulse.scanner;

import io.streamthoughts.kafka.connect.filepulse.clean.BatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.DelegateBatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.internal.KeyValuePair;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
import io.streamthoughts.kafka.connect.filepulse.source.SourceStatus;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/scanner/LocalFileSystemScanner.class */
public class LocalFileSystemScanner implements FileSystemScanner {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemScanner.class);
    private static final long READ_CONFIG_ON_START_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
    private static final Comparator<SourceMetadata> BY_LAST_MODIFIED = Comparator.comparingLong((v0) -> {
        return v0.lastModified();
    });
    private final String sourceDirectoryPath;
    private final FSDirectoryWalker fsWalker;
    private final StateBackingStore<SourceFile> store;
    private final Map<String, SourceMetadata> scheduled = new ConcurrentHashMap();
    private LinkedBlockingQueue<SourceFile> completed = new LinkedBlockingQueue<>();
    private StateSnapshot<SourceFile> fileState;
    private final OffsetManager offsetManager;
    private final BatchFileCleanupPolicy cleaner;
    private ScanStatus status;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/scanner/LocalFileSystemScanner$ScanStatus.class */
    public enum ScanStatus {
        CREATED,
        READY,
        STARTED,
        STOPPED
    }

    public LocalFileSystemScanner(String str, FSDirectoryWalker fSDirectoryWalker, GenericFileCleanupPolicy genericFileCleanupPolicy, final OffsetManager offsetManager, StateBackingStore<SourceFile> stateBackingStore) {
        Objects.requireNonNull(fSDirectoryWalker, "fsWalker can't be null");
        Objects.requireNonNull(str, "scanDirectoryPath can't be null");
        Objects.requireNonNull(genericFileCleanupPolicy, "cleaner can't be null");
        this.sourceDirectoryPath = str;
        this.fsWalker = fSDirectoryWalker;
        if (genericFileCleanupPolicy instanceof FileCleanupPolicy) {
            this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy) genericFileCleanupPolicy);
        } else {
            if (!(genericFileCleanupPolicy instanceof BatchFileCleanupPolicy)) {
                throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', 'BatchFileCleanupPolicy', or the variants that are consumer aware and/or Acknowledging not " + genericFileCleanupPolicy.getClass().getName());
            }
            this.cleaner = (BatchFileCleanupPolicy) genericFileCleanupPolicy;
        }
        this.offsetManager = offsetManager;
        this.store = stateBackingStore;
        this.status = ScanStatus.CREATED;
        LOG.info("Creating local filesystem scanner");
        this.store.setUpdateListener(new StateBackingStore.UpdateListener<SourceFile>() { // from class: io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.1
            @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore.UpdateListener
            public void onStateRemove(String str2) {
            }

            @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore.UpdateListener
            public void onStateUpdate(String str2, SourceFile sourceFile) {
                SourceStatus status = sourceFile.status();
                if (status.isOneOf(SourceStatus.completed())) {
                    LocalFileSystemScanner.this.completed.add(sourceFile);
                } else if (status.isOneOf(new SourceStatus[]{SourceStatus.CLEANED})) {
                    String partitionJson = offsetManager.toPartitionJson(sourceFile.metadata());
                    if (((SourceMetadata) LocalFileSystemScanner.this.scheduled.remove(partitionJson)) == null) {
                        LocalFileSystemScanner.LOG.warn("Received cleaned status but no file currently scheduled for partition : '{}', this warn should only occurred during recovering step", partitionJson);
                    }
                }
            }
        });
        if (this.store.isStarted()) {
            LOG.warn("The StateBackingStore used to synchronize this connector with tasks processing files is already started. You can ignore that warning if the connector  is recovering from a crash or resuming after being paused.");
        } else {
            this.store.start();
        }
        readStatesToEnd(READ_CONFIG_ON_START_TIMEOUT_MS);
        recoverPreviouslyCompletedSources();
        this.status = ScanStatus.READY;
        LOG.info("Finished initializing local filesystem scanner");
    }

    private void recoverPreviouslyCompletedSources() {
        LOG.info("Recovering completed files from a previous execution");
        this.fileState.states().values().stream().filter(sourceFile -> {
            return sourceFile.status().isOneOf(SourceStatus.completed());
        }).forEach(sourceFile2 -> {
            this.completed.add(sourceFile2);
        });
        LOG.info("Finished recovering previously completed files : " + this.completed);
    }

    private boolean readStatesToEnd(long j) {
        try {
            this.store.refresh(j, TimeUnit.MILLISECONDS);
            this.fileState = this.store.snapshot();
            LOG.debug("Finished reading to end of log and updated states snapshot, new states log position: {}", Long.valueOf(this.fileState.offset()));
            return true;
        } catch (TimeoutException e) {
            LOG.warn("Didn't reach end of states log quickly enough", e);
            return false;
        }
    }

    public void scan(ConnectorContext connectorContext) {
        cleanUpCompletedFiles();
        if (updateFiles()) {
            LOG.info("Requesting task reconfiguration");
            connectorContext.requestTaskReconfiguration();
        }
    }

    private void cleanUpCompletedFiles() {
        if (this.completed.isEmpty()) {
            return;
        }
        LOG.info("Cleaning up completed files '{}'", Integer.valueOf(this.completed.size()));
        ArrayList arrayList = new ArrayList(this.completed.size());
        this.completed.drainTo(arrayList);
        ((FileCleanupPolicyResultSet) this.cleaner.apply(arrayList)).forEach((sourceFile, fileCleanupPolicyResult) -> {
            if (fileCleanupPolicyResult.equals(FileCleanupPolicyResult.SUCCEED)) {
                this.store.put(this.offsetManager.toPartitionJson(sourceFile.metadata()), sourceFile.withStatus(SourceStatus.CLEANED));
            } else {
                LOG.info("Postpone clean up for file '{}'", sourceFile);
                this.completed.add(sourceFile);
            }
        });
        LOG.info("Finished cleaning all completed source files");
    }

    private synchronized boolean updateFiles() {
        if (!this.scheduled.isEmpty()) {
            LOG.info("Remaining in progress scheduled files: {}. Skip directory scan while waiting for tasks completion.", Integer.valueOf(this.scheduled.size()));
            return false;
        }
        LOG.info("Scanning local file system directory '{}'", this.sourceDirectoryPath);
        Collection<File> listFiles = this.fsWalker.listFiles(new File(this.sourceDirectoryPath));
        LOG.info("Completed scanned, number of files detected '{}' ", Integer.valueOf(listFiles.size()));
        if (!readStatesToEnd(TimeUnit.SECONDS.toMillis(5L))) {
            LOG.info("Finished scanning directory '{}'", this.sourceDirectoryPath);
            return false;
        }
        this.scheduled.putAll(toScheduled(listFiles, this.store.snapshot()));
        LOG.info("Finished lookup for new files : '{}' files selected", Integer.valueOf(this.scheduled.size()));
        notifyAll();
        return !this.scheduled.isEmpty() && this.status.equals(ScanStatus.STARTED);
    }

    private Map<String, SourceMetadata> toScheduled(Collection<File> collection, StateSnapshot<SourceFile> stateSnapshot) {
        return (Map) collection.stream().map(SourceMetadata::fromFile).map(sourceMetadata -> {
            return KeyValuePair.of(this.offsetManager.toPartitionJson(sourceMetadata), sourceMetadata);
        }).filter(keyValuePair -> {
            return maybeScheduled(stateSnapshot, (String) keyValuePair.key);
        }).collect(Collectors.toMap(keyValuePair2 -> {
            return (String) keyValuePair2.key;
        }, keyValuePair3 -> {
            return (SourceMetadata) keyValuePair3.value;
        }));
    }

    private boolean maybeScheduled(StateSnapshot<SourceFile> stateSnapshot, String str) {
        return !stateSnapshot.contains(str) || stateSnapshot.getForKey(str).status().isOneOf(SourceStatus.started());
    }

    public synchronized List<List<String>> partitionFilesAndGet(int i) {
        List<List<String>> groupPartitions;
        LOG.info("Retrieving source files to be scheduled found during last scan");
        long milliseconds = Time.SYSTEM.milliseconds();
        long j = milliseconds;
        while (true) {
            long j2 = j;
            if (!this.scheduled.isEmpty() || j2 - milliseconds >= 15000) {
                break;
            }
            try {
                LOG.info("No file to be scheduled, waiting for next input directory scan execution");
                wait(15000 - (j2 - milliseconds));
            } catch (InterruptedException e) {
            }
            j = Time.SYSTEM.milliseconds();
        }
        if (this.scheduled.isEmpty()) {
            LOG.warn("Directory could not be scanned quickly enough, or no file detected after connector started");
            groupPartitions = Collections.emptyList();
        } else {
            int min = Math.min(this.scheduled.size(), i);
            ArrayList arrayList = new ArrayList(this.scheduled.values());
            arrayList.sort(BY_LAST_MODIFIED);
            groupPartitions = ConnectorUtils.groupPartitions((List) arrayList.stream().map((v0) -> {
                return v0.absolutePath();
            }).collect(Collectors.toList()), min);
        }
        this.status = ScanStatus.STARTED;
        return groupPartitions;
    }

    public void close() {
        this.status = ScanStatus.STOPPED;
    }
}
