package org.hpccsystems.dfs.client;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.hpccsystems.dfs.client.DataPartition;
import org.hpccsystems.dfs.client.RowServiceInputStream;

/* loaded from: input_file:org/hpccsystems/dfs/client/HpccRemoteFileReader.class */
public class HpccRemoteFileReader<T> implements Iterator<T> {
    private static final Logger log = LogManager.getLogger(HpccRemoteFileReader.class);
    private DataPartition dataPartition;
    private RowServiceInputStream inputStream;
    private BinaryRecordReader binaryRecordReader;
    private IRecordBuilder recordBuilder;
    private boolean handlePrefetch;
    private boolean isClosed;
    private boolean canReadNext;
    private int retryCount;
    private int maxReadRetries;
    private long openTimeMs;
    private long recordsRead;
    private FileReadContext context;
    private Span readSpan;
    public static final int NO_RECORD_LIMIT = -1;
    public static final int DEFAULT_READ_SIZE_OPTION = -1;
    public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
    public static final int DEFAULT_READ_RETRIES = 3;

    /* loaded from: input_file:org/hpccsystems/dfs/client/HpccRemoteFileReader$FileReadContext.class */
    public static class FileReadContext {
        public FieldDef originalRD = null;
        public int connectTimeout = -1;
        public int socketOpTimeoutMS = -1;
        public int recordReadLimit = -1;
        public boolean createPrefetchThread = true;
        public int initialReadSizeKB = -1;
        public int readSizeKB = -1;
        public int readRequestSpanBatchSize = -1;
        public Span parentSpan = null;
    }

    /* loaded from: input_file:org/hpccsystems/dfs/client/HpccRemoteFileReader$FileReadResumeInfo.class */
    public static class FileReadResumeInfo {
        public long inputStreamPos = 0;
        public byte[] tokenBin = null;
        public long recordReaderStreamPos = 0;
    }

    private static FileReadContext constructReadContext(FieldDef fieldDef, int i, int i2, int i3, boolean z, int i4) {
        FileReadContext fileReadContext = new FileReadContext();
        fileReadContext.originalRD = fieldDef;
        fileReadContext.connectTimeout = i;
        fileReadContext.socketOpTimeoutMS = i2;
        fileReadContext.recordReadLimit = i3;
        fileReadContext.createPrefetchThread = z;
        fileReadContext.readSizeKB = i4;
        return fileReadContext;
    }

    private static RowServiceInputStream.StreamContext constructStreamContext(FileReadContext fileReadContext) {
        RowServiceInputStream.StreamContext streamContext = new RowServiceInputStream.StreamContext();
        streamContext.recordDefinition = fileReadContext.originalRD;
        streamContext.recordReadLimit = fileReadContext.recordReadLimit;
        streamContext.createPrefetchThread = fileReadContext.createPrefetchThread;
        streamContext.maxReadSizeKB = fileReadContext.readSizeKB;
        streamContext.initialReadSizeKB = fileReadContext.initialReadSizeKB;
        streamContext.connectTimeoutMS = fileReadContext.connectTimeout;
        streamContext.socketOpTimeoutMS = fileReadContext.socketOpTimeoutMS;
        streamContext.createPrefetchThread = fileReadContext.createPrefetchThread;
        return streamContext;
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder) throws Exception {
        this(dataPartition, fieldDef, iRecordBuilder, -1);
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder, int i) throws Exception {
        this(dataPartition, fieldDef, iRecordBuilder, i, -1);
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder, int i, int i2) throws Exception {
        this(dataPartition, fieldDef, iRecordBuilder, i, i2, true, -1);
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder, int i, int i2, boolean z, int i3) throws Exception {
        this(dataPartition, fieldDef, iRecordBuilder, i, i2, z, i3, null);
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder, int i, int i2, boolean z, int i3, FileReadResumeInfo fileReadResumeInfo) throws Exception {
        this(dataPartition, fieldDef, iRecordBuilder, i, i2, z, i3, fileReadResumeInfo, 15000);
    }

