package alluxio.client.block.stream;

import alluxio.client.block.stream.DataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.DataMessage;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.grpc.ReadResponseMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/GrpcDataReader.class */
public final class GrpcDataReader implements DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataReader.class);
    private final int mReaderBufferSizeMessages;
    private final long mDataTimeoutMs;
    private final FileSystemContext mContext;
    private final BlockWorkerClient mClient;
    private final ReadRequest mReadRequest;
    private final WorkerNetAddress mAddress;
    private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
    private final ReadResponseMarshaller mMarshaller;
    private long mPosToRead;

    /* loaded from: input_file:alluxio/client/block/stream/GrpcDataReader$Factory.class */
    public static class Factory implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final ReadRequest mReadRequestPartial;

        public Factory(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, ReadRequest readRequest) {
            this.mContext = fileSystemContext;
            this.mAddress = workerNetAddress;
            this.mReadRequestPartial = readRequest;
        }

        @Override // alluxio.client.block.stream.DataReader.Factory
        public DataReader create(long j, long j2) throws IOException {
            return new GrpcDataReader(this.mContext, this.mAddress, this.mReadRequestPartial.toBuilder().setOffset(j).setLength(j2).build());
        }

        @Override // alluxio.client.block.stream.DataReader.Factory
        public boolean isShortCircuit() {
            return false;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    private GrpcDataReader(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, ReadRequest readRequest) throws IOException {
        this.mContext = fileSystemContext;
        this.mAddress = workerNetAddress;
        this.mPosToRead = readRequest.getOffset();
        this.mReadRequest = readRequest;
        AlluxioConfiguration clusterConf = fileSystemContext.getClusterConf();
        this.mReaderBufferSizeMessages = clusterConf.getInt(PropertyKey.USER_NETWORK_READER_BUFFER_SIZE_MESSAGES);
        this.mDataTimeoutMs = clusterConf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);
        this.mMarshaller = new ReadResponseMarshaller();
        this.mClient = this.mContext.acquireBlockWorkerClient(workerNetAddress);
        try {
            if (clusterConf.getBoolean(PropertyKey.USER_NETWORK_ZEROCOPY_ENABLED)) {
                BlockWorkerClient blockWorkerClient = this.mClient;
                blockWorkerClient.getClass();
                this.mStream = new GrpcDataMessageBlockingStream(blockWorkerClient::readBlock, this.mReaderBufferSizeMessages, MoreObjects.toStringHelper(this).add("request", this.mReadRequest).add("address", workerNetAddress).toString(), null, this.mMarshaller);
            } else {
                BlockWorkerClient blockWorkerClient2 = this.mClient;
                blockWorkerClient2.getClass();
                this.mStream = new GrpcBlockingStream<>(blockWorkerClient2::readBlock, this.mReaderBufferSizeMessages, MoreObjects.toStringHelper(this).add("request", this.mReadRequest).add("address", workerNetAddress).toString());
            }
            this.mStream.send(this.mReadRequest, this.mDataTimeoutMs);
        } catch (Exception e) {
            this.mContext.releaseBlockWorkerClient(workerNetAddress, this.mClient);
            throw e;
        }
    }

    @Override // alluxio.client.block.stream.DataReader
    public long pos() {
        return this.mPosToRead;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v52, types: [alluxio.network.protocol.databuffer.DataBuffer] */
    @Override // alluxio.client.block.stream.DataReader
    public DataBuffer readChunk() throws IOException {
        Preconditions.checkState(!this.mClient.isShutdown(), "Data reader is closed while reading data chunks.");
        NioDataBuffer nioDataBuffer = null;
        ReadResponse readResponse = null;
        if (this.mStream instanceof GrpcDataMessageBlockingStream) {
            DataMessage receiveDataMessage = ((GrpcDataMessageBlockingStream) this.mStream).receiveDataMessage(this.mDataTimeoutMs);
            if (receiveDataMessage != null) {
                readResponse = (ReadResponse) receiveDataMessage.getMessage();
                nioDataBuffer = (DataBuffer) receiveDataMessage.getBuffer();
                if (nioDataBuffer == null && readResponse.hasChunk() && readResponse.getChunk().hasData()) {
                    nioDataBuffer = new NioDataBuffer(readResponse.getChunk().getData().asReadOnlyByteBuffer(), r0.remaining());
                }
                Preconditions.checkState(nioDataBuffer != null, "response should always contain chunk");
            }
        } else {
            readResponse = this.mStream.receive(this.mDataTimeoutMs);
            if (readResponse != null) {
                Preconditions.checkState(readResponse.hasChunk() && readResponse.getChunk().hasData(), "response should always contain chunk");
                nioDataBuffer = new NioDataBuffer(readResponse.getChunk().getData().asReadOnlyByteBuffer(), r0.remaining());
            }
        }
        if (readResponse == null) {
            return null;
        }
        this.mPosToRead += nioDataBuffer.readableBytes();
        try {
            this.mStream.send(this.mReadRequest.toBuilder().setOffsetReceived(this.mPosToRead).build());
        } catch (Exception e) {
            LOG.debug("Failed to send receipt of data to worker {} for request {}: {}.", new Object[]{this.mAddress, this.mReadRequest, e.getMessage()});
        }
        Preconditions.checkState(this.mPosToRead - this.mReadRequest.getOffset() <= this.mReadRequest.getLength());
        return nioDataBuffer;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.mClient.isShutdown()) {
                return;
            }
            this.mStream.close();
            this.mStream.waitForComplete(this.mDataTimeoutMs);
        } finally {
            this.mMarshaller.close();
            this.mContext.releaseBlockWorkerClient(this.mAddress, this.mClient);
        }
    }
}
