package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.Version;
import io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig;
import io.streamthoughts.kafka.connect.filepulse.config.TaskConfig;
import io.streamthoughts.kafka.connect.filepulse.offset.SimpleOffsetManager;
import io.streamthoughts.kafka.connect.filepulse.scanner.FileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.state.FileStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.class */
public class FilePulseSourceConnector extends SourceConnector {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceConnector.class);
    private static final long MAX_TIMEOUT = 5000;
    private static final String CONNECT_NAME_CONFIG = "name";
    private Map<String, String> configProperties;
    private AtomicInteger taskConfigsGeneration = new AtomicInteger(0);
    private FileSystemMonitorThread fsMonitorThread;
    private ConnectorConfig config;
    private FileSystemScanner scanner;
    private String connectorGroupName;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        String str = map.get(CONNECT_NAME_CONFIG);
        LOG.info("Configuring connector : {}", str);
        try {
            this.configProperties = map;
            this.config = new ConnectorConfig(map);
            String tasksReporterGroupId = this.config.getTasksReporterGroupId();
            this.connectorGroupName = tasksReporterGroupId != null ? tasksReporterGroupId : str;
            StateBackingStoreRegistry.instance().register(this.connectorGroupName, () -> {
                return new FileStateBackingStore(this.config.getTaskReporterTopic(), this.connectorGroupName, this.config.getInternalKafkaReporterConfig(), false);
            });
            FSDirectoryWalker directoryScanner = this.config.directoryScanner();
            directoryScanner.setFilter(new CompositeFileListFilter(this.config.filters()));
            try {
                this.scanner = new LocalFileSystemScanner(this.config.scanDirectoryPath(), directoryScanner, this.config.cleanupPolicy(), new SimpleOffsetManager(this.config.offsetStrategy()), StateBackingStoreRegistry.instance().get(this.connectorGroupName));
                this.fsMonitorThread = new FileSystemMonitorThread(this.context, this.scanner, this.config.scanInternalMs());
                this.fsMonitorThread.setUncaughtExceptionHandler((thread, th) -> {
                    LOG.info("Uncaught error from file system monitoring thread [{}]", thread.getName(), th);
                    throw new ConnectException(th);
                });
                this.fsMonitorThread.start();
            } catch (Exception e) {
                LOG.error("Closing resources due to an error thrown during initialization of connector {} ", this.connectorGroupName);
                StateBackingStoreRegistry.instance().release(this.connectorGroupName);
                if (this.fsMonitorThread != null) {
                    this.fsMonitorThread.shutdown(0L);
                }
                throw e;
            }
        } catch (ConfigException e2) {
            throw new ConnectException("Couldn't init FilePulseSourceConnector due to configuration error", e2);
        }
    }

    public Class<? extends Task> taskClass() {
        return FilePulseSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        LOG.info("Creating new tasks configurations (maxTasks={})", Integer.valueOf(i));
        List<List> partitionFilesAndGet = this.scanner.partitionFilesAndGet(i);
        ArrayList arrayList = new ArrayList(partitionFilesAndGet.size());
        if (partitionFilesAndGet.isEmpty()) {
            LOG.warn("Failed to create new task configs - no source files found.");
        } else {
            long andIncrement = this.taskConfigsGeneration.getAndIncrement();
            for (List list : partitionFilesAndGet) {
                HashMap hashMap = new HashMap(this.configProperties);
                hashMap.put("internal.kafka.reporter.id", this.connectorGroupName);
                hashMap.put(TaskConfig.FILE_INPUT_PATHS_CONFIG, String.join(",", list));
                arrayList.add(hashMap);
            }
            for (int i2 = 0; i2 < partitionFilesAndGet.size(); i2++) {
                LOG.info("Created config for task_id={} with '{}' source files (task_config_gen={}).", new Object[]{Integer.valueOf(i2), Integer.valueOf(((List) partitionFilesAndGet.get(i2)).size()), Long.valueOf(andIncrement)});
            }
        }
        return arrayList;
    }

    public void stop() {
        LOG.info("Stopping connector");
        this.fsMonitorThread.shutdown();
        try {
            StateBackingStoreRegistry.instance().release(this.connectorGroupName);
            this.fsMonitorThread.join(MAX_TIMEOUT);
        } catch (InterruptedException e) {
        }
        LOG.info("Connector stopped");
    }

    public ConfigDef config() {
        return ConnectorConfig.getConf();
    }
}
