package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterException;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.io.File;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.class */
/**
 * Default {@link FileRecordsPollingConsumer} that reads a queue of input files one after the other.
 *
 * <p>Files are registered through {@link #addAll(List)}. Each poll looks at the head of the queue,
 * opens it if necessary, reads the next batch of records and runs it through the configured
 * {@link RecordFilterPipeline}. A file is removed from the queue (and the optional
 * {@link StateListener} notified) once it is exhausted, skipped or failed.
 */
public class DefaultFileRecordsPollingConsumer implements FileRecordsPollingConsumer<FileRecord<TypedStruct>> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileRecordsPollingConsumer.class);

    /** When {@code true}, previously committed offsets are not looked up and files are read from an empty offset. */
    private final boolean ignoreCommittedOffsets;
    private final FileInputReader reader;
    private final RecordFilterPipeline<FileRecord<TypedStruct>> pipeline;
    private final OffsetManager offsetManager;
    private final SourceTaskContext taskContext;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Pending input files; the head is the file currently being read. */
    private final Queue<FileInputIterable> queue = new LinkedBlockingQueue<>();

    /** Optional listener; may be {@code null}, always check {@link #hasListener()} before use. */
    private StateListener listener;
    /** Last record returned by a non-empty poll; used by {@link #context()} to report the offset. */
    private FileRecord<TypedStruct> latestPollRecord;
    private FileInputIterator<FileRecord<TypedStruct>> currentIterator;

    /**
     * Creates a new consumer.
     *
     * @param sourceTaskContext    the task context passed to the offset manager when looking up offsets.
     * @param fileInputReader      the reader used to build an iterable for each input file.
     * @param recordFilterPipeline the pipeline applied to every batch of records.
     * @param offsetManager        the manager used to look up committed offsets.
     * @param z                    {@code true} to ignore committed offsets.
     */
    // Decompiler note: declared package-private in the original bytecode; kept public for compatibility.
    public DefaultFileRecordsPollingConsumer(SourceTaskContext sourceTaskContext, FileInputReader fileInputReader, RecordFilterPipeline<FileRecord<TypedStruct>> recordFilterPipeline, OffsetManager offsetManager, boolean z) {
        this.ignoreCommittedOffsets = z;
        this.reader = fileInputReader;
        this.pipeline = recordFilterPipeline;
        this.offsetManager = offsetManager;
        this.taskContext = sourceTaskContext;
    }

    /**
     * Schedules the given files for reading.
     *
     * <p>Files reported as invalid by {@link FileInputIterable#isValid()} are dropped (the listener,
     * if any, receives {@code onInvalid}); the others are appended to the queue (the listener, if any,
     * receives {@code onScheduled}).
     *
     * @param list the paths of the files to schedule.
     * @throws IllegalStateException if this consumer is closed.
     */
    // Decompiler note: declared package-private in the original bytecode; kept public for compatibility.
    public void addAll(List<String> list) {
        if (isClose()) {
            throw new IllegalStateException("Can't add new input files, consumer is closed");
        }
        // The whole batch is collected first, so nothing is enqueued if a listener callback throws.
        List<FileInputIterable> accepted = list.stream()
                .map(toIterable())
                .filter(excludeUnreadableAndNotify())
                .peek(fileInputIterable -> {
                    if (hasListener()) {
                        this.listener.onScheduled(new FileContext(fileInputIterable.metadata()));
                    }
                })
                .collect(Collectors.toList());
        this.queue.addAll(accepted);
    }

    /** Returns a function wrapping a file path into a {@link FileInputIterable} bound to the reader. */
    private Function<String, FileInputIterable> toIterable() {
        return path -> new FileInputIterable(new File(path), this.reader);
    }

    /**
     * Returns a predicate that keeps valid inputs only; invalid ones are logged and reported to the
     * listener when one is registered.
     */
    private Predicate<FileInputIterable> excludeUnreadableAndNotify() {
        return fileInputIterable -> {
            boolean isValid = fileInputIterable.isValid();
            if (!isValid) {
                LOG.error("Invalid source, file doesn't exist or is not readable - ignore : {}", fileInputIterable.file().getAbsolutePath());
                // The listener is optional: guard the call like every other notification does.
                if (hasListener()) {
                    this.listener.onInvalid(new FileContext(fileInputIterable.metadata()));
                }
            }
            return isValid;
        };
    }

    /**
     * Returns the context of the file currently being read.
     *
     * @return {@code null} if no iterator is active; otherwise the iterator's context, with its offset
     *         replaced by the offset of the latest polled record when there is one.
     */
    public FileContext context() {
        if (this.currentIterator == null) {
            return null;
        }
        FileContext context = this.currentIterator.context();
        // NOTE(review): latestPollRecord is not reset when moving to another file, so this offset may
        // come from a record of a previously read file - confirm this is intended.
        if (this.latestPollRecord != null) {
            context = new FileContext(context.metadata(), this.latestPollRecord.offset().toSourceOffset());
        }
        return context;
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always.
     */
    public void seekTo(SourceOffset sourceOffset) {
        throw new UnsupportedOperationException();
    }

    /**
     * Polls the next batch of records from the file at the head of the queue.
     *
     * <p>Entries that are exhausted, invalid or already completed are removed from the queue until a
     * usable iterator is found or the queue is empty. When the pipeline raises a
     * {@link FilterException}, the current file is closed and reported as failed, and an empty batch
     * is returned.
     *
     * @return the filtered records, or an empty iterable when nothing can be read.
     * @throws ConnectFilePulseException if reading or filtering fails for any other reason.
     */
    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public RecordsIterable<FileRecord<TypedStruct>> m27next() {
        if (this.queue.isEmpty()) {
            return RecordsIterable.empty();
        }
        // Walk the queue until an iterator with remaining records is found or the queue is drained.
        do {
            FileInputIterable head = this.queue.peek();
            if (head.isOpen()) {
                this.currentIterator = getOrCloseIteratorIfNoMoreRecord(head);
            } else {
                this.currentIterator = openAndGetIteratorOrNullIfInvalid(this.taskContext, head);
            }
        } while (this.currentIterator == null && !this.queue.isEmpty());

        if (this.currentIterator == null) {
            return RecordsIterable.empty();
        }
        try {
            try {
                RecordsIterable<FileRecord<TypedStruct>> records = this.currentIterator.next();
                RecordsIterable<FileRecord<TypedStruct>> filtered = this.pipeline.apply(records, this.currentIterator.hasNext());
                if (!filtered.isEmpty()) {
                    this.latestPollRecord = filtered.last();
                }
                return filtered;
            } catch (FilterException e) {
                // A filter failure ends the current file; errors raised while closing it are still
                // handled by the outer catch clauses below.
                closeIterator(this.currentIterator, e);
                return RecordsIterable.empty();
            }
        } catch (ConnectFilePulseException e) {
            // Must be caught before Exception, otherwise this clause is unreachable.
            // NOTE(review): the decompiled code carried a dead 'finally' that closed the iterator;
            // the original source may have closed the file on failure here - confirm.
            throw e;
        } catch (Exception e) {
            throw new ConnectFilePulseException(e);
        }
    }

    /**
     * Tells whether files are still pending.
     *
     * @return {@code true} if the queue is not empty.
     */
    public boolean hasNext() {
        return !this.queue.isEmpty();
    }

    /**
     * Closes every pending file, then the reader, and marks this consumer as closed.
     *
     * <p>Failures while closing an individual file are logged and do not stop the shutdown. The
     * consumer is flagged as closed even if closing the reader fails.
     */
    public void close() {
        try {
            FileInputIterable pending;
            while ((pending = this.queue.poll()) != null) {
                try {
                    pending.close();
                } catch (Exception e) {
                    // Best effort: keep closing the remaining files.
                    LOG.warn("Error while closing input file '{}'", pending.file().getAbsolutePath(), e);
                }
            }
            this.reader.close();
        } finally {
            this.closed.set(true);
        }
    }

    /**
     * Tells whether {@link #close()} has been called.
     *
     * @return {@code true} if this consumer is closed.
     */
    public boolean isClose() {
        return this.closed.get();
    }

    /**
     * Registers the listener notified of file state changes.
     *
     * @param stateListener the listener; may be {@code null} to disable notifications.
     */
    public void setFileListener(StateListener stateListener) {
        this.listener = stateListener;
    }

    /**
     * Opens the given queue entry and prepares the pipeline for it.
     *
     * <p>The entry is removed from the queue and {@code null} is returned when the file is no longer
     * valid, when it is already completed according to its committed offset, or when opening it fails
     * (in which case the listener receives the failure).
     *
     * @return the opened iterator, or {@code null} if the entry was skipped.
     */
    private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfInvalid(SourceTaskContext sourceTaskContext, FileInputIterable fileInputIterable) {
        SourceMetadata metadata = fileInputIterable.metadata();
        FileInputIterator<FileRecord<TypedStruct>> iterator = null;
        // Context to report when the entry must be dropped; stays null when the file was opened.
        FileContext dropped = null;
        Exception failure = null;
        try {
            // Re-check validity: the file may have disappeared since it was scheduled.
            if (!fileInputIterable.isValid()) {
                LOG.error("File does not exist or is not readable, skip entry and continue '{}'", metadata.absolutePath());
                dropped = new FileContext(metadata);
            } else {
                SourceOffset startOffset;
                if (this.ignoreCommittedOffsets) {
                    startOffset = SourceOffset.empty();
                } else {
                    startOffset = this.offsetManager.getOffsetFor(sourceTaskContext, metadata).orElse(SourceOffset.empty());
                }
                if (this.ignoreCommittedOffsets || !FileInputIterable.isAlreadyCompleted(startOffset, metadata)) {
                    iterator = fileInputIterable.open(startOffset);
                    this.pipeline.init(iterator.context());
                    if (hasListener()) {
                        this.listener.onStart(iterator.context());
                    }
                } else {
                    LOG.warn("Detected source file already completed, skip entry and continue '{}'", metadata.absolutePath());
                    dropped = new FileContext(metadata, startOffset);
                }
            }
        } catch (Exception e) {
            if (iterator != null) {
                // The file was opened before the failure: release it, it will never be polled.
                try {
                    iterator.close();
                } catch (Exception closeError) {
                    LOG.debug("Error while closing file '{}'", metadata.absolutePath(), closeError);
                }
                iterator = null;
            }
            dropped = new FileContext(metadata);
            failure = e;
        }
        // Done outside the try so the head of the queue is removed exactly once per entry.
        if (dropped != null) {
            deleteFileQueueAndInvokeListener(dropped, failure);
        }
        return iterator;
    }

    /**
     * Returns the iterator of an already opened entry, or closes it and returns {@code null} when it
     * has no more records.
     */
    private FileInputIterator<FileRecord<TypedStruct>> getOrCloseIteratorIfNoMoreRecord(FileInputIterable fileInputIterable) {
        FileInputIterator<FileRecord<TypedStruct>> it = fileInputIterable.iterator();
        if (it.hasNext()) {
            return it;
        }
        closeIterator(it, null);
        return null;
    }

    /**
     * Closes the given iterator, then removes the head of the queue and notifies the listener.
     *
     * @param fileInputIterator the iterator to close.
     * @param exc               the failure to report, or {@code null} to report a completion.
     */
    private void closeIterator(FileInputIterator<FileRecord<TypedStruct>> fileInputIterator, Exception exc) {
        try {
            fileInputIterator.close();
        } catch (Exception e) {
            LOG.debug("Error while closing file '{}'", fileInputIterator.context(), e);
        } finally {
            // Runs exactly once, whether or not close() failed.
            deleteFileQueueAndInvokeListener(fileInputIterator.context(), exc);
        }
    }

    /**
     * Removes the head of the queue and reports the outcome to the listener, if any.
     *
     * @param fileContext the context of the removed file.
     * @param th          the failure to report through {@code onFailure}, or {@code null} to report
     *                    {@code onCompleted}.
     * @throws java.util.NoSuchElementException if the queue is empty.
     */
    private void deleteFileQueueAndInvokeListener(FileContext fileContext, Throwable th) {
        this.queue.remove();
        if (hasListener()) {
            if (th != null) {
                this.listener.onFailure(fileContext, th);
            } else {
                this.listener.onCompleted(fileContext);
            }
        }
    }

    /** Tells whether a listener is registered. */
    private boolean hasListener() {
        return this.listener != null;
    }
}
