package org.apache.celeborn.client.read;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.buffer.ManagedBuffer;
import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.Message;
import org.apache.celeborn.common.network.protocol.OpenStream;
import org.apache.celeborn.common.network.protocol.StreamHandle;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.util.ExceptionUtils;
import org.apache.celeborn.shaded.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/celeborn/client/read/WorkerPartitionReader.class */
/**
 * {@link PartitionReader} that pulls the chunks of a single {@link PartitionLocation} from a
 * worker: the constructor opens a stream with an {@link OpenStream} RPC, and {@link #next()}
 * issues chunk fetches and hands the received buffers to the caller one at a time.
 *
 * <p>Threading: received chunks are delivered through {@link ChunkReceivedCallback}, which may run
 * on a different thread than the one calling {@link #next()} / {@link #close()}. The {@code closed}
 * flag is therefore only read and written while holding this reader's monitor, and buffers are
 * handed over through a {@link LinkedBlockingQueue}.
 */
public class WorkerPartitionReader implements PartitionReader {
    private static final Logger logger = LoggerFactory.getLogger(WorkerPartitionReader.class);

    private final PartitionLocation location;
    private final TransportClientFactory clientFactory;
    private final StreamHandle streamHandle;
    /** Number of chunks already handed out by {@link #next()}. */
    private int returnedChunks;
    /** Index of the next chunk to request from the worker. */
    private int chunkIndex;
    private final int fetchMaxReqsInFlight;
    private final int fetchChunkRetryCnt;
    private final int fetchChunkMaxRetry;
    private final boolean testFetch;
    /** First-class failure slot: set by the callback or by an interrupted {@link #next()}. */
    private final AtomicReference<IOException> exception = new AtomicReference<>();
    /** Guarded by {@code this} (the reader instance). */
    private boolean closed = false;
    private final LinkedBlockingQueue<ByteBuf> results = new LinkedBlockingQueue<>();

    private final ChunkReceivedCallback callback = new ChunkReceivedCallback() {
        @Override
        public void onSuccess(int i, ManagedBuffer managedBuffer) {
            // Lock on the enclosing reader, NOT on this anonymous callback: close() flips `closed`
            // under the reader's monitor, and both sides must use the same lock so that no buffer
            // is retained and enqueued after close() has started draining the queue.
            synchronized (WorkerPartitionReader.this) {
                ByteBuf buf = ((NettyManagedBuffer) managedBuffer).getBuf();
                if (!WorkerPartitionReader.this.closed) {
                    // Retained here; released either by the consumer of next() or by close().
                    buf.retain();
                    WorkerPartitionReader.this.results.add(buf);
                }
            }
        }

        @Override
        public void onFailure(int i, Throwable th) {
            String str = "Fetch chunk " + i + " failed.";
            logger.error(str, th);
            WorkerPartitionReader.this.exception.set(new IOException(str, th));
        }
    };

    /**
     * Opens a stream for {@code partitionLocation} on its worker.
     *
     * @param celebornConf configuration; supplies the in-flight request limit, the fetch timeout
     *     and the fetch-failure test switch
     * @param str first argument of the {@link OpenStream} request
     * @param partitionLocation location to read; provides host, fetch port and file name
     * @param transportClientFactory factory used to obtain clients to the worker
     * @param i third argument of the {@link OpenStream} request
     * @param i2 fourth argument of the {@link OpenStream} request
     * @param i3 current fetch retry count (only consulted by the test failure injection)
     * @param i4 maximum fetch retry count (only consulted by the test failure injection)
     * @throws IOException if the client cannot be created, the RPC fails, or the calling thread is
     *     interrupted while creating the client
     */
    public WorkerPartitionReader(CelebornConf celebornConf, String str, PartitionLocation partitionLocation, TransportClientFactory transportClientFactory, int i, int i2, int i3, int i4) throws IOException {
        this.fetchMaxReqsInFlight = celebornConf.fetchMaxReqsInFlight();
        this.location = partitionLocation;
        this.clientFactory = transportClientFactory;
        this.fetchChunkRetryCnt = i3;
        this.fetchChunkMaxRetry = i4;
        this.testFetch = celebornConf.testFetchFailure();
        try {
            TransportClient createClient = transportClientFactory.createClient(partitionLocation.getHost(), partitionLocation.getFetchPort());
            OpenStream openStream = new OpenStream(str, partitionLocation.getFileName(), i, i2);
            this.streamHandle = (StreamHandle) Message.decode(createClient.sendRpcSync(openStream.toByteBuffer(), celebornConf.fetchTimeoutMs()));
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can observe it, as next() does.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted when createClient", e);
        }
    }

