package alluxio.client.block.stream;

import alluxio.client.ReadType;
import alluxio.client.block.stream.DataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.OpenLocalBlockRequest;
import alluxio.grpc.OpenLocalBlockResponse;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Objects;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/LocalFileDataReader.class */
public final class LocalFileDataReader implements DataReader {
    private final LocalFileBlockReader mReader;
    private final long mEnd;
    private final long mChunkSize;
    private long mPos;
    private boolean mClosed;

    @NotThreadSafe
    /* loaded from: input_file:alluxio/client/block/stream/LocalFileDataReader$Factory.class */
    public static class Factory implements DataReader.Factory {
        private final CloseableResource<BlockWorkerClient> mBlockWorker;
        private final long mBlockId;
        private final String mPath;
        private final long mLocalReaderChunkSize;
        private final int mReadBufferSize;
        private final GrpcBlockingStream<OpenLocalBlockRequest, OpenLocalBlockResponse> mStream;
        private LocalFileBlockReader mReader;
        private final long mDataTimeoutMs;
        private boolean mClosed;

        public Factory(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long j, long j2, InStreamOptions inStreamOptions) throws IOException {
            AlluxioConfiguration clusterConf = fileSystemContext.getClusterConf();
            this.mBlockId = j;
            this.mLocalReaderChunkSize = j2;
            this.mReadBufferSize = clusterConf.getInt(PropertyKey.USER_STREAMING_READER_BUFFER_SIZE_MESSAGES);
            this.mDataTimeoutMs = clusterConf.getMs(PropertyKey.USER_STREAMING_DATA_TIMEOUT);
            if (clusterConf.getBoolean(PropertyKey.USER_DIRECT_MEMORY_IO_ENABLED)) {
                this.mBlockWorker = null;
                this.mStream = null;
                this.mPath = Paths.get(clusterConf.get(PropertyKey.Template.WORKER_TIERED_STORE_LEVEL_DIRS_PATH.format(0)).split(",")[0], clusterConf.get(PropertyKey.WORKER_DATA_FOLDER), Long.toString(j)).toString();
                return;
            }
            OpenLocalBlockRequest build = OpenLocalBlockRequest.newBuilder().setBlockId(this.mBlockId).setPromote(ReadType.fromProto(inStreamOptions.getOptions().getReadType()).isPromote()).build();
            this.mBlockWorker = fileSystemContext.acquireBlockWorkerClient(workerNetAddress);
            try {
                BlockWorkerClient blockWorkerClient = this.mBlockWorker.get();
                Objects.requireNonNull(blockWorkerClient);
                this.mStream = new GrpcBlockingStream<>(blockWorkerClient::openLocalBlock, this.mReadBufferSize, MoreObjects.toStringHelper((Class<?>) LocalFileDataReader.class).add("request", build).add("address", workerNetAddress).toString());
                this.mStream.send(build, this.mDataTimeoutMs);
                OpenLocalBlockResponse receive = this.mStream.receive(this.mDataTimeoutMs);
                Preconditions.checkState(receive.hasPath());
                this.mPath = receive.getPath();
            } catch (Exception e) {
                this.mBlockWorker.close();
                throw e;
            }
        }

        @Override // alluxio.client.block.stream.DataReader.Factory
        public DataReader create(long j, long j2) throws IOException {
            if (this.mReader == null) {
                this.mReader = new LocalFileBlockReader(this.mPath);
            }
            Preconditions.checkState(this.mReader.getUsageCount() == 0);
            this.mReader.increaseUsageCount();
            return new LocalFileDataReader(this.mReader, j, j2, this.mLocalReaderChunkSize);
        }

        @Override // alluxio.client.block.stream.DataReader.Factory
        public boolean isShortCircuit() {
            return true;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.mClosed) {
                return;
            }
            try {
                if (this.mReader != null) {
                    this.mReader.close();
                }
                if (this.mStream != null) {
                    this.mStream.close();
                    this.mStream.waitForComplete(this.mDataTimeoutMs);
                }
            } finally {
                this.mClosed = true;
                if (this.mBlockWorker != null) {
                    this.mBlockWorker.close();
                }
            }
        }
    }

    private LocalFileDataReader(LocalFileBlockReader localFileBlockReader, long j, long j2, long j3) {
        this.mReader = localFileBlockReader;
        Preconditions.checkArgument(j3 > 0);
        this.mPos = j;
        this.mEnd = Math.min(this.mReader.getLength(), j + j2);
        this.mChunkSize = j3;
    }

    @Override // alluxio.client.block.stream.DataReader
    public DataBuffer readChunk() throws IOException {
        if (this.mPos >= this.mEnd) {
            return null;
        }
        NioDataBuffer nioDataBuffer = new NioDataBuffer(this.mReader.read(this.mPos, Math.min(this.mChunkSize, this.mEnd - this.mPos)), r0.remaining());
        this.mPos += nioDataBuffer.getLength();
        MetricsSystem.counter(MetricKey.CLIENT_BYTES_READ_LOCAL.getName()).inc(nioDataBuffer.getLength());
        MetricsSystem.meter(MetricKey.CLIENT_BYTES_READ_LOCAL_THROUGHPUT.getName()).mark(nioDataBuffer.getLength());
        return nioDataBuffer;
    }

    @Override // alluxio.client.block.stream.DataReader
    public long pos() {
        return this.mPos;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        this.mReader.decreaseUsageCount();
    }
}
