package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.clean.BatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.DelegateBatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectKey;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.class */
public class DefaultFileSystemMonitor implements FileSystemMonitor {
    /** Logger used by the monitor and by its state-update listener. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSystemMonitor.class);

    /** Maximum time, in milliseconds, that {@link #listFilesToSchedule(int)} waits for scanned files. */
    private static final long TASK_CONFIGURATION_DEFAULT_TIMEOUT = 15000;

    /** Maximum number of scheduling attempts made by {@link #listFilesToSchedule(int)}. */
    private static final int MAX_SCHEDULE_ATTEMPTS = 3;

    /** Timeout used when refreshing the state store from the constructor. */
    private static final Duration ON_START_READ_END_LOG_TIMEOUT = Duration.ofSeconds(30);

    /** Timeout used for every other refresh of the state store. */
    private static final Duration DEFAULT_READ_END_LOG_TIMEOUT = Duration.ofSeconds(5);

    /** Orders object-file metadata by ascending last-modified value. */
    private static final Comparator<FileObjectMeta> BY_LAST_MODIFIED =
        Comparator.comparingLong(FileObjectMeta::lastModified);

    /** Source of the object files listed by {@code updateFiles()}. */
    private final FileSystemListing<?> fsListing;

    /** Store holding the state of each object file; also the source of state-update callbacks. */
    private final StateBackingStore<FileObject> store;

    /** Last snapshot taken from {@link #store} by {@code readStatesToEnd}. */
    private StateSnapshot<FileObject> fileState;

    /** Policy handed to {@code FileObjectCandidatesFilter} when filtering listed objects. */
    private final SourceOffsetPolicy offsetPolicy;

    /** Cleanup policy applied to the batch of cleanable object files. */
    private final BatchFileCleanupPolicy cleaner;

    /**
     * Delay, in milliseconds, added to the current time to compute the next allowed task
     * reconfiguration; {@code Long.MAX_VALUE} makes {@code updateFiles()} skip the listing
     * entirely while files are still scheduled.
     */
    private final Long allowTasksReconfigurationAfterTimeoutMs;

    /** Tells whether an object file in the given status must be queued for cleanup. */
    private final Predicate<FileObjectStatus> cleanablePredicate;

    /** Object files handed out by {@link #listFilesToSchedule(int)} and not yet reported cleaned/invalid. */
    private final Map<FileObjectKey, FileObjectMeta> scheduled = new ConcurrentHashMap<>();

    /** Object files staged by the last listing, waiting to be scheduled. */
    private final Map<FileObjectKey, FileObjectMeta> scanned = new ConcurrentHashMap<>();

    /** Object files waiting for the cleanup policy to be applied. */
    private final LinkedBlockingQueue<FileObject> cleanable = new LinkedBlockingQueue<>();

    /** Timestamp (ms) after which reconfiguration is allowed again; {@code -1} when unset. */
    private Long nextAllowedTasksReconfiguration = -1L;

    /** Set when a task reconfiguration was requested and not yet served by {@link #listFilesToSchedule(int)}. */
    private final AtomicBoolean taskReconfigurationRequested = new AtomicBoolean(false);

    /** Set by the first call to {@link #listFilesToSchedule(int)}; cleared by {@link #close()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Raised by the state listener when a scanned object file becomes cleanable. */
    private final AtomicBoolean changed = new AtomicBoolean(false);

    /** When {@code false}, {@link #invoke(ConnectorContext)} skips the file-system listing. */
    private final AtomicBoolean fileSystemListingEnabled = new AtomicBoolean(true);

