package alluxio.client.fs.io;

import alluxio.AlluxioURI;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.FileSystemMasterClient;
import alluxio.client.file.FileSystemTestUtils;
import alluxio.client.file.URIStatus;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.DecommissionWorkerPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.grpc.WritePType;
import alluxio.security.user.TestUserState;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.SleepUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Integration tests that read files through {@link FileInStream} on a two-worker local cluster
 * before, during and after one of the workers is decommissioned.
 */
@DoraTestTodoItem(action = DoraTestTodoItem.Action.FIX, owner = "jiacheng",
    comment = "check if decommission is still a relevant feature")
@Ignore
public class FileInStreamDecommissionIntegrationTest {
    private static final int BLOCK_SIZE = 1048576;
    /** Each test file spans exactly two blocks. */
    private static final int LENGTH = 2097152;
    /** Milliseconds; mirrors the "2s" USER_WORKER_LIST_REFRESH_INTERVAL set on the cluster. */
    private static final int CLIENT_WORKER_LIST_REFRESH_INTERVAL = 2000;
    private static final int NUM_WORKERS = 2;
    /** Buffer size used by the tests that decommission a worker in the middle of a read. */
    private static final int SMALL_BUFFER_SIZE = 1024;

    /** Create options using {@link WritePType#CACHE_THROUGH}. */
    private CreateFilePOptions mWriteBoth;
    /** Create options using {@link WritePType#MUST_CACHE}. */
    private CreateFilePOptions mWriteAlluxio;
    private OpenFilePOptions mReadNoCache;
    private OpenFilePOptions mReadCachePromote;
    private String mTestPath;
    /** Single-thread pool that runs the concurrent decommission task. */
    private ExecutorService mThreadPool;
    private String mCacheThroughFilePath;
    private String mMustCacheFilePath;

    @Rule
    public LocalAlluxioClusterResource mLocalAlluxioClusterResource =
        new LocalAlluxioClusterResource.Builder()
            .setNumWorkers(NUM_WORKERS)
            .setProperty(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, BLOCK_SIZE)
            .setProperty(PropertyKey.USER_WORKER_LIST_REFRESH_INTERVAL, "2s")
            .setProperty(PropertyKey.USER_SHORT_CIRCUIT_ENABLED, false)
            // The cluster is started manually in setUp() and stopped in tearDown().
            .setStartCluster(false)
            .build();
    private FileSystem mFileSystem = null;

    @Rule
    public ExpectedException mThrown = ExpectedException.none();

    /**
     * Starts the cluster and writes one CACHE_THROUGH file and one MUST_CACHE file, both pinned to
     * the worker address reported by the cluster.
     */
    @Before
    public final void setUp() throws Exception {
        mLocalAlluxioClusterResource.start();
        mFileSystem = mLocalAlluxioClusterResource.get().getClient();
        WorkerNetAddress workerAddress = mLocalAlluxioClusterResource.get().getWorkerAddress();

        mWriteBoth = CreateFilePOptions.newBuilder()
            .setBlockSizeBytes(BLOCK_SIZE)
            .setWriteType(WritePType.CACHE_THROUGH)
            .setWorkerLocation(GrpcUtils.toProto(workerAddress))
            .setRecursive(true)
            .build();
        mWriteAlluxio = CreateFilePOptions.newBuilder()
            .setBlockSizeBytes(BLOCK_SIZE)
            .setWriteType(WritePType.MUST_CACHE)
            .setWorkerLocation(GrpcUtils.toProto(workerAddress))
            .setRecursive(true)
            .build();
        mReadCachePromote =
            OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE_PROMOTE).build();
        mReadNoCache = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();

        mTestPath = PathUtils.uniqPath();
        mCacheThroughFilePath = mTestPath + "/file_BOTH";
        mMustCacheFilePath = mTestPath + "/file_CACHE";
        FileSystemTestUtils.createByteFile(
            mFileSystem, new AlluxioURI(mCacheThroughFilePath), mWriteBoth, LENGTH);
        FileSystemTestUtils.createByteFile(
            mFileSystem, new AlluxioURI(mMustCacheFilePath), mWriteAlluxio, LENGTH);

