package org.apache.celeborn.client.read;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.celeborn.client.compress.Decompressor;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.shaded.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.celeborn.shaded.io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/celeborn/client/read/RssInputStream.class */
public abstract class RssInputStream extends InputStream {
    /** Class-wide SLF4J logger; the nested implementation logs through it as well. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) RssInputStream.class);
    /**
     * Shared stream that is always at end-of-stream: both {@code read} variants return -1 and
     * {@code setCallback} ignores its argument. Returned by {@code create} when no locations are
     * supplied and by {@code empty()}.
     */
    private static final RssInputStream emptyInputStream = new RssInputStream() { // from class: org.apache.celeborn.client.read.RssInputStream.1
        @Override // java.io.InputStream
        public int read() throws IOException {
            // Nothing to read: report end-of-stream immediately.
            return -1;
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            // Nothing to read: report end-of-stream regardless of the requested range.
            return -1;
        }

        @Override // org.apache.celeborn.client.read.RssInputStream
        public void setCallback(MetricsCallback metricsCallback) {
            // Intentionally a no-op: this stream never reads, so it has no metrics to report.
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/celeborn/client/read/RssInputStream$RssInputStreamImpl.class */
    public static final class RssInputStreamImpl extends RssInputStream {
        /** Randomness source used by the constructor to shuffle the order of the locations. */
        private static final Random RAND = new Random();
        // Configuration, client factory and shuffle key are handed to every partition reader created.
        private final CelebornConf conf;
        private final TransportClientFactory clientFactory;
        private final String shuffleKey;
        /** Locations to read, in the (randomized) order they are visited. */
        private final PartitionLocation[] locations;
        /** Indexed by the map id from a batch header; a batch is accepted only if its attempt id equals the entry. */
        private final int[] attempts;
        /** When odd and a peer exists, createReader reads from the peer instead of the given location. */
        private final int attemptNumber;
        // Map index range: passed to the readers and scanned as [startMapIndex, endMapIndex) by skipLocation.
        private final int startMapIndex;
        private final int endMapIndex;
        /** Scratch buffer for the compressed payload of one batch; replaced by a larger array on demand. */
        private byte[] compressedBuf;
        /** Holds the decompressed bytes of the current batch; replaced by a larger array on demand. */
        private byte[] decompressedBuf;
        private final Decompressor decompressor;
        /** Chunk currently being parsed; null once there is nothing left to read. */
        private ByteBuf currentChunk;
        /** Reader for the location currently being consumed; null when none is open. */
        private PartitionReader currentReader;
        /** Upper bound for fetchChunkRetryCnt before reader creation / chunk fetch gives up. */
        private final int fetchChunkMaxRetry;
        /** Milliseconds slept before retrying the same location when there is no peer to switch to. */
        int retryWaitMs;
        /** Index into locations of the next location to consider. */
        private int fileIndex;
        /** Index of the next unread byte in decompressedBuf. */
        private int position;
        /** Number of valid bytes in decompressedBuf (value returned by the decompressor). */
        private int limit;
        /** Optional metrics sink; may be null, in which case no metrics are reported. */
        private MetricsCallback callback;
        /** Whether skipLocation is allowed to skip locations based on their map-id bitmap. */
        private final boolean rangeReadFilter;
        /** Map id -> batch ids already delivered; used to drop duplicated batches. */
        private final Map<Integer, Set<Integer>> batchesRead = new HashMap();
        /** Failure counter shared by reader creation and chunk fetching; reset to 0 whenever a new location is selected. */
        private int fetchChunkRetryCnt = 0;
        /** Size in bytes of a batch header: four ints (map id, attempt id, batch id, payload size). */
        private final int BATCH_HEADER_SIZE = 16;
        /** Scratch buffer a batch header is read into. */
        private final byte[] sizeBuf = new byte[16];
        /** Number of locations skipped by the range-read filter; reported in close(). */
        private LongAdder skipCount = new LongAdder();

        /**
         * Builds a stream over the given locations and immediately positions it on the first
         * readable one.
         *
         * @param celebornConf configuration the buffer sizes, retry settings and decompressor come from
         * @param transportClientFactory factory passed to the partition readers
         * @param str shuffle key passed to the partition readers
         * @param partitionLocationArr locations to read; shuffled in place
         * @param iArr attempt ids indexed by map id
         * @param i attempt number of this read; odd values prefer the peer replica
         * @param i2 start map index
         * @param i3 end map index
         * @throws IOException if the first reader or chunk cannot be obtained
         */
        RssInputStreamImpl(CelebornConf celebornConf, TransportClientFactory transportClientFactory, String str, PartitionLocation[] partitionLocationArr, int[] iArr, int i, int i2, int i3) throws IOException {
            // Plain wiring of the collaborators and the read range.
            this.conf = celebornConf;
            this.clientFactory = transportClientFactory;
            this.shuffleKey = str;
            this.locations = (PartitionLocation[]) Utils.randomizeInPlace(partitionLocationArr, RAND);
            this.attempts = iArr;
            this.attemptNumber = i;
            this.startMapIndex = i2;
            this.endMapIndex = i3;
            this.rangeReadFilter = celebornConf.shuffleRangeReadFilterEnabled();

            // Both scratch buffers start at "max push buffer + compression header" bytes.
            int maxPayload = celebornConf.pushBufferMaxSize();
            int headerLength = Decompressor.getCompressionHeaderLength(celebornConf);
            int initialCapacity = maxPayload + headerLength;
            this.compressedBuf = new byte[initialCapacity];
            this.decompressedBuf = new byte[initialCapacity];
            this.decompressor = Decompressor.getDecompressor(celebornConf);

            // Retry policy.
            this.fetchChunkMaxRetry = celebornConf.fetchMaxRetries();
            this.retryWaitMs = Utils.fromCelebornConf(celebornConf, TransportModuleConstants.DATA_MODULE, 0).ioRetryWaitTimeMs();

            // Open the first reader (if any) so the stream is ready for read().
            moveToNextReader();
        }

        /**
         * Decides whether a location can be skipped because it holds no data for the requested
         * map index range.
         *
         * <p>Skipping is only considered when the range-read filter is enabled and {@code i2} is
         * not {@link Integer#MAX_VALUE}. The map-id bitmap is taken from the location, or from its
         * peer when the location has none. If no bitmap is available at all the location is NOT
         * skipped, because nothing proves it is irrelevant (previously this case threw a
         * {@link NullPointerException}).
         *
         * @param i start map index (inclusive)
         * @param i2 end map index (exclusive)
         * @param partitionLocation candidate location
         * @return {@code true} if no map id in {@code [i, i2)} is present in the bitmap
         */
        private boolean skipLocation(int i, int i2, PartitionLocation partitionLocation) {
            if (!this.rangeReadFilter || i2 == Integer.MAX_VALUE) {
                return false;
            }
            RoaringBitmap mapIdBitMap = partitionLocation.getMapIdBitMap();
            if (mapIdBitMap == null && partitionLocation.getPeer() != null) {
                mapIdBitMap = partitionLocation.getPeer().getMapIdBitMap();
            }
            if (mapIdBitMap == null) {
                // Without a bitmap we cannot tell what the location contains; read it to be safe.
                return false;
            }
            for (int mapId = i; mapId < i2; mapId++) {
                if (mapIdBitMap.contains(mapId)) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Advances {@code fileIndex} past every location the range filter rejects and returns the
         * first location that has to be read.
         *
         * <p>Each rejected location bumps {@code skipCount}. When a location is returned the retry
         * counter is reset; {@code fileIndex} still points at that location (the caller advances it).
         *
         * @return the next location to read, or {@code null} if all locations are consumed
         */
        private PartitionLocation nextReadableLocation() {
            final int total = this.locations.length;
            while (this.fileIndex < total) {
                PartitionLocation candidate = this.locations[this.fileIndex];
                if (!skipLocation(this.startMapIndex, this.endMapIndex, candidate)) {
                    // A fresh location starts with a fresh retry budget.
                    this.fetchChunkRetryCnt = 0;
                    return candidate;
                }
                this.skipCount.increment();
                this.fileIndex++;
            }
            return null;
        }

        /**
         * Closes the current reader (if any) and opens readers for the following locations until
         * one reports that it has data, then fetches that reader's first chunk into
         * {@code currentChunk}.
         *
         * <p>If the locations run out first, {@code currentReader} is left {@code null} and
         * {@code currentChunk} is not touched.
         *
         * @throws IOException if a reader cannot be created or the first chunk cannot be fetched
         */
        private void moveToNextReader() throws IOException {
            if (this.currentReader != null) {
                this.currentReader.close();
                this.currentReader = null;
            }
            for (;;) {
                PartitionLocation target = nextReadableLocation();
                if (target == null) {
                    // No more locations; currentReader stays null.
                    return;
                }
                this.currentReader = createReaderWithRetry(target);
                this.fileIndex++;
                if (this.currentReader.hasNext()) {
                    break;
                }
                // Empty reader: discard it and try the next location.
                this.currentReader.close();
                this.currentReader = null;
            }
            this.currentChunk = getNextChunk();
        }

        /**
         * Creates a reader for the given location, retrying while the shared retry counter is
         * below {@code fetchChunkMaxRetry}.
         *
         * <p>After a failure the next attempt targets the peer when one exists; otherwise the same
         * location is retried after sleeping {@code retryWaitMs} milliseconds. Every failure is
         * logged together with its exception, and the last failure is attached as the cause of the
         * {@link IOException} thrown once the retry budget is exhausted, so the root cause is not
         * lost.
         *
         * @param partitionLocation location to open first
         * @return a reader for the location or for a peer reached while retrying
         * @throws IOException if no reader could be created within the retry budget
         */
        private PartitionReader createReaderWithRetry(PartitionLocation partitionLocation) throws IOException {
            Exception lastFailure = null;
            while (this.fetchChunkRetryCnt < this.fetchChunkMaxRetry) {
                try {
                    return createReader(partitionLocation, this.fetchChunkRetryCnt, this.fetchChunkMaxRetry);
                } catch (Exception e) {
                    lastFailure = e;
                    this.fetchChunkRetryCnt++;
                    if (partitionLocation.getPeer() != null) {
                        partitionLocation = partitionLocation.getPeer();
                        // Trailing throwable is logged by SLF4J as the exception, not as a format argument.
                        RssInputStream.logger.warn("CreatePartitionReader failed {}/{} times, change to peer", Integer.valueOf(this.fetchChunkRetryCnt), Integer.valueOf(this.fetchChunkMaxRetry), e);
                    } else {
                        RssInputStream.logger.warn("CreatePartitionReader failed {}/{} times, retry the same location", Integer.valueOf(this.fetchChunkRetryCnt), Integer.valueOf(this.fetchChunkMaxRetry), e);
                        Uninterruptibles.sleepUninterruptibly(this.retryWaitMs, TimeUnit.MILLISECONDS);
                    }
                }
            }
            // lastFailure is null only when the retry budget was already exhausted on entry.
            throw new IOException("createPartitionReader failed!", lastFailure);
        }

        /**
         * Fetches the next chunk from the current reader, retrying while the shared retry counter
         * is below {@code fetchChunkMaxRetry}.
         *
         * <p>On failure the current reader is closed and replaced: by a reader created for the
         * peer location when one exists, otherwise by a new reader for the same location after
         * sleeping {@code retryWaitMs} milliseconds. The exception that triggered the final
         * failure is logged and attached as the cause of the thrown {@link IOException}.
         *
         * @return the next chunk of the current reader
         * @throws IOException if the chunk could not be fetched within the retry budget, or if a
         *     replacement reader could not be created
         */
        private ByteBuf getNextChunk() throws IOException {
            Exception lastFailure = null;
            while (this.fetchChunkRetryCnt < this.fetchChunkMaxRetry) {
                try {
                    return this.currentReader.next();
                } catch (Exception e) {
                    lastFailure = e;
                    this.fetchChunkRetryCnt++;
                    this.currentReader.close();
                    if (this.fetchChunkRetryCnt == this.fetchChunkMaxRetry) {
                        // Trailing throwable is logged by SLF4J as the exception, not as a format argument.
                        RssInputStream.logger.warn("Fetch chunk fail exceeds max retry {}", Integer.valueOf(this.fetchChunkRetryCnt), e);
                        throw new IOException("Fetch chunk failed for " + this.fetchChunkRetryCnt + " times", e);
                    }
                    if (this.currentReader.getLocation().getPeer() != null) {
                        RssInputStream.logger.warn("Fetch chunk failed {}/{} times, change to peer", Integer.valueOf(this.fetchChunkRetryCnt), Integer.valueOf(this.fetchChunkMaxRetry), e);
                        this.currentReader = createReaderWithRetry(this.currentReader.getLocation().getPeer());
                    } else {
                        RssInputStream.logger.warn("Fetch chunk failed {}/{} times", Integer.valueOf(this.fetchChunkRetryCnt), Integer.valueOf(this.fetchChunkMaxRetry), e);
                        Uninterruptibles.sleepUninterruptibly(this.retryWaitMs, TimeUnit.MILLISECONDS);
                        this.currentReader = createReaderWithRetry(this.currentReader.getLocation());
                    }
                }
            }
            // lastFailure is null only when the retry budget was already exhausted on entry.
            throw new IOException("Fetch chunk failed!", lastFailure);
        }

        /**
         * Creates the reader matching the storage type of the location that is actually read.
         *
         * <p>When the location has a peer and {@code attemptNumber} is odd, the peer is read
         * instead of the given location. HDD and SSD locations get a {@code WorkerPartitionReader},
         * HDFS locations a {@code DfsPartitionReader}.
         *
         * @param partitionLocation requested location
         * @param i current retry count, forwarded to the worker reader
         * @param i2 maximum retry count, forwarded to the worker reader
         * @return a reader for the chosen location
         * @throws IOException if the storage type is none of HDD, SSD or HDFS, or the reader fails to open
         */
        private PartitionReader createReader(PartitionLocation partitionLocation, int i, int i2) throws IOException {
            PartitionLocation target = partitionLocation;
            PartitionLocation peer = partitionLocation.getPeer();
            if (peer == null) {
                RssInputStream.logger.debug("Partition {} has only one partition replica.", partitionLocation);
            } else if (this.attemptNumber % 2 == 1) {
                // Odd attempts read the replica.
                target = peer;
                RssInputStream.logger.debug("Read peer {} for attempt {}.", target, Integer.valueOf(this.attemptNumber));
            }
            RssInputStream.logger.debug("create reader for location {}", target);

            StorageInfo storageInfo = target.getStorageInfo();
            StorageInfo.Type storageType = storageInfo.getType();
            boolean onWorkerDisk = storageType == StorageInfo.Type.HDD || storageType == StorageInfo.Type.SSD;
            if (onWorkerDisk) {
                return new WorkerPartitionReader(this.conf, this.shuffleKey, target, this.clientFactory, this.startMapIndex, this.endMapIndex, i, i2);
            }
            if (storageType != StorageInfo.Type.HDFS) {
                throw new IOException("Unknown storage info " + storageInfo + " to read location " + target);
            }
            return new DfsPartitionReader(this.conf, this.shuffleKey, target, this.clientFactory, this.startMapIndex, this.endMapIndex);
        }

        /**
         * Installs the metrics sink that fillBuffer reports bytes read and read time to.
         *
         * @param metricsCallback the callback to use; when null, no metrics are reported
         */
        @Override // org.apache.celeborn.client.read.RssInputStream
        public void setCallback(MetricsCallback metricsCallback) {
            this.callback = metricsCallback;
        }

        /**
         * Reads one byte from the decompressed data, refilling the buffer as needed.
         *
         * @return the next byte as a value in {@code 0..255}, or -1 at end of stream
         * @throws IOException if refilling the buffer fails
         */
        @Override // java.io.InputStream
        public int read() throws IOException {
            // Keep refilling until the buffer holds at least one unread byte; a refill may
            // legitimately produce an empty batch, hence the loop.
            while (this.position >= this.limit) {
                if (!fillBuffer()) {
                    return -1;
                }
            }
            int value = this.decompressedBuf[this.position] & 255;
            this.position++;
            return value;
        }

        /**
         * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, refilling
         * the internal buffer until the request is satisfied or the stream ends.
         *
         * @param bArr destination array
         * @param i offset in {@code bArr} of the first byte written
         * @param i2 maximum number of bytes to read
         * @return the number of bytes copied, 0 if {@code i2} is 0, or -1 if the stream ended
         *     before any byte was copied
         * @throws NullPointerException if {@code bArr} is null
         * @throws IndexOutOfBoundsException if the offset/length do not fit inside {@code bArr}
         * @throws IOException if refilling the buffer fails
         */
        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (bArr == null) {
                throw new NullPointerException();
            }
            if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
                throw new IndexOutOfBoundsException();
            }
            if (i2 == 0) {
                return 0;
            }
            int copied = 0;
            while (copied < i2) {
                while (this.position >= this.limit) {
                    if (!fillBuffer()) {
                        // End of stream: report what was copied so far, or -1 if nothing was.
                        return copied > 0 ? copied : -1;
                    }
                }
                int available = this.limit - this.position;
                int wanted = i2 - copied;
                int step = Math.min(available, wanted);
                System.arraycopy(this.decompressedBuf, this.position, bArr, i + copied, step);
                this.position += step;
                copied += step;
            }
            return copied;
        }

        /**
         * Logs the location statistics, releases the chunk currently held and closes the current
         * reader. Both references are cleared, so calling this again does nothing further.
         */
        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            int totalLocations = this.locations.length;
            long readLocations = totalLocations - this.skipCount.sum();
            long skippedLocations = this.skipCount.sum();
            RssInputStream.logger.debug("total location count {} read {} skip {}", Integer.valueOf(totalLocations), Long.valueOf(readLocations), Long.valueOf(skippedLocations));

            ByteBuf chunk = this.currentChunk;
            if (chunk != null) {
                RssInputStream.logger.debug("Release chunk {}", chunk);
                chunk.release();
                this.currentChunk = null;
            }

            PartitionReader reader = this.currentReader;
            if (reader != null) {
                RssInputStream.logger.debug("Closing reader");
                reader.close();
                this.currentReader = null;
            }
        }

        /**
         * Releases the current chunk and loads the next one, moving on to the next location when
         * the current reader is exhausted.
         *
         * @return {@code true} if a new chunk was loaded (from the current reader) or a next
         *     reader was opened; {@code false} when there is nothing left, in which case
         *     {@code currentChunk} is {@code null} and the reader has been closed
         * @throws IOException if fetching the chunk or opening the next reader fails
         */
        private boolean moveToNextChunk() throws IOException {
            if (this.currentChunk != null) {
                this.currentChunk.release();
                this.currentChunk = null;
            }
            if (this.currentReader.hasNext()) {
                this.currentChunk = getNextChunk();
                return true;
            }
            boolean noMoreLocations = this.fileIndex >= this.locations.length;
            if (noMoreLocations) {
                // Last reader is exhausted: tear it down and signal end of data.
                if (this.currentReader != null) {
                    this.currentReader.close();
                    this.currentReader = null;
                }
                return false;
            }
            moveToNextReader();
            return this.currentReader != null;
        }

        /**
         * Loads the next deliverable batch into {@code decompressedBuf}.
         *
         * <p>Batches are read one by one from the current chunk (advancing to following chunks and
         * locations as needed). Each batch starts with a 16-byte header of four ints: map id,
         * attempt id, batch id and compressed payload size. A batch is dropped when its attempt id
         * differs from {@code attempts[mapId]} or when the same (map id, batch id) pair was already
         * delivered. The first accepted batch is decompressed, {@code position}/{@code limit} are
         * reset to describe it, and the method returns right away so the caller can consume it
         * before it would be overwritten by a later batch.
         *
         * <p>If a callback is installed it receives the bytes consumed by the accepted batch
         * (header + payload) and the wall-clock time spent in this call.
         *
         * @return {@code true} if a batch was decompressed into the buffer, {@code false} if the
         *     input is exhausted
         * @throws IOException if fetching further chunks fails
         */
        private boolean fillBuffer() throws IOException {
            if (this.currentChunk == null) {
                return false;
            }
            long startMs = System.currentTimeMillis();
            boolean hasData = false;
            while (this.currentChunk.isReadable() || moveToNextChunk()) {
                // Parse the fixed-size batch header.
                this.currentChunk.readBytes(this.sizeBuf);
                int mapId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET);
                int attemptId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 4);
                int batchId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
                int size = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);

                // The payload must always be consumed, even if the batch ends up being dropped,
                // so the chunk stays aligned on the next header.
                if (size > this.compressedBuf.length) {
                    this.compressedBuf = new byte[size];
                }
                this.currentChunk.readBytes(this.compressedBuf, 0, size);

                if (attemptId != this.attempts[mapId]) {
                    // Written by a different attempt of this map task than the one recorded in attempts.
                    continue;
                }
                Set<Integer> seenBatches = this.batchesRead.computeIfAbsent(Integer.valueOf(mapId), key -> new HashSet<>());
                if (!seenBatches.add(Integer.valueOf(batchId))) {
                    RssInputStream.logger.debug("Skip duplicated batch: mapId {}, attemptId {}, batchId {}.", Integer.valueOf(mapId), Integer.valueOf(attemptId), Integer.valueOf(batchId));
                    continue;
                }

                if (this.callback != null) {
                    this.callback.incBytesRead(this.BATCH_HEADER_SIZE + size);
                }
                int originalLen = this.decompressor.getOriginalLen(this.compressedBuf);
                if (this.decompressedBuf.length < originalLen) {
                    this.decompressedBuf = new byte[originalLen];
                }
                this.limit = this.decompressor.decompress(this.compressedBuf, this.decompressedBuf, 0);
                this.position = 0;
                hasData = true;
                // Stop here: reading on would overwrite the batch that was just decompressed.
                break;
            }
            if (this.callback != null) {
                this.callback.incReadTime(System.currentTimeMillis() - startMs);
            }
            return hasData;
        }
    }

    /**
     * Factory for shuffle input streams.
     *
     * @param celebornConf client configuration
     * @param transportClientFactory factory passed to the partition readers
     * @param str shuffle key
     * @param partitionLocationArr locations to read; copied before use, so the caller's array is
     *     not reordered
     * @param iArr attempt ids indexed by map id
     * @param i attempt number of this read
     * @param i2 start map index
     * @param i3 end map index
     * @return the shared empty stream when {@code partitionLocationArr} is null or empty,
     *     otherwise a new stream over a copy of the locations
     * @throws IOException if the stream cannot be positioned on its first chunk
     */
    public static RssInputStream create(CelebornConf celebornConf, TransportClientFactory transportClientFactory, String str, PartitionLocation[] partitionLocationArr, int[] iArr, int i, int i2, int i3) throws IOException {
        boolean nothingToRead = partitionLocationArr == null || partitionLocationArr.length == 0;
        if (nothingToRead) {
            return emptyInputStream;
        }
        // Work on a copy: the implementation shuffles the array it is given.
        PartitionLocation[] locationsCopy = (PartitionLocation[]) Arrays.copyOf(partitionLocationArr, partitionLocationArr.length);
        return new RssInputStreamImpl(celebornConf, transportClientFactory, str, locationsCopy, iArr, i, i2, i3);
    }

    /**
     * Returns the shared stream that is always at end-of-stream.
     *
     * @return the singleton empty stream (the same instance on every call)
     */
    public static RssInputStream empty() {
        return emptyInputStream;
    }

    /**
     * Registers the metrics callback for this stream. What is reported through it is up to the
     * implementation; the empty stream ignores it.
     *
     * @param metricsCallback the callback to register
     */
    public abstract void setCallback(MetricsCallback metricsCallback);
}
