package alluxio.client.block.stream;

import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.CloseableResource;
import alluxio.resource.LockResource;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.GuardedBy;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/BufferCachingGrpcDataReader.class */
public class BufferCachingGrpcDataReader {
    private static final Logger LOG = LoggerFactory.getLogger(BufferCachingGrpcDataReader.class);
    private final WorkerNetAddress mAddress;
    private final CloseableResource<BlockWorkerClient> mClient;
    private final long mDataTimeoutMs;
    private final ReadRequest mReadRequest;
    private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;

    @GuardedBy("mBufferLocks")
    private final DataBuffer[] mDataBuffers;

    @VisibleForTesting
    protected long mPosToRead;
    private final AtomicInteger mRefCount = new AtomicInteger(0);
    private final AtomicInteger mBufferCount = new AtomicInteger(0);
    private final ReentrantReadWriteLock mBufferLocks = new ReentrantReadWriteLock();

    @VisibleForTesting
    protected BufferCachingGrpcDataReader(WorkerNetAddress workerNetAddress, CloseableResource<BlockWorkerClient> closeableResource, long j, ReadRequest readRequest, GrpcBlockingStream<ReadRequest, ReadResponse> grpcBlockingStream) {
        this.mAddress = workerNetAddress;
        this.mClient = closeableResource;
        this.mDataTimeoutMs = j;
        this.mPosToRead = readRequest.getOffset();
        this.mReadRequest = readRequest;
        this.mStream = grpcBlockingStream;
        long length = this.mReadRequest.getLength() + this.mReadRequest.getOffset();
        long chunkSize = this.mReadRequest.getChunkSize();
        int i = (int) (length / chunkSize);
        this.mDataBuffers = new DataBuffer[length % chunkSize != 0 ? i + 1 : i];
    }

    @Nullable
    public DataBuffer readChunk(int i) throws IOException {
        if (i >= this.mDataBuffers.length) {
            return null;
        }
        if (i >= this.mBufferCount.get()) {
            LockResource lockResource = new LockResource(this.mBufferLocks.writeLock());
            while (i >= this.mBufferCount.get()) {
                try {
                    this.mDataBuffers[this.mBufferCount.get()] = readChunk();
                    this.mBufferCount.incrementAndGet();
                } catch (Throwable th) {
                    try {
                        lockResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
            lockResource.close();
        }
        return this.mDataBuffers[i];
    }

    @Nullable
    @VisibleForTesting
    protected DataBuffer readChunk() throws IOException {
        Preconditions.checkState(!this.mClient.get().isShutdown(), "Data reader is closed while reading data chunks.");
        ReadResponse receive = this.mStream.receive(this.mDataTimeoutMs);
        if (receive == null) {
            return null;
        }
        Preconditions.checkState(receive.hasChunk() && receive.getChunk().hasData(), "response should always contain chunk");
        NioDataBuffer nioDataBuffer = new NioDataBuffer(receive.getChunk().getData().asReadOnlyByteBuffer(), r0.remaining());
        this.mPosToRead += nioDataBuffer.readableBytes();
        try {
            this.mStream.send(this.mReadRequest.toBuilder().setOffsetReceived(this.mPosToRead).build());
        } catch (Exception e) {
            LOG.debug("Failed to send receipt of data to worker {} for request {}: {}.", new Object[]{this.mAddress, this.mReadRequest, e.getMessage()});
        }
        Preconditions.checkState(this.mPosToRead - this.mReadRequest.getOffset() <= this.mReadRequest.getLength());
        return nioDataBuffer;
    }

    public void close() throws IOException {
        try {
            if (this.mClient.get().isShutdown()) {
                return;
            }
            this.mStream.close();
            this.mStream.waitForComplete(this.mDataTimeoutMs);
        } finally {
            this.mClient.close();
        }
    }

    public int ref() {
        return this.mRefCount.incrementAndGet();
    }

    public int deRef() {
        return this.mRefCount.decrementAndGet();
    }

    public int getRefCount() {
        return this.mRefCount.get();
    }

    public static BufferCachingGrpcDataReader create(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, ReadRequest readRequest) throws IOException {
        AlluxioConfiguration clusterConf = fileSystemContext.getClusterConf();
        int i = clusterConf.getInt(PropertyKey.USER_STREAMING_READER_BUFFER_SIZE_MESSAGES);
        long ms = clusterConf.getMs(PropertyKey.USER_STREAMING_DATA_TIMEOUT);
        CloseableResource<BlockWorkerClient> acquireBlockWorkerClient = fileSystemContext.acquireBlockWorkerClient(workerNetAddress);
        String format = LOG.isDebugEnabled() ? String.format("BufferCachingGrpcDataReader(request=%s,address=%s)", readRequest, workerNetAddress) : "BufferCachingGrpcDataReader";
        GrpcBlockingStream grpcBlockingStream = null;
        try {
            BlockWorkerClient blockWorkerClient = acquireBlockWorkerClient.get();
            Objects.requireNonNull(blockWorkerClient);
            grpcBlockingStream = new GrpcBlockingStream(blockWorkerClient::readBlock, i, format);
            grpcBlockingStream.send(readRequest, ms);
            return new BufferCachingGrpcDataReader(workerNetAddress, acquireBlockWorkerClient, ms, readRequest, grpcBlockingStream);
        } catch (Exception e) {
            if (grpcBlockingStream != null) {
                grpcBlockingStream.close();
            }
            acquireBlockWorkerClient.close();
            throw e;
        }
    }
}