    public HpccRemoteFileReader(DataPartition dataPartition, FieldDef fieldDef, IRecordBuilder iRecordBuilder, int i, int i2, boolean z, int i3, FileReadResumeInfo fileReadResumeInfo, int i4) throws Exception {
        this(constructReadContext(fieldDef, i, i4, i2, z, i3), dataPartition, iRecordBuilder, fileReadResumeInfo);
    }

    public HpccRemoteFileReader(FileReadContext fileReadContext, DataPartition dataPartition, IRecordBuilder iRecordBuilder) throws Exception {
        this(fileReadContext, dataPartition, iRecordBuilder, (FileReadResumeInfo) null);
    }

    public HpccRemoteFileReader(FileReadContext fileReadContext, DataPartition dataPartition, IRecordBuilder iRecordBuilder, FileReadResumeInfo fileReadResumeInfo) throws Exception {
        this.dataPartition = null;
        this.inputStream = null;
        this.recordBuilder = null;
        this.handlePrefetch = true;
        this.isClosed = false;
        this.canReadNext = true;
        this.retryCount = 0;
        this.maxReadRetries = 3;
        this.openTimeMs = 0L;
        this.recordsRead = 0L;
        this.context = null;
        this.readSpan = null;
        this.context = fileReadContext;
        this.handlePrefetch = this.context.createPrefetchThread;
        this.dataPartition = dataPartition;
        this.recordBuilder = iRecordBuilder;
        this.readSpan = createReadSpan(fileReadContext, dataPartition);
        if (this.context.originalRD == null) {
            Exception exc = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
            this.readSpan.recordException(exc);
            this.readSpan.end();
            throw exc;
        }
        FieldDef recordDefinition = iRecordBuilder.getRecordDefinition();
        if (recordDefinition == null) {
            Exception exc2 = new Exception("IRecordBuilder does not have a valid record definition.");
            this.readSpan.recordException(exc2);
            this.readSpan.end();
            throw exc2;
        }
        RowServiceInputStream.StreamContext constructStreamContext = constructStreamContext(this.context);
        constructStreamContext.projectedRecordDefinition = recordDefinition;
        constructStreamContext.fileReadSpan = this.readSpan;
        if (fileReadResumeInfo == null) {
            this.inputStream = new RowServiceInputStream(constructStreamContext, this.dataPartition, (RowServiceInputStream.RestartInformation) null);
            this.inputStream.setReadRequestSpanBatchSize(this.context.readRequestSpanBatchSize);
            this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
            this.binaryRecordReader.initialize(this.recordBuilder);
            if (dataPartition.getFileType() == DataPartition.FileType.INDEX) {
                this.binaryRecordReader.setIsIndex(true);
            }
        } else {
            RowServiceInputStream.RestartInformation restartInformation = new RowServiceInputStream.RestartInformation();
            restartInformation.streamPos = fileReadResumeInfo.inputStreamPos;
            restartInformation.tokenBin = fileReadResumeInfo.tokenBin;
            this.inputStream = new RowServiceInputStream(constructStreamContext, this.dataPartition, restartInformation);
            this.inputStream.setReadRequestSpanBatchSize(this.context.readRequestSpanBatchSize);
            long j = fileReadResumeInfo.recordReaderStreamPos - fileReadResumeInfo.inputStreamPos;
            if (j < 0) {
                Exception exc3 = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
                this.readSpan.recordException(exc3);
                this.readSpan.end();
                throw exc3;
            }
            this.inputStream.skip(j);
            this.binaryRecordReader = new BinaryRecordReader(this.inputStream, fileReadResumeInfo.recordReaderStreamPos);
            this.binaryRecordReader.initialize(this.recordBuilder);
        }
        log.info("HPCCRemoteFileReader: Opening file part: " + this.dataPartition.getThisPart() + (fileReadResumeInfo != null ? " resume position: " + fileReadResumeInfo.inputStreamPos : ""));
        log.trace("Original record definition:\n" + RecordDefinitionTranslator.toJsonRecord(this.context.originalRD) + " projected record definition:\n" + RecordDefinitionTranslator.toJsonRecord(recordDefinition));
        this.openTimeMs = System.currentTimeMillis();
    }