        mThreadPool = Executors.newFixedThreadPool(
            1, ThreadFactoryUtils.build("decommission-worker-%d", true));
    }

    /** Stops the cluster and always shuts the thread pool down, even if stopping fails. */
    @After
    public final void tearDown() throws Exception {
        try {
            mLocalAlluxioClusterResource.stop();
        } finally {
            // setUp() may have failed before the pool was created.
            if (mThreadPool != null) {
                mThreadPool.shutdownNow();
            }
        }
    }

    private List<CreateFilePOptions> getOptionSet() {
        List<CreateFilePOptions> options = new ArrayList<>(2);
        options.add(mWriteBoth);
        options.add(mWriteAlluxio);
        return options;
    }

    /** Creates a client context for the "test" user on top of the global configuration. */
    private static FileSystemContext createContext() throws Exception {
        return FileSystemContext.create(
            new TestUserState("test", Configuration.global()).getSubject(),
            Configuration.global());
    }

    /**
     * Returns the size of the worker list reported by the block master.
     *
     * <p>NOTE(review): the resource returned by acquireBlockMasterClientResource() is never
     * released, which matches the previous behavior of these tests; confirm whether it should be
     * closed.
     */
    private static int workerCount(FileSystemContext context) throws Exception {
        BlockMasterClient blockMaster = context.acquireBlockMasterClientResource().get();
        return blockMaster.getWorkerInfoList().size();
    }

    /** Fetches the current status of {@code uri} from the file system master. */
    private static URIStatus fetchStatus(FileSystemContext context, AlluxioURI uri)
        throws Exception {
        FileSystemMasterClient fsMaster = context.acquireMasterClientResource().get();
        return fsMaster.getStatus(uri, GetStatusPOptions.getDefaultInstance());
    }

    /** Returns the worker holding the first listed location of block {@code blockIndex}. */
    private static WorkerNetAddress blockWorker(FileSystemContext context, AlluxioURI uri,
        int blockIndex) throws Exception {
        FileBlockInfo block = fetchStatus(context, uri).getFileBlockInfos().get(blockIndex);
        return block.getBlockInfo().getLocations().get(0).getWorkerAddress();
    }

    /** Asks the block master to decommission {@code worker}, allowing it to register again. */
    private static void decommission(FileSystemContext context, WorkerNetAddress worker)
        throws Exception {
        DecommissionWorkerPOptions options = DecommissionWorkerPOptions.newBuilder()
            .setWorkerHostname(worker.getHost())
            .setWorkerWebPort(worker.getWebPort())
            .setCanRegisterAgain(true)
            .build();
        context.acquireBlockMasterClientResource().get().decommissionWorker(options);
    }

    /**
     * Submits a background task that waits for {@code startSignal}, decommissions {@code worker}
     * and checks that one fewer worker is listed afterwards.
     *
     * <p>The task is a Callable, so any exception or assertion failure is reported through the
     * returned future instead of being printed and lost. Callers must invoke {@link Future#get()}
     * to wait for completion and surface failures.
     */
    private Future<?> decommissionWhenSignalled(FileSystemContext context,
        WorkerNetAddress worker, CountDownLatch startSignal) {
        return mThreadPool.submit(() -> {
            startSignal.await();
            decommission(context, worker);
            Assert.assertEquals(NUM_WORKERS - 1, workerCount(context));
            return null;
        });
    }

    @Test
    public void readUfsFromUndecommissionedWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(mCacheThroughFilePath);
        FileSystemContext context = createContext();
        Assert.assertEquals(NUM_WORKERS, workerCount(context));

        WorkerNetAddress decommissioned = blockWorker(context, uri, 0);
        decommission(context, decommissioned);

        try (FileInStream in = mFileSystem.openFile(uri, mReadCachePromote)) {
            byte[] buffer = new byte[BLOCK_SIZE];
            int totalRead = 0;
            int read;
            while ((read = in.read(buffer)) != -1) {
                totalRead += read;
            }
            Assert.assertEquals(LENGTH, totalRead);
        }

        // After the read, every block must have exactly one location, and not on the
        // decommissioned worker.
        List<FileBlockInfo> blocks = fetchStatus(context, uri).getFileBlockInfos();
        Assert.assertEquals(LENGTH / BLOCK_SIZE, blocks.size());
        for (FileBlockInfo block : blocks) {
            List<BlockLocation> locations = block.getBlockInfo().getLocations();
            Assert.assertEquals(1, locations.size());
            Assert.assertNotEquals(decommissioned, locations.get(0).getWorkerAddress());
        }
    }

    @Test
    public void cannotReadCacheFromDecommissionedWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(mMustCacheFilePath);
        FileSystemContext context = createContext();
        Assert.assertEquals(NUM_WORKERS, workerCount(context));

        decommission(context, blockWorker(context, uri, 0));

        try (FileInStream in = mFileSystem.openFile(uri, mReadCachePromote)) {
            Assert.assertThrows(UnavailableException.class, () -> in.read());
        }
    }

    @Test
    public void decommissionWhileReading() throws Exception {
        AlluxioURI uri = new AlluxioURI(mCacheThroughFilePath);
        FileSystemContext context = createContext();
        Assert.assertEquals(NUM_WORKERS, workerCount(context));

        WorkerNetAddress worker = blockWorker(context, uri, 0);
        CountDownLatch startSignal = new CountDownLatch(1);
        Future<?> decommissionTask = decommissionWhenSignalled(context, worker, startSignal);

        try (FileInStream in = mFileSystem.openFile(uri, mReadNoCache)) {
            byte[] buffer = new byte[SMALL_BUFFER_SIZE];
            int totalRead = 0;
            int read = 0;
            boolean triggered = false;
            while (read != -1) {
                // Once some data has been consumed, decommission the worker exactly once and
                // wait for it to finish before reading further.
                if (totalRead > SMALL_BUFFER_SIZE && !triggered) {
                    triggered = true;
                    startSignal.countDown();
                    // Blocks until the task is done and rethrows anything it failed with.
                    decommissionTask.get();
                }
                read = in.read(buffer);
                if (read != -1) {
                    totalRead += read;
                }
            }
            Assert.assertEquals(LENGTH, totalRead);
        }
    }

    @Test
    public void halfStreamFromAnotherWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(mCacheThroughFilePath);
        FileSystemContext context = createContext();
        Assert.assertEquals(NUM_WORKERS, workerCount(context));

        WorkerNetAddress worker = blockWorker(context, uri, 0);
        CountDownLatch startSignal = new CountDownLatch(1);
        Future<?> decommissionTask = decommissionWhenSignalled(context, worker, startSignal);

        try (FileInStream in = mFileSystem.openFile(uri, mReadCachePromote)) {
            byte[] buffer = new byte[SMALL_BUFFER_SIZE];
            int totalRead = 0;
            int read = 0;
            boolean triggered = false;
            while (read != -1) {
                // Decommission exactly at the boundary between the first and second block.
                if (totalRead == BLOCK_SIZE && !triggered) {
                    triggered = true;
                    startSignal.countDown();
                    // Blocks until the task is done and rethrows anything it failed with.
                    decommissionTask.get();
                    // Wait one refresh interval before checking the client's cached worker list.
                    SleepUtils.sleepMs(CLIENT_WORKER_LIST_REFRESH_INTERVAL);
                    Assert.assertEquals(NUM_WORKERS - 1, context.getCachedWorkers().size());
                }
                read = in.read(buffer);
                if (read != -1) {
                    totalRead += read;
                }
            }
            Assert.assertEquals(LENGTH, totalRead);
            // The second block must not be located on the decommissioned worker.
            Assert.assertNotEquals(worker, blockWorker(context, uri, 1));
        }
    }
}
