package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.NonBlockingBufferReader;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.ReversedInputFileReader;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.TextBlock;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/reader/RowFileInputIterator.class */
public class RowFileInputIterator extends AbstractFileInputIterator<TypedStruct> {
    private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIterator.class);
    private static final String HEADERS_RECORD_FIELD = "headers";
    private static final String FOOTERS_RECORD_FIELD = "footers";
    private final NonBlockingBufferReader reader;
    private int minNumReadRecords;
    private int skipHeaders;
    private int skipFooters;
    private List<TextBlock> headers;
    private List<String> headerStrings;
    private List<TextBlock> footers;
    private List<String> footersStrings;
    private long offsetLines;
    private final Charset charset;
    private long maxWaitMs;
    private long lastObservedRecords;
    private AtomicBoolean initialized;

    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/reader/RowFileInputIterator$Builder.class */
    public static class Builder {
        private Charset charset;
        private int minNumReadRecords;
        private FileContext context;
        private int initialBufferSize;
        private int skipHeaders;
        private int skipFooters;
        private long waitMaxMs;
        private IteratorManager iteratorManager;

        private Builder() {
            this.charset = StandardCharsets.UTF_8;
            this.minNumReadRecords = 1;
            this.initialBufferSize = 4096;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withContext(FileContext fileContext) {
            this.context = fileContext;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withSkipHeaders(int i) {
            this.skipHeaders = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withSkipFooters(int i) {
            this.skipFooters = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withMinNumReadRecords(int i) {
            this.minNumReadRecords = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withMaxWaitMs(long j) {
            this.waitMaxMs = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withInitialBufferSize(int i) {
            this.initialBufferSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withCharset(Charset charset) {
            this.charset = charset;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withIteratorManager(IteratorManager iteratorManager) {
            this.iteratorManager = iteratorManager;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public RowFileInputIterator build() {
            validateNotNull(this.context, "context");
            NonBlockingBufferReader nonBlockingBufferReader = new NonBlockingBufferReader(this.context.file(), this.initialBufferSize, this.charset);
            nonBlockingBufferReader.disableAutoFlush();
            RowFileInputIterator rowFileInputIterator = new RowFileInputIterator(this.context, nonBlockingBufferReader, this.iteratorManager, this.charset);
            rowFileInputIterator.setSkipFooters(this.skipFooters);
            rowFileInputIterator.setSkipHeaders(this.skipHeaders);
            rowFileInputIterator.setMinNumReadRecords(this.minNumReadRecords);
            rowFileInputIterator.setMaxWaitMs(this.waitMaxMs);
            return rowFileInputIterator;
        }

        private void validateNotNull(Object obj, String str) {
            if (obj == null) {
                throw new IllegalStateException("Error while building new RowFileInputIterator. The property " + str + " is null.");
            }
        }
    }

    private RowFileInputIterator(FileContext fileContext, NonBlockingBufferReader nonBlockingBufferReader, IteratorManager iteratorManager, Charset charset) {
        super(iteratorManager, fileContext);
        this.minNumReadRecords = 0;
        this.skipHeaders = 0;
        this.skipFooters = 0;
        this.offsetLines = 0L;
        this.maxWaitMs = 0L;
        this.lastObservedRecords = -1L;
        this.initialized = new AtomicBoolean(false);
        Objects.requireNonNull(nonBlockingBufferReader, "reader can't be null");
        Objects.requireNonNull(iteratorManager, "iteratorManager can't be null");
        Objects.requireNonNull(charset, "charset can't be null");
        this.reader = nonBlockingBufferReader;
        this.charset = charset;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setMinNumReadRecords(int i) {
        this.minNumReadRecords = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setSkipHeaders(int i) {
        this.skipHeaders = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setSkipFooters(int i) {
        this.skipFooters = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setMaxWaitMs(long j) {
        this.maxWaitMs = j;
    }

    public void seekTo(SourceOffset sourceOffset) {
        Objects.requireNonNull(sourceOffset, "offset can't be null");
        if (sourceOffset.position() != -1) {
            this.offsetLines = sourceOffset.rows();
            this.reader.seekTo(Long.valueOf(sourceOffset.position()));
        }
        updateContext();
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public RecordsIterable<FileRecord<TypedStruct>> m10next() {
        try {
            initializeIfNeeded();
            mayWaitForLinesToBeAvailable();
            LinkedList linkedList = new LinkedList();
            List<TextBlock> readLines = this.reader.readLines(this.minNumReadRecords, false);
            if (readLines != null) {
                for (TextBlock textBlock : readLines) {
                    this.offsetLines++;
                    if (isNotLineFooter(textBlock) && isNotLineHeader(textBlock)) {
                        linkedList.add(createOutputRecord(textBlock));
                    }
                }
            }
            if (!linkedList.isEmpty() && canWaitForMoreRecords()) {
                this.lastObservedRecords = Time.SYSTEM.milliseconds();
            }
            RecordsIterable<FileRecord<TypedStruct>> recordsIterable = new RecordsIterable<>(linkedList);
            updateContext();
            return recordsIterable;
        } catch (IOException e) {
            updateContext();
            return null;
        } catch (Throwable th) {
            updateContext();
            throw th;
        }
    }

    private void mayWaitForLinesToBeAvailable() {
        if (!this.reader.hasNext()) {
            LOG.debug("Waiting for more bytes from file {} (timeout={}ms)", this.context.metadata(), Long.valueOf(this.maxWaitMs));
            long j = this.lastObservedRecords + this.maxWaitMs;
            while (!this.reader.hasNext() && canWaitForMoreRecords()) {
                Time.SYSTEM.sleep(Math.min(100L, Math.abs(j - Time.SYSTEM.milliseconds())));
            }
        }
        if (this.reader.hasNext() || canWaitForMoreRecords()) {
            return;
        }
        LOG.info("Timeout after waiting for more bytes from file {} after '{}ms'.", this.context.metadata(), Long.valueOf(this.maxWaitMs));
        if (this.reader.remaining()) {
            LOG.info("Remaining buffered bytes detected");
            this.reader.enableAutoFlush();
        }
    }

    private void updateContext() {
        this.context = this.context.withOffset(new SourceOffset(this.reader.position(), this.offsetLines, Time.SYSTEM.milliseconds()));
    }

    private FileRecord<TypedStruct> createOutputRecord(TextBlock textBlock) {
        TypedStruct create = TypedStruct.create();
        create.put("message", textBlock.data());
        if (this.skipHeaders > 0) {
            create.put(HEADERS_RECORD_FIELD, this.headerStrings);
        }
        if (this.skipFooters > 0) {
            create.put(FOOTERS_RECORD_FIELD, this.footersStrings);
        }
        return new TypedFileRecord(RowFileRecordOffset.with(textBlock.startOffset(), textBlock.endOffset()).withSize(textBlock.size()).withRowNumber(this.offsetLines), create);
    }

    private void initializeIfNeeded() {
        if (this.initialized.get()) {
            return;
        }
        mayReadHeaders();
        mayReadFooters();
        this.lastObservedRecords = Time.SYSTEM.milliseconds();
        this.initialized.set(true);
    }

    public boolean hasNext() {
        return this.reader.hasNext() || this.reader.remaining() || canWaitForMoreRecords();
    }

    private boolean canWaitForMoreRecords() {
        return this.lastObservedRecords + this.maxWaitMs > Time.SYSTEM.milliseconds();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputIterator
    public void close() {
        if (isClose()) {
            return;
        }
        if (this.reader != null) {
            this.reader.close();
        }
        super.close();
    }

    private void mayReadFooters() {
        String name = this.context.metadata().name();
        String absolutePath = this.context.metadata().absolutePath();
        if (this.skipFooters > 0) {
            LOG.info("Starting to read footer lines ({}) from file {}", Integer.valueOf(this.skipFooters), name);
            try {
                ReversedInputFileReader reversedInputFileReader = new ReversedInputFileReader(absolutePath, this.charset);
                try {
                    this.footers = reversedInputFileReader.readLines(this.skipFooters);
                    reversedInputFileReader.close();
                    if (this.footers.size() < this.skipFooters) {
                        throw new ReaderException("Not enough value for reading footer lines from file " + absolutePath + " (available=" + this.footers.size() + ", expecting=" + this.skipFooters + ")");
                    }
                    Collections.reverse(this.footers);
                    this.footersStrings = (List) this.footers.stream().map((v0) -> {
                        return v0.data();
                    }).collect(Collectors.toList());
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException("", e);
            }
        }
    }

    private void mayReadHeaders() {
        String name = this.context.metadata().name();
        String absolutePath = this.context.metadata().absolutePath();
        if (this.skipHeaders > 0) {
            LOG.info("Starting to read header lines ({}) from file {}", Integer.valueOf(this.skipHeaders), name);
            try {
                NonBlockingBufferReader nonBlockingBufferReader = new NonBlockingBufferReader(new File(absolutePath), this.charset);
                try {
                    this.headers = nonBlockingBufferReader.readLines(this.skipHeaders, true);
                    this.headerStrings = (List) this.headers.stream().map((v0) -> {
                        return v0.data();
                    }).collect(Collectors.toList());
                    nonBlockingBufferReader.close();
                    if (this.headers.size() < this.skipHeaders) {
                        throw new ReaderException(String.format("Not enough value for reading header lines from file %s (available=%d, expecting=%d)", absolutePath, Integer.valueOf(this.headers.size()), Integer.valueOf(this.skipHeaders)));
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException("", e);
            }
        }
    }

    private boolean isNotLineHeader(TextBlock textBlock) {
        return this.skipHeaders <= 0 || textBlock.startOffset() > this.headers.get(this.skipHeaders - 1).startOffset();
    }

    private boolean isNotLineFooter(TextBlock textBlock) {
        return this.skipFooters <= 0 || textBlock.startOffset() < this.footers.get(0).startOffset();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Builder newBuilder() {
        return new Builder();
    }
}
