package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.config.TaskConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.offset.SimpleOffsetManager;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.state.FileStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.class */
public class FilePulseSourceTask extends SourceTask {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceTask.class);
    private static final Integer NO_PARTITION = null;
    public TaskConfig config;
    private String topic;
    private DefaultFileRecordsPollingConsumer consumer;
    private OffsetManager offsetManager;
    private StateBackingStore<SourceFile> store;
    private KafkaFileStateReporter reporter;
    private volatile FileContext contextToBeCommitted;

    public String version() {
        return new FilePulseSourceConnector().version();
    }

    public void start(Map<String, String> map) {
        LOG.info("Starting task");
        this.config = new TaskConfig(map);
        this.offsetManager = new SimpleOffsetManager(this.config.offsetStrategy());
        this.store = getStateStatesBackingStore();
        this.topic = this.config.topic();
        this.consumer = newFileRecordsPollingConsumer();
        this.reporter = new KafkaFileStateReporter(this.store, this.offsetManager);
        this.consumer.setFileListener(this.reporter);
        this.consumer.addAll(this.config.files());
    }

    private DefaultFileRecordsPollingConsumer newFileRecordsPollingConsumer() {
        return new DefaultFileRecordsPollingConsumer(this.context, this.config.reader(), new DefaultRecordFilterPipeline(this.config.filters()), this.offsetManager, this.config.isReadCommittedFile());
    }

    private StateBackingStore<SourceFile> getStateStatesBackingStore() {
        String tasksReporterGroupId = this.config.getTasksReporterGroupId();
        StateBackingStoreRegistry.instance().register(tasksReporterGroupId, () -> {
            return new FileStateBackingStore(this.config.getTaskReporterTopic(), tasksReporterGroupId, this.config.getInternalKafkaReporterConfig());
        });
        return StateBackingStoreRegistry.instance().get(tasksReporterGroupId);
    }

    public List<SourceRecord> poll() throws InterruptedException {
        this.contextToBeCommitted = this.consumer.context();
        if (!this.consumer.hasNext()) {
            this.contextToBeCommitted = null;
            LOG.info("Orphan task detected - all scheduled files are now completed - waiting for new reconfiguration.");
            synchronized (this) {
                wait();
            }
            return null;
        }
        RecordsIterable<FileRecord<TypedStruct>> m24next = this.consumer.m24next();
        if (m24next.isEmpty() && this.consumer.hasNext()) {
            Thread.sleep(500L);
            m24next = this.consumer.m24next();
        }
        FileContext context = this.consumer.context();
        if (m24next == null || m24next.isEmpty()) {
            return null;
        }
        return (List) m24next.stream().map(fileRecord -> {
            return buildSourceRecord(context, fileRecord);
        }).collect(Collectors.toList());
    }

    public void commit() {
        if (this.contextToBeCommitted != null) {
            this.reporter.notify(this.contextToBeCommitted.metadata(), this.contextToBeCommitted.offset(), SourceStatus.READING);
        }
    }

    private SourceRecord buildSourceRecord(FileContext fileContext, FileRecord<?> fileRecord) {
        return fileRecord.toSourceRecord(this.offsetManager.toPartitionMap(fileContext.metadata()), this.offsetManager.toOffsetMap(fileRecord.offset().toSourceOffset()), fileContext.metadata(), this.topic, NO_PARTITION);
    }

    public void stop() {
        LOG.info("Stopping task.");
        synchronized (this) {
            if (this.consumer != null) {
                this.consumer.close();
                notify();
            }
            if (this.store != null) {
                StateBackingStoreRegistry.instance().release(this.config.getTasksReporterGroupId());
            }
        }
        LOG.info("Task stopped.");
    }
}
