package io.streamthoughts.kafka.connect.filepulse.fs.reader.text;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.IteratorManager;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.ManagedFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.internal.TextBlock;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIterator.class */
public class RowFileInputIterator extends ManagedFileInputIterator<TypedStruct> {
    private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIterator.class);
    private final NonBlockingBufferReader reader;
    private int minNumReadRecords;
    private long offsetLines;
    private long maxWaitMs;
    private long lastObservedRecords;

    public RowFileInputIterator(FileObjectMeta fileObjectMeta, IteratorManager iteratorManager, NonBlockingBufferReader nonBlockingBufferReader) {
        super(fileObjectMeta, iteratorManager);
        this.minNumReadRecords = 1;
        this.offsetLines = 0L;
        this.maxWaitMs = 0L;
        this.reader = (NonBlockingBufferReader) Objects.requireNonNull(nonBlockingBufferReader, "reader can't be null");
        this.lastObservedRecords = Time.SYSTEM.milliseconds();
    }

    public RowFileInputIterator setMinNumReadRecords(int i) {
        this.minNumReadRecords = i;
        return this;
    }

    public RowFileInputIterator setMaxWaitMs(long j) {
        this.maxWaitMs = j;
        return this;
    }

    public void seekTo(FileObjectOffset fileObjectOffset) {
        Objects.requireNonNull(fileObjectOffset, "offset can't be null");
        if (fileObjectOffset.position() != -1) {
            this.offsetLines = fileObjectOffset.rows();
            this.reader.seekTo(Long.valueOf(fileObjectOffset.position()));
        }
        updateContext();
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public RecordsIterable<FileRecord<TypedStruct>> m8next() {
        try {
            mayWaitForLinesToBeAvailable();
            LinkedList linkedList = new LinkedList();
            List<TextBlock> readLines = this.reader.readLines(this.minNumReadRecords, false);
            if (readLines != null) {
                for (TextBlock textBlock : readLines) {
                    this.offsetLines++;
                    TypedStruct create = TypedStruct.create();
                    create.put("message", textBlock.data());
                    linkedList.add(new TypedFileRecord(RowFileRecordOffset.with(textBlock.startOffset(), textBlock.endOffset()).withSize(textBlock.size()).withRowNumber(this.offsetLines), create));
                }
            }
            if (!linkedList.isEmpty() && canWaitForMoreRecords()) {
                this.lastObservedRecords = Time.SYSTEM.milliseconds();
            }
            RecordsIterable<FileRecord<TypedStruct>> recordsIterable = new RecordsIterable<>(linkedList);
            updateContext();
            return recordsIterable;
        } catch (IOException e) {
            updateContext();
            return null;
        } catch (Throwable th) {
            updateContext();
            throw th;
        }
    }

    private void mayWaitForLinesToBeAvailable() {
        if (!this.reader.hasNext()) {
            LOG.debug("Waiting for more bytes from file {} (timeout={}ms)", this.context.metadata(), Long.valueOf(this.maxWaitMs));
            long j = this.lastObservedRecords + this.maxWaitMs;
            while (!this.reader.hasNext() && canWaitForMoreRecords()) {
                Time.SYSTEM.sleep(Math.min(100L, Math.abs(j - Time.SYSTEM.milliseconds())));
            }
        }
        if (this.reader.hasNext() || canWaitForMoreRecords()) {
            return;
        }
        LOG.info("Timeout after waiting for more bytes from file {} after '{}ms'.", this.context.metadata(), Long.valueOf(this.maxWaitMs));
        if (this.reader.remaining()) {
            LOG.info("Remaining buffered bytes detected");
            this.reader.enableAutoFlush();
        }
    }

    private void updateContext() {
        this.context = this.context.withOffset(new FileObjectOffset(this.reader.position(), this.offsetLines, Time.SYSTEM.milliseconds()));
    }

    public boolean hasNext() {
        return this.reader.hasNext() || this.reader.remaining() || canWaitForMoreRecords();
    }

    private boolean canWaitForMoreRecords() {
        return this.lastObservedRecords + this.maxWaitMs > Time.SYSTEM.milliseconds();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.fs.reader.ManagedFileInputIterator
    public void close() {
        if (isClosed()) {
            return;
        }
        if (this.reader != null) {
            this.reader.close();
        }
        super.close();
    }
}