    /** Returns {@code true} while fewer chunks have been returned than the stream contains. */
    @Override
    public boolean hasNext() {
        return this.returnedChunks < this.streamHandle.numChunks;
    }

    /**
     * Returns the next received chunk, requesting more chunks first if any remain unrequested.
     * Blocks, polling the result queue in 500 ms slices, until a chunk arrives or a failure has
     * been recorded.
     *
     * @return the next chunk buffer; it was retained on receipt
     * @throws IOException if a fetch failed, or if the thread is interrupted while waiting (the
     *     interrupt status is restored and the failure is recorded)
     */
    @Override
    public ByteBuf next() throws IOException {
        checkException();
        if (this.chunkIndex < this.streamHandle.numChunks) {
            fetchChunks();
        }
        ByteBuf byteBuf = null;
        while (byteBuf == null) {
            try {
                // Re-check on every slice so an asynchronous onFailure ends the wait promptly.
                checkException();
                byteBuf = this.results.poll(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                IOException iOException = new IOException(e);
                this.exception.set(iOException);
                throw iOException;
            }
        }
        this.returnedChunks++;
        return byteBuf;
    }

    /**
     * Marks the reader closed and releases every buffer still waiting in the result queue. Once
     * the flag is set, {@code onSuccess} no longer retains or enqueues buffers.
     */
    @Override
    public void close() {
        synchronized (this) {
            this.closed = true;
        }
        // Drain with poll() so each queued buffer is removed and released exactly once.
        ByteBuf pending;
        while ((pending = this.results.poll()) != null) {
            pending.release();
        }
    }

    @Override
    public PartitionLocation getLocation() {
        return this.location;
    }

    /**
     * Requests further chunks when the number in flight ({@code chunkIndex - returnedChunks}) is
     * below {@code fetchMaxReqsInFlight}. The batch size is capped by the chunks left in the
     * stream.
     *
     * @throws IOException if creating the client or issuing the fetch fails or is interrupted
     */
    private void fetchChunks() throws IOException {
        int inFlight = this.chunkIndex - this.returnedChunks;
        if (inFlight < this.fetchMaxReqsInFlight) {
            int toFetch = Math.min((this.fetchMaxReqsInFlight - inFlight) + 1, this.streamHandle.numChunks - this.chunkIndex);
            for (int n = 0; n < toFetch; n++) {
                if (this.testFetch && this.fetchChunkRetryCnt < this.fetchChunkMaxRetry - 1 && this.chunkIndex == 3) {
                    // Test-only failure injection: report a failure instead of fetching chunk 3.
                    this.callback.onFailure(this.chunkIndex, new IOException("Test fetch chunk failure"));
                } else {
                    try {
                        this.clientFactory.createClient(this.location.getHost(), this.location.getFetchPort()).fetchChunk(this.streamHandle.streamId, this.chunkIndex, this.callback);
                        this.chunkIndex++;
                    } catch (IOException | InterruptedException e) {
                        if (e instanceof InterruptedException) {
                            // Keep the interrupt visible to callers, consistent with next().
                            Thread.currentThread().interrupt();
                        }
                        logger.error("fetchChunk for streamId: {}, chunkIndex: {} failed.", Long.valueOf(this.streamHandle.streamId), Integer.valueOf(this.chunkIndex), e);
                        ExceptionUtils.wrapAndThrowIOException(e);
                    }
                }
            }
        }
    }

    /** Rethrows the recorded failure, if any. */
    private void checkException() throws IOException {
        IOException iOException = this.exception.get();
        if (iOException != null) {
            throw iOException;
        }
    }
}