    private static Span createReadSpan(FileReadContext fileReadContext, DataPartition dataPartition) {
        Span createChildSpan = Utils.createChildSpan(fileReadContext.parentSpan, "HPCCRemoteFileReader/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart());
        createChildSpan.setStatus(StatusCode.OK);
        String copyIP = dataPartition.getCopyIP(0);
        String copyIP2 = dataPartition.getCopyCount() > 1 ? dataPartition.getCopyIP(1) : "";
        long j = fileReadContext.readSizeKB;
        if (j < 0) {
            j = 4096;
        }
        createChildSpan.setAllAttributes(Attributes.of(AttributeKey.stringKey("server.0.address"), copyIP, AttributeKey.stringKey("server.1.address"), copyIP2, AttributeKey.stringKey("server.port"), Integer.toString(dataPartition.getPort()), AttributeKey.longKey("read.size"), Long.valueOf(j * 1000)));
        return createChildSpan;
    }

    private boolean retryRead() {
        if (this.retryCount >= this.maxReadRetries) {
            return false;
        }
        log.info("Retrying read for " + this.dataPartition.toString() + " retry count: " + this.retryCount);
        this.retryCount++;
        FileReadResumeInfo fileReadResumeInfo = getFileReadResumeInfo();
        RowServiceInputStream.RestartInformation restartInformation = new RowServiceInputStream.RestartInformation();
        restartInformation.streamPos = fileReadResumeInfo.inputStreamPos;
        restartInformation.tokenBin = fileReadResumeInfo.tokenBin;
        try {
            this.inputStream.close();
        } catch (Exception e) {
        }
        try {
            this.readSpan = createReadSpan(this.context, this.dataPartition);
            RowServiceInputStream.StreamContext constructStreamContext = constructStreamContext(this.context);
            constructStreamContext.projectedRecordDefinition = this.recordBuilder.getRecordDefinition();
            constructStreamContext.fileReadSpan = this.readSpan;
            this.inputStream = new RowServiceInputStream(constructStreamContext, this.dataPartition, restartInformation);
            this.inputStream.setReadRequestSpanBatchSize(this.context.readRequestSpanBatchSize);
            long j = fileReadResumeInfo.recordReaderStreamPos - fileReadResumeInfo.inputStreamPos;
            if (j < 0) {
                throw new Exception("Unable to restart read stream, unexpected stream position in record reader.");
            }
            this.inputStream.skip(j);
            this.binaryRecordReader = new BinaryRecordReader(this.inputStream, fileReadResumeInfo.recordReaderStreamPos);
            this.binaryRecordReader.initialize(this.recordBuilder);
            return true;
        } catch (Exception e2) {
            this.readSpan.recordException(e2);
            this.readSpan.setStatus(StatusCode.ERROR);
            this.readSpan.end();
            log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e2.getMessage(), e2);
            return false;
        }
    }

    public void setMaxReadRetries(int i) {
        this.maxReadRetries = i;
    }

    public long getStreamPosition() {
        return this.binaryRecordReader.getStreamPosAfterLastRecord();
    }

    public FileReadResumeInfo getFileReadResumeInfo() {
        return getFileReadResumeInfo(Long.valueOf(getStreamPosition()));
    }

