package org.apache.celeborn.client.read;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.Message;
import org.apache.celeborn.common.network.protocol.OpenStream;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.shaded.io.netty.buffer.ByteBuf;
import org.apache.celeborn.shaded.io.netty.buffer.Unpooled;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/celeborn/client/read/DfsPartitionReader.class */
public class DfsPartitionReader implements PartitionReader {
    private static Logger logger = LoggerFactory.getLogger((Class<?>) DfsPartitionReader.class);
    PartitionLocation location;
    private final int shuffleChunkSize;
    private final int fetchMaxReqsInFlight;
    private Thread fetchThread;
    private final FSDataInputStream hdfsInputStream;
    private int numChunks;
    private final AtomicReference<IOException> exception = new AtomicReference<>();
    private volatile boolean closed = false;
    private final AtomicInteger currentChunkIndex = new AtomicInteger(0);
    private final LinkedBlockingQueue<ByteBuf> results = new LinkedBlockingQueue<>();

    public DfsPartitionReader(CelebornConf celebornConf, String str, PartitionLocation partitionLocation, TransportClientFactory transportClientFactory, int i, int i2) throws IOException {
        this.numChunks = 0;
        this.shuffleChunkSize = (int) celebornConf.shuffleChunkSize();
        this.fetchMaxReqsInFlight = celebornConf.fetchMaxReqsInFlight();
        this.location = partitionLocation;
        ArrayList arrayList = new ArrayList();
        if (i2 != Integer.MAX_VALUE) {
            try {
                Message.decode(transportClientFactory.createClient(partitionLocation.getHost(), partitionLocation.getFetchPort()).sendRpcSync(new OpenStream(str, partitionLocation.getFileName(), i, i2).toByteBuffer(), celebornConf.fetchTimeoutMs()));
                this.hdfsInputStream = ShuffleClient.getHdfsFs(celebornConf).open(new Path(Utils.getSortedFilePath(partitionLocation.getStorageInfo().getFilePath())));
                arrayList.addAll(getChunkOffsetsFromSortedIndex(celebornConf, partitionLocation, i, i2));
            } catch (IOException | InterruptedException e) {
                throw new IOException("read shuffle file from hdfs failed, filePath: " + partitionLocation.getStorageInfo().getFilePath(), e);
            }
        } else {
            this.hdfsInputStream = ShuffleClient.getHdfsFs(celebornConf).open(new Path(partitionLocation.getStorageInfo().getFilePath()));
            arrayList.addAll(getChunkOffsetsFromUnsortedIndex(celebornConf, partitionLocation));
        }
        if (arrayList.size() > 1) {
            this.numChunks = arrayList.size() - 1;
            this.fetchThread = new Thread(() -> {
                while (!this.closed && this.currentChunkIndex.get() < this.numChunks) {
                    try {
                        while (this.results.size() >= this.fetchMaxReqsInFlight) {
                            Thread.sleep(50L);
                        }
                        long longValue = ((Long) arrayList.get(this.currentChunkIndex.get())).longValue();
                        byte[] bArr = new byte[(int) (((Long) arrayList.get(this.currentChunkIndex.get() + 1)).longValue() - longValue)];
                        this.hdfsInputStream.readFully(longValue, bArr);
                        this.results.add(Unpooled.wrappedBuffer(bArr));
                        this.currentChunkIndex.incrementAndGet();
                    } catch (IOException e2) {
                        this.exception.set(e2);
                        return;
                    } catch (InterruptedException e3) {
                        return;
                    }
                }
            });
            this.fetchThread.start();
            logger.debug("Start dfs read on location {}", partitionLocation);
        }
    }

    private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf celebornConf, PartitionLocation partitionLocation) throws IOException {
        FSDataInputStream open = ShuffleClient.getHdfsFs(celebornConf).open(new Path(Utils.getIndexFilePath(partitionLocation.getStorageInfo().getFilePath())));
        ArrayList arrayList = new ArrayList();
        int readInt = open.readInt();
        for (int i = 0; i < readInt; i++) {
            arrayList.add(Long.valueOf(open.readLong()));
        }
        open.close();
        return arrayList;
    }

    private List<Long> getChunkOffsetsFromSortedIndex(CelebornConf celebornConf, PartitionLocation partitionLocation, int i, int i2) throws IOException {
        String indexFilePath = Utils.getIndexFilePath(partitionLocation.getStorageInfo().getFilePath());
        FSDataInputStream open = ShuffleClient.getHdfsFs(celebornConf).open(new Path(indexFilePath));
        byte[] bArr = new byte[(int) ShuffleClient.getHdfsFs(celebornConf).getFileStatus(new Path(indexFilePath)).getLen()];
        open.readFully(0L, bArr);
        ArrayList arrayList = new ArrayList(ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(i, i2, this.shuffleChunkSize, ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(bArr)));
        open.close();
        return arrayList;
    }

    @Override // org.apache.celeborn.client.read.PartitionReader
    public boolean hasNext() {
        return this.currentChunkIndex.get() < this.numChunks;
    }

    @Override // org.apache.celeborn.client.read.PartitionReader
    public ByteBuf next() throws IOException {
        checkException();
        ByteBuf byteBuf = null;
        while (byteBuf == null) {
            try {
                checkException();
                byteBuf = this.results.poll(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                IOException iOException = new IOException(e);
                this.exception.set(iOException);
                throw iOException;
            }
        }
        return byteBuf;
    }

    private void checkException() throws IOException {
        IOException iOException = this.exception.get();
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.celeborn.client.read.PartitionReader
    public void close() {
        this.closed = true;
        this.fetchThread.interrupt();
        try {
            this.hdfsInputStream.close();
        } catch (IOException e) {
            logger.warn("close hdfs input stream failed.", (Throwable) e);
        }
        if (this.results.size() > 0) {
            this.results.forEach((v0) -> {
                v0.release();
            });
        }
        this.results.clear();
    }

    @Override // org.apache.celeborn.client.read.PartitionReader
    public PartitionLocation getLocation() {
        return this.location;
    }
}