    /**
     * Creates the monitor, registers a state-update listener on the store, starts the store if
     * it is not started yet, reads the states, re-queues the object files whose status is
     * cleanable and finally runs a first cleanup pass.
     *
     * @param l                        value stored as the task-reconfiguration delay in milliseconds
     * @param fileSystemListing        listing used to discover object files; must not be null
     * @param genericFileCleanupPolicy a {@code FileCleanupPolicy} or a {@code BatchFileCleanupPolicy};
     *                                 must not be null
     * @param predicate                tells which statuses make an object file cleanable; must not be null
     * @param sourceOffsetPolicy       offset policy used when filtering listed files; must not be null
     * @param stateBackingStore        store holding the object-file states; must not be null
     * @throws NullPointerException     if one of the mandatory arguments is null
     * @throws IllegalArgumentException if the cleanup policy is of neither supported type
     */
    public DefaultFileSystemMonitor(
            Long l,
            FileSystemListing<?> fileSystemListing,
            GenericFileCleanupPolicy genericFileCleanupPolicy,
            final Predicate<FileObjectStatus> predicate,
            SourceOffsetPolicy sourceOffsetPolicy,
            StateBackingStore<FileObject> stateBackingStore) {
        Objects.requireNonNull(fileSystemListing, "'fsListening' should not be null");
        Objects.requireNonNull(genericFileCleanupPolicy, "'cleanPolicy' should not be null");
        Objects.requireNonNull(sourceOffsetPolicy, "'offsetPolicy' should not be null");
        Objects.requireNonNull(stateBackingStore, "'store' should not null");
        Objects.requireNonNull(predicate, "'cleanablePredicate' should not null");

        this.fsListing = fileSystemListing;
        this.allowTasksReconfigurationAfterTimeoutMs = l;
        this.cleanablePredicate = predicate;
        this.offsetPolicy = sourceOffsetPolicy;
        this.store = stateBackingStore;

        // Single-file policies are wrapped so that the monitor only deals with batch policies.
        if (genericFileCleanupPolicy instanceof FileCleanupPolicy) {
            this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy) genericFileCleanupPolicy);
        } else if (genericFileCleanupPolicy instanceof BatchFileCleanupPolicy) {
            this.cleaner = (BatchFileCleanupPolicy) genericFileCleanupPolicy;
        } else {
            throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', 'BatchFileCleanupPolicy' not " + genericFileCleanupPolicy.getClass().getName());
        }
        this.cleaner.setStorage(fileSystemListing.storage());

        LOG.info("Initializing FileSystemMonitor");
        this.store.setUpdateListener(new StateBackingStore.UpdateListener<FileObject>() {
            @Override
            public void onStateRemove(String str) {
                // Removals are ignored.
            }

            @Override
            public void onStateUpdate(String str, FileObject fileObject) {
                final FileObjectKey key = FileObjectKey.of(str);
                final FileObjectStatus status = fileObject.status();
                LOG.debug("Received status '{} 'for: {}", status, fileObject);

                if (predicate.test(status)) {
                    // Queue the file for cleanup; if it was also staged for scheduling,
                    // flag the change so that an in-progress scheduling is retried.
                    cleanable.add(fileObject.withKey(key));
                    if (scanned.remove(key) != null) {
                        changed.set(true);
                    }
                    return;
                }

                if (status.isOneOf(new FileObjectStatus[]{FileObjectStatus.CLEANED, FileObjectStatus.INVALID})) {
                    final FileObjectMeta previouslyScheduled = scheduled.remove(key);
                    if (previouslyScheduled == null && status.isOneOf(new FileObjectStatus[]{FileObjectStatus.CLEANED})) {
                        LOG.debug("Received cleaned status but no object-file currently scheduled for: '{}'. This warn should only occurred during recovering step", str);
                    }
                }
            }
        });

        if (!this.store.isStarted()) {
            this.store.start();
        } else {
            LOG.warn("The StateBackingStore used to synchronize this connector with tasks processing files is already started. You can ignore that warning if the connector  is recovering from a crash or resuming after being paused.");
        }

        readStatesToEnd(ON_START_READ_END_LOG_TIMEOUT);
        recoverPreviouslyCompletedSources();
        cleanUpCompletedFiles();
        LOG.info("Initialized FileSystemMonitor");
    }

    /**
     * Re-queues for cleanup every object file of the current state snapshot whose status
     * satisfies the cleanable predicate.
     *
     * <p>Each state entry is re-keyed from its map key before being tested, so that the queued
     * object carries the key it is stored under.
     */
    private void recoverPreviouslyCompletedSources() {
        LOG.info("Recovering completed files from a previous execution");
        this.fileState.states().entrySet().stream()
            .map(entry -> entry.getValue().withKey(FileObjectKey.of(entry.getKey())))
            .filter(fileObject -> this.cleanablePredicate.test(fileObject.status()))
            .forEach(this.cleanable::add);
        LOG.info("Finished recovering previously completed files : {}", this.cleanable);
    }

    /**
     * Refreshes the state store and replaces the cached snapshot.
     *
     * @param duration maximum time allowed for the refresh
     * @return {@code true} when the refresh completed and the snapshot was replaced,
     *         {@code false} when it timed out (the previous snapshot is then kept)
     */
    private boolean readStatesToEnd(Duration duration) {
        final long timeoutMs = duration.toMillis();
        boolean reachedEnd;
        try {
            this.store.refresh(timeoutMs, TimeUnit.MILLISECONDS);
            this.fileState = this.store.snapshot();
            LOG.debug("Finished reading to end of log and updated states snapshot, new states log position: {}", this.fileState.offset());
            reachedEnd = true;
        } catch (TimeoutException timeout) {
            LOG.warn("Failed to reach end of states log quickly enough", timeout);
            reachedEnd = false;
        }
        return reachedEnd;
    }

    /**
     * Runs one monitoring pass: cleans up completed object files, then, if the monitor is
     * running, listing is enabled and no reconfiguration is already pending, lists the file
     * system and requests a task reconfiguration when new files were staged.
     *
     * @param connectorContext context used to request the task reconfiguration
     */
    public void invoke(ConnectorContext connectorContext) {
        cleanUpCompletedFiles();

        if (!(this.running.get() && this.fileSystemListingEnabled.get())) {
            // Only log when the listing was skipped because the monitor is not running.
            if (this.fileSystemListingEnabled.get()) {
                LOG.info("The connector is not completely started or is being shut down. Skip filesystem listing.");
            }
            return;
        }

        if (this.taskReconfigurationRequested.get()) {
            LOG.info("Task reconfiguration requested. Skip filesystem listing.");
            return;
        }

        if (!updateFiles()) {
            return;
        }

        LOG.info("Requesting task reconfiguration");
        this.taskReconfigurationRequested.set(true);
        connectorContext.requestTaskReconfiguration();
    }

    /**
     * Enables or disables the file-system listing performed by {@link #invoke(ConnectorContext)}.
     *
     * @param z {@code true} to enable the listing, {@code false} to disable it
     */
    public void setFileSystemListingEnabled(boolean z) {
        fileSystemListingEnabled.set(z);
    }

    /**
     * Drains the queue of cleanable object files and applies the cleanup policy to the batch.
     *
     * <p>Files cleaned successfully are written back to the store with the {@code CLEANED}
     * status; the others are put back in the queue so that a later pass retries them.
     */
    private void cleanUpCompletedFiles() {
        if (this.cleanable.isEmpty()) {
            LOG.debug("Skipped cleanup. No object file completed.");
            return;
        }

        LOG.info("Cleaning up completed object files '{}'", this.cleanable.size());
        final List<FileObject> batch = new ArrayList<>(this.cleanable.size());
        this.cleanable.drainTo(batch);

        final FileCleanupPolicyResultSet outcomes = (FileCleanupPolicyResultSet) this.cleaner.apply(batch);
        outcomes.forEach((file, outcome) -> {
            if (!outcome.equals(FileCleanupPolicyResult.SUCCEED)) {
                LOG.warn("Postpone clean up for object file: '{}'", file.metadata().stringURI());
                this.cleanable.add(file);
                return;
            }
            final String storeKey = file.key().get().original();
            this.store.putAsync(storeKey, file.withStatus(FileObjectStatus.CLEANED));
        });
        LOG.info("Finished cleaning all completed object files");
    }

    /**
     * Lists the file system and stages into {@code scanned} the object files that may be
     * scheduled for processing, then wakes up threads waiting on this monitor.
     *
     * <p>The listing is skipped when files are still scheduled and the reconfiguration delay is
     * {@code Long.MAX_VALUE}, or when nothing is scheduled and the states could not be refreshed.
     * When files are still scheduled, staging is also skipped if the listing found nothing new,
     * or if the reconfiguration delay has not elapsed yet.
     *
     * @return {@code true} when at least one object file is staged and the monitor is running
     */
    private synchronized boolean updateFiles() {
        final boolean nothingScheduled = this.scheduled.isEmpty();
        if (!nothingScheduled && this.allowTasksReconfigurationAfterTimeoutMs.longValue() == Long.MAX_VALUE) {
            LOG.info("Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion", this.scheduled.size());
            return false;
        }

        final boolean statesRefreshed = readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
        if (nothingScheduled && !statesRefreshed) {
            LOG.warn("Failed to read state changelog. Skip filesystem listing due to timeout");
            return false;
        }

        LOG.info("Starting to list object files using: {}", this.fsListing.getClass().getSimpleName());
        final long listingStartedMs = Time.SYSTEM.milliseconds();
        final Collection<FileObjectMeta> listed = this.fsListing.listObjects();
        LOG.info("Completed object files listing. '{}' object files found in {}ms", listed.size(), Time.SYSTEM.milliseconds() - listingStartedMs);

        // Keep only objects that are unknown to the store, or known but neither cleanable nor done.
        final StateSnapshot<FileObject> snapshot = this.store.snapshot();
        final Map<? extends FileObjectKey, ? extends FileObjectMeta> candidates = FileObjectCandidatesFilter.filter(
            this.offsetPolicy,
            key -> {
                final FileObject known = snapshot.getForKey(key.original());
                if (known == null) {
                    return true;
                }
                final FileObjectStatus status = known.status();
                return !this.cleanablePredicate.test(status) && !status.isDone();
            },
            listed);

        if (!nothingScheduled) {
            final boolean nothingNew = this.scheduled.keySet().containsAll(candidates.keySet());
            if (nothingNew) {
                LOG.info("Scheduled files still being processed ({}) and no new files found. Skip task reconfiguration", this.scheduled.size());
                return false;
            }

            // Start the reconfiguration delay the first time new files are seen.
            if (this.nextAllowedTasksReconfiguration.longValue() == -1) {
                this.nextAllowedTasksReconfiguration = Time.SYSTEM.milliseconds() + this.allowTasksReconfigurationAfterTimeoutMs.longValue();
            }

            final long remainingMs = Math.max(0L, this.nextAllowedTasksReconfiguration.longValue() - Time.SYSTEM.milliseconds());
            if (remainingMs > 0) {
                LOG.info("Scheduled files still being processed ({}) but new files detected. Waiting for {} ms before allowing task reconfiguration", this.scheduled.size(), remainingMs);
                return false;
            }
        }

        this.nextAllowedTasksReconfiguration = -1L;
        this.scanned.putAll(candidates);
        notifyAll();
        LOG.info("Finished lookup for new object files: '{}' files can be scheduled for processing", this.scanned.size());
        return this.running.get() ? !this.scanned.isEmpty() : (!this.scanned.isEmpty() && false);
    }

    /**
     * Returns the object files to hand out to tasks, sorted by ascending last-modified value.
     *
     * <p>The first call made while the monitor is not running only marks it as running and
     * returns an empty list. Subsequent calls wait up to
     * {@code TASK_CONFIGURATION_DEFAULT_TIMEOUT} milliseconds for staged files, then copy them
     * into {@code scheduled}. When more than {@code i} files are staged, copying stops once
     * {@code scheduled} holds {@code i} entries. If a state update removes a staged file while
     * the schedule is being prepared, the preparation is retried, up to
     * {@code MAX_SCHEDULE_ATTEMPTS} times; after the last failed attempt {@code scheduled} is
     * cleared and an empty list is returned.
     *
     * <p>On every exit path after the first call, {@code scanned} is cleared and the pending
     * task-reconfiguration flag is reset. If the calling thread was interrupted while waiting,
     * the wait continues but the interrupt status is restored before returning.
     *
     * @param i maximum number of files to schedule when more files than that are staged
     * @return the values of {@code scheduled} sorted by last-modified, possibly empty
     */
    public List<FileObjectMeta> listFilesToSchedule(int i) {
        if (!this.running.get()) {
            LOG.info("Started FileSystemMonitor");
            this.running.set(true);
            return Collections.emptyList();
        }

        boolean interrupted = false;
        try {
            interrupted = awaitScannedFiles();

            List<FileObjectMeta> toSchedule = Collections.emptyList();
            // Re-check: the wait above may have timed out without any file being staged.
            if (!this.scanned.isEmpty()) {
                int attempts = 0;
                do {
                    this.changed.set(false);
                    LOG.info("Preparing next scheduling using the object files found during last iteration (attempt={}/{}).", attempts + 1, MAX_SCHEDULE_ATTEMPTS);
                    if (!readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT)) {
                        LOG.warn("Failed to read state changelog while scheduling object files. Timeout.");
                    }
                    if (this.scanned.size() <= i) {
                        this.scheduled.putAll(this.scanned);
                    } else {
                        final Iterator<Map.Entry<FileObjectKey, FileObjectMeta>> it = this.scanned.entrySet().iterator();
                        while (this.scheduled.size() < i && it.hasNext()) {
                            final Map.Entry<FileObjectKey, FileObjectMeta> next = it.next();
                            this.scheduled.put(next.getKey(), next.getValue());
                        }
                    }
                    toSchedule = new ArrayList<>(this.scheduled.values());
                    attempts++;
                    if (this.changed.get()) {
                        if (attempts == MAX_SCHEDULE_ATTEMPTS) {
                            LOG.warn("Failed to prepare the object files after attempts: {}.", MAX_SCHEDULE_ATTEMPTS);
                            // Nothing is handed out, so nothing must remain marked as scheduled.
                            this.scheduled.clear();
                            return Collections.emptyList();
                        }
                        LOG.warn("State updates was received while preparing the object files to be scheduled");
                    }
                } while (this.changed.get() && attempts < MAX_SCHEDULE_ATTEMPTS);
            }

            if (toSchedule.isEmpty()) {
                LOG.warn("Filesystem could not be scanned quickly enough, or no object file was detected after starting the connector.");
            }
            return toSchedule.stream().sorted(BY_LAST_MODIFIED).collect(Collectors.toList());
        } finally {
            this.scanned.clear();
            this.taskReconfigurationRequested.set(false);
            if (interrupted) {
                // Do not hide the interruption from the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits until files are staged in {@code scanned} or the configuration timeout elapses.
     *
     * @return {@code true} if the calling thread was interrupted while waiting
     */
    private boolean awaitScannedFiles() {
        boolean interrupted = false;
        final long startedMs = Time.SYSTEM.milliseconds();
        long nowMs = startedMs;
        while (this.scanned.isEmpty() && nowMs - startedMs < TASK_CONFIGURATION_DEFAULT_TIMEOUT) {
            try {
                synchronized (this) {
                    // updateFiles() stages files and calls notifyAll() while holding this
                    // monitor; re-checking here avoids sleeping after a missed notification.
                    if (this.scanned.isEmpty()) {
                        LOG.info("No file to be scheduled, waiting for next filesystem scan execution");
                        // Strictly positive thanks to the loop condition, so never wait(0).
                        wait(TASK_CONFIGURATION_DEFAULT_TIMEOUT - (nowMs - startedMs));
                    }
                }
            } catch (InterruptedException e) {
                // Keep waiting for the remaining time; the status is restored by the caller.
                interrupted = true;
            }
            nowMs = Time.SYSTEM.milliseconds();
        }
        return interrupted;
    }

    /**
     * Stops the monitor if it is running: refreshes the states one last time and runs a final
     * cleanup pass. Errors raised while doing so are logged and not propagated. Calling this
     * method while the monitor is not running does nothing.
     */
    public void close() {
        final boolean wasRunning = this.running.compareAndSet(true, false);
        if (!wasRunning) {
            return;
        }
        try {
            LOG.info("Closing FileSystemMonitor resources");
            readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
            cleanUpCompletedFiles();
            LOG.info("Closed FileSystemMonitor resources");
        } catch (Exception error) {
            LOG.warn("Unexpected error while closing FileSystemMonitor.", error);
        }
    }
}
