package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.Version;
import io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig;
import io.streamthoughts.kafka.connect.filepulse.config.SourceConnectorConfig;
import io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig;
import io.streamthoughts.kafka.connect.filepulse.fs.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultTaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.fs.DelegateTaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStoreConfig;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.class */
public class FilePulseSourceConnector extends SourceConnector {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceConnector.class);
    private static final long DEFAULT_MAX_TIMEOUT = 5000;
    private static final String CONNECT_NAME_CONFIG = "name";
    private Map<String, String> configProperties;
    private FileSystemMonitorThread fsMonitorThread;
    private SourceConnectorConfig connectorConfig;
    private FileSystemMonitor fsMonitor;
    private String connectorGroupName;
    private TaskPartitioner partitioner;
    private StateBackingStoreAccess sharedStore;
    private final AtomicInteger taskConfigsGeneration = new AtomicInteger(0);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        this.connectorGroupName = map.get(CONNECT_NAME_CONFIG);
        LOG.info("Starting FilePulse source connector: {}", this.connectorGroupName);
        try {
            this.configProperties = new HashMap(map);
            this.configProperties.put(KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, this.connectorGroupName);
            this.configProperties.put(KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG, "true");
            this.connectorConfig = new SourceConnectorConfig(this.configProperties);
            try {
                String str = this.connectorGroupName;
                SourceConnectorConfig sourceConnectorConfig = this.connectorConfig;
                Objects.requireNonNull(sourceConnectorConfig);
                this.sharedStore = new StateBackingStoreAccess(str, sourceConnectorConfig::getStateBackingStore, false);
                this.partitioner = this.connectorConfig.getTaskPartitioner();
                this.fsMonitor = createFileSystemMonitor(this.connectorConfig, this.sharedStore.get().getResource());
                this.fsMonitorThread = new FileSystemMonitorThread(this.context, this.fsMonitor, this.connectorConfig.getListingInterval());
                this.fsMonitorThread.setUncaughtExceptionHandler((thread, th) -> {
                    LOG.info("Uncaught error from file system monitoring thread [{}]", thread.getName(), th);
                    this.context.raiseError(new ConnectException("Unexpected error from FileSystemMonitorThread", th));
                });
                this.fsMonitorThread.start();
                LOG.info("Started FilePulse source connector: {}", this.connectorGroupName);
            } catch (Exception e) {
                closeResources();
                throw e;
            }
        } catch (ConfigException e2) {
            throw new ConnectException("Failed to initialize FilePulseSourceConnector due to configuration error", e2);
        }
    }

    private FileSystemMonitor createFileSystemMonitor(SourceConnectorConfig sourceConnectorConfig, StateBackingStore<FileObject> stateBackingStore) {
        FileSystemListing<?> fileSystemListing = sourceConnectorConfig.getFileSystemListing();
        fileSystemListing.setFilter(new CompositeFileListFilter(sourceConnectorConfig.getFileSystemListingFilter()));
        DefaultFileSystemMonitor defaultFileSystemMonitor = new DefaultFileSystemMonitor(sourceConnectorConfig.allowTasksReconfigurationAfterTimeoutMs(), fileSystemListing, sourceConnectorConfig.getFsCleanupPolicy(), sourceConnectorConfig.getFsCleanupPolicyPredicate(), sourceConnectorConfig.getSourceOffsetPolicy(), stateBackingStore, sourceConnectorConfig.getTaskFilerOrder());
        defaultFileSystemMonitor.setStateDefaultReadTimeout(sourceConnectorConfig.getStateDefaultReadTimeoutMs());
        defaultFileSystemMonitor.setStateInitialReadTimeout(sourceConnectorConfig.getStateInitialReadTimeoutMs());
        defaultFileSystemMonitor.setFileSystemListingEnabled(!sourceConnectorConfig.isFileListingTaskDelegationEnabled());
        return defaultFileSystemMonitor;
    }

    public Class<? extends Task> taskClass() {
        return FilePulseSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        LOG.info("Creating new tasks configurations (maxTasks={})", Integer.valueOf(i));
        if (this.connectorConfig.isFileListingTaskDelegationEnabled()) {
            ArrayList arrayList = new ArrayList(i);
            IntStream.range(0, i).forEachOrdered(i2 -> {
                arrayList.add(createTaskConfig(i2, i, 0L, null));
            });
            return arrayList;
        }
        List<List<String>> partitionAndGet = partitionAndGet(i);
        long andIncrement = this.taskConfigsGeneration.getAndIncrement();
        ArrayList arrayList2 = new ArrayList(partitionAndGet.size());
        if (!partitionAndGet.isEmpty()) {
            IntStream.range(0, partitionAndGet.size()).forEachOrdered(i3 -> {
                arrayList2.add(createTaskConfig(i3, partitionAndGet.size(), andIncrement, (List) partitionAndGet.get(i3)));
            });
        } else {
            if (andIncrement > 0) {
                LOG.info("No object file was found - skip task reconfiguration.");
                return arrayList2;
            }
            LOG.info("No object file was found - resetting all tasks with an empty config.");
            IntStream.range(0, i).forEachOrdered(i4 -> {
                arrayList2.add(createTaskConfig(i4, i, andIncrement, Collections.emptyList()));
            });
        }
        IntStream.range(0, partitionAndGet.size()).forEachOrdered(i5 -> {
            LOG.info("Created config for task_id={} with '{}' object files (task_config_gen={}).", new Object[]{Integer.valueOf(i5), Integer.valueOf(((List) partitionAndGet.get(i5)).size()), Long.valueOf(andIncrement)});
        });
        return arrayList2;
    }

    private List<List<String>> partitionAndGet(int i) {
        return (List) this.partitioner.partition(this.fsMonitor.listFilesToSchedule(this.connectorConfig.getMaxScheduledFiles()), i).stream().map(list -> {
            return (List) list.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.toList());
        }).collect(Collectors.toList());
    }

    private Map<String, String> createTaskConfig(int i, int i2, long j, List<String> list) {
        HashMap hashMap = new HashMap(this.configProperties);
        hashMap.put(SourceTaskConfig.TASK_GENERATION_ID, String.valueOf(j));
        if (this.connectorConfig.isFileListingTaskDelegationEnabled()) {
            hashMap.put(SourceTaskConfig.FILE_URIS_PROVIDER_CONFIG, DelegateTaskFileURIProvider.class.getName());
            hashMap.put(CommonSourceConfig.TASK_PARTITIONER_CLASS_CONFIG, HashByURITaskPartitioner.class.getName());
            hashMap.put(DelegateTaskFileURIProvider.Config.TASK_ID_CONFIG, String.valueOf(i));
            hashMap.put(DelegateTaskFileURIProvider.Config.TASK_COUNT_CONFIG, String.valueOf(i2));
            hashMap.put(KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG, "true");
        } else {
            hashMap.put(SourceTaskConfig.FILE_URIS_PROVIDER_CONFIG, DefaultTaskFileURIProvider.class.getName());
            hashMap.put(DefaultTaskFileURIProvider.Config.FILE_OBJECT_URIS_CONFIG, String.join(",", list));
            hashMap.put(KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG, "false");
        }
        return hashMap;
    }

    public void stop() {
        LOG.info("Stopping FilePulse source connector");
        closeResources();
        LOG.info("Stopped FilePulse source connector");
    }

    private void closeResources() {
        if (this.closed.compareAndSet(false, true)) {
            LOG.info("Closing resources for FilePulse source connector");
            try {
                if (this.fsMonitorThread != null) {
                    try {
                        this.fsMonitorThread.shutdown(5000L);
                        this.fsMonitorThread.join(5000L);
                    } catch (InterruptedException e) {
                        LOG.warn("Failed to close file-system monitoring thread. Error: {}", e.getMessage());
                        Thread.currentThread().interrupt();
                    }
                }
                if (this.partitioner != null) {
                    try {
                        this.partitioner.close();
                    } catch (Exception e2) {
                        LOG.warn("Failed to close TaskPartition. Error: {}", e2.getMessage());
                    }
                }
                LOG.info("Closed resources for FilePulse source connector");
            } finally {
                closeSharedStateBackingStore();
            }
        }
    }

    public ConfigDef config() {
        return SourceConnectorConfig.getConf();
    }

    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        } catch (Exception e) {
            LOG.error("Failed to shared StateBackingStore '{}'", this.connectorGroupName);
        }
    }
}