    public FileReadResumeInfo getFileReadResumeInfo(Long l) {
        FileReadResumeInfo fileReadResumeInfo = new FileReadResumeInfo();
        fileReadResumeInfo.recordReaderStreamPos = l.longValue();
        RowServiceInputStream.RestartInformation restartInformationForStreamPos = this.inputStream.getRestartInformationForStreamPos(fileReadResumeInfo.recordReaderStreamPos);
        fileReadResumeInfo.inputStreamPos = restartInformationForStreamPos.streamPos;
        fileReadResumeInfo.tokenBin = restartInformationForStreamPos.tokenBin;
        return fileReadResumeInfo;
    }

    public int getRemoteReadMessageCount() {
        int i = 0;
        if (this.binaryRecordReader != null) {
            i = this.binaryRecordReader.getStreamMessageCount();
        }
        return i;
    }

    public String getRemoteReadMessages() {
        return this.binaryRecordReader != null ? this.binaryRecordReader.getStreamMessages() : "";
    }

    public void prefetch() {
        if (this.handlePrefetch) {
            log.warn("Prefetch called on an HpccRemoteFileReader that has an internal prefetch thread.");
        } else if (this.isClosed) {
            log.warn("Prefetch called on an HpccRemoteFileReader that has been closed.");
        } else {
            this.inputStream.prefetchData();
        }
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        if (this.isClosed) {
            log.warn("hasNext() called on an HpccRemoteFileReader that has been closed.");
            return false;
        }
        this.canReadNext = false;
        try {
            this.canReadNext = this.binaryRecordReader.hasNext();
            if (this.inputStream.getPrefetchException() != null) {
                throw this.inputStream.getPrefetchException();
            }
            return this.canReadNext;
        } catch (HpccFileException e) {
            this.readSpan.recordException(e);
            this.readSpan.setStatus(StatusCode.ERROR);
            this.readSpan.end();
            if (retryRead()) {
                return hasNext();
            }
            this.canReadNext = false;
            log.error("Read failure for " + this.dataPartition.toString() + ":" + e.getMessage(), e);
            NoSuchElementException noSuchElementException = new NoSuchElementException("Fatal read error: " + e.getMessage());
            noSuchElementException.initCause(e);
            throw noSuchElementException;
        }
    }

    @Override // java.util.Iterator
    public T next() {
        if (this.isClosed && !this.canReadNext) {
            throw new NoSuchElementException("Fatal read error: Attempting to read next() from a closed file reader.");
        }
        try {
            T t = (T) this.binaryRecordReader.getNext();
            this.recordsRead++;
            this.canReadNext = false;
            return t;
        } catch (HpccFileException e) {
            this.readSpan.recordException(e);
            this.readSpan.setStatus(StatusCode.ERROR);
            this.readSpan.end();
            if (retryRead()) {
                return next();
            }
            log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
            NoSuchElementException noSuchElementException = new NoSuchElementException("Fatal read error: " + e.getMessage());
            noSuchElementException.initCause(e);
            throw noSuchElementException;
        }
    }

    public void close() throws Exception {
        if (this.isClosed) {
            log.warn("Calling close on an already closed file reader for file part: " + this.dataPartition.toString());
            return;
        }
        this.readSpan.end();
        report();
        this.inputStream.close();
        this.isClosed = true;
        log.info("HPCCRemoteFileReader: Closing file part: " + this.dataPartition.getThisPart() + " for " + this.dataPartition.getFileName() + " read time: " + ((System.currentTimeMillis() - this.openTimeMs) / 1000.0d) + "s  records read: " + this.recordsRead);
    }

    public int getAvailable() throws IOException {
        return this.binaryRecordReader.getAvailable();
    }

    public RowServiceInputStream getInputStream() {
        return this.inputStream;
    }

    public BinaryRecordReader getRecordReader() {
        return this.binaryRecordReader;
    }

    public void report() {
        if (getRemoteReadMessageCount() > 0) {
            log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + this.dataPartition.getFileName() + ":\n");
            log.warn(getRemoteReadMessages());
        }
    }
}
