package alluxio.client.block;

import alluxio.client.ClientContext;
import alluxio.client.RemoteBlockReader;
import alluxio.exception.ConnectionFailedException;
import alluxio.exception.ExceptionMessage;
import alluxio.wire.LockBlockResult;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.ClientMetrics;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/RemoteBlockInStream.class */
public final class RemoteBlockInStream extends BufferedBlockInStream {
    private final WorkerNetAddress mWorkerNetAddress;
    private final InetSocketAddress mWorkerInetSocketAddress;
    private final Long mLockId;
    private final BlockWorkerClient mBlockWorkerClient;
    private final BlockStoreContext mContext;
    private final ClientMetrics mMetrics;

    public RemoteBlockInStream(long j, long j2, WorkerNetAddress workerNetAddress) throws IOException {
        super(j, j2);
        this.mWorkerNetAddress = workerNetAddress;
        this.mWorkerInetSocketAddress = new InetSocketAddress(workerNetAddress.getHost(), workerNetAddress.getDataPort());
        this.mContext = BlockStoreContext.INSTANCE;
        this.mBlockWorkerClient = this.mContext.acquireWorkerClient(workerNetAddress);
        try {
            LockBlockResult lockBlock = this.mBlockWorkerClient.lockBlock(j);
            if (lockBlock == null) {
                throw new IOException(ExceptionMessage.BLOCK_UNAVAILABLE.getMessage(new Object[]{Long.valueOf(j)}));
            }
            this.mLockId = Long.valueOf(lockBlock.getLockId());
            this.mMetrics = this.mBlockWorkerClient.getClientMetrics();
        } catch (IOException e) {
            this.mContext.releaseWorkerClient(this.mBlockWorkerClient);
            throw e;
        }
    }

    @Override // alluxio.client.block.BufferedBlockInStream, java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        if (this.mBlockIsRead) {
            this.mMetrics.incBlocksReadRemote(1L);
        }
        try {
            try {
                this.mBlockWorkerClient.unlockBlock(this.mBlockId);
                this.mContext.releaseWorkerClient(this.mBlockWorkerClient);
                this.mClosed = true;
            } catch (ConnectionFailedException e) {
                throw new IOException((Throwable) e);
            }
        } catch (Throwable th) {
            this.mContext.releaseWorkerClient(this.mBlockWorkerClient);
            throw th;
        }
    }

    @Override // alluxio.client.block.BufferedBlockInStream
    protected void bufferedRead(int i) throws IOException {
        this.mBuffer.clear();
        this.mBuffer.limit(readFromRemote(this.mBuffer.array(), 0, i));
    }

    @Override // alluxio.client.block.BufferedBlockInStream
    protected int directRead(byte[] bArr, int i, int i2) throws IOException {
        return readFromRemote(bArr, i, i2);
    }

    @Override // alluxio.client.block.BufferedBlockInStream
    protected void incrementBytesReadMetric(int i) {
        this.mMetrics.incBytesReadRemote(i);
    }

    public WorkerNetAddress getWorkerNetAddress() {
        return this.mWorkerNetAddress;
    }

    private int readFromRemote(byte[] bArr, int i, int i2) throws IOException {
        int min = (int) Math.min(i2, remaining());
        int i3 = min;
        while (i3 > 0) {
            RemoteBlockReader create = RemoteBlockReader.Factory.create(ClientContext.getConf());
            try {
                ByteBuffer readRemoteBlock = create.readRemoteBlock(this.mWorkerInetSocketAddress, this.mBlockId, getPosition(), i3, this.mLockId.longValue(), this.mBlockWorkerClient.getSessionId());
                int remaining = readRemoteBlock.remaining();
                readRemoteBlock.get(bArr, i, remaining);
                i3 -= remaining;
                create.close();
            } catch (Throwable th) {
                create.close();
                throw th;
            }
        }
        return min;
    }
}
