package alluxio.master.block;

import alluxio.clock.ManualClock;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.grpc.BlockHeartbeatPRequest;
import alluxio.grpc.BlockHeartbeatPResponse;
import alluxio.grpc.BlockIdList;
import alluxio.grpc.BlockStoreLocationProto;
import alluxio.grpc.GetRegisterLeasePRequest;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.grpc.RegisterWorkerPOptions;
import alluxio.grpc.RegisterWorkerPRequest;
import alluxio.grpc.RegisterWorkerPResponse;
import alluxio.master.CoreMasterContext;
import alluxio.master.MasterRegistry;
import alluxio.master.MasterTestUtils;
import alluxio.master.metrics.MetricsMaster;
import alluxio.master.metrics.MetricsMasterFactory;
import alluxio.util.SleepUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.BlockStoreLocation;
import com.google.common.collect.ImmutableList;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:alluxio/master/block/BlockMasterWorkerServiceHandlerTest.class */
public class BlockMasterWorkerServiceHandlerTest {
    private static final WorkerNetAddress NET_ADDRESS_1 = new WorkerNetAddress().setHost("localhost").setRpcPort(80).setDataPort(81).setWebPort(82);
    private BlockMaster mBlockMaster;
    private MasterRegistry mRegistry;
    private ManualClock mClock;
    private ExecutorService mExecutorService;
    private MetricsMaster mMetricsMaster;
    private BlockMasterWorkerServiceHandler mHandler;

    @Before
    public void before() throws Exception {
        initServiceHandler(true);
    }

    public void initServiceHandler(boolean z) throws Exception {
        if (z) {
            ServerConfiguration.set(PropertyKey.MASTER_WORKER_REGISTER_LEASE_ENABLED, true);
            ServerConfiguration.set(PropertyKey.MASTER_WORKER_REGISTER_LEASE_TTL, "3s");
            ServerConfiguration.set(PropertyKey.MASTER_WORKER_REGISTER_LEASE_RESPECT_JVM_SPACE, false);
            ServerConfiguration.set(PropertyKey.MASTER_WORKER_REGISTER_LEASE_COUNT, 1);
        } else {
            ServerConfiguration.set(PropertyKey.MASTER_WORKER_REGISTER_LEASE_ENABLED, false);
        }
        this.mRegistry = new MasterRegistry();
        CoreMasterContext testMasterContext = MasterTestUtils.testMasterContext();
        this.mMetricsMaster = new MetricsMasterFactory().create(this.mRegistry, testMasterContext);
        this.mClock = new ManualClock();
        this.mExecutorService = Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("TestBlockMaster-%d", true));
        this.mBlockMaster = new DefaultBlockMaster(this.mMetricsMaster, testMasterContext, this.mClock, ExecutorServiceFactories.constantExecutorServiceFactory(this.mExecutorService));
        this.mRegistry.add(BlockMaster.class, this.mBlockMaster);
        this.mRegistry.start(true);
        this.mHandler = new BlockMasterWorkerServiceHandler(this.mBlockMaster);
    }

    @After
    public void after() throws Exception {
        this.mRegistry.stop();
    }

    @Test
    public void registerWithNoLeaseIsRejected() {
        long workerId = this.mBlockMaster.getWorkerId(NET_ADDRESS_1);
        BlockStoreLocation blockStoreLocation = new BlockStoreLocation("MEM", 0);
        RegisterWorkerPRequest build = RegisterWorkerPRequest.newBuilder().setWorkerId(workerId).addStorageTiers("MEM").putTotalBytesOnTiers("MEM", 1000L).putUsedBytesOnTiers("MEM", 0L).setOptions(RegisterWorkerPOptions.getDefaultInstance()).addCurrentBlocks(LocationBlockIdListEntry.newBuilder().setKey(BlockStoreLocationProto.newBuilder().setTierAlias(blockStoreLocation.tierAlias()).setMediumType(blockStoreLocation.mediumType()).build()).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(1L, 2L)).build()).build()).build();
        final ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        this.mHandler.registerWorker(build, new StreamObserver<RegisterWorkerPResponse>() { // from class: alluxio.master.block.BlockMasterWorkerServiceHandlerTest.1
            public void onNext(RegisterWorkerPResponse registerWorkerPResponse) {
            }

            public void onError(Throwable th) {
                concurrentLinkedDeque.offer(th);
            }

            public void onCompleted() {
            }
        });
        Assert.assertEquals(1L, concurrentLinkedDeque.size());
        Assert.assertThat(((Throwable) concurrentLinkedDeque.poll()).getMessage(), CoreMatchers.containsString("does not have a lease or the lease has expired."));
    }

    @Test
    public void registerWorkerFailsOnDuplicateBlockLocation() throws Exception {
        long workerId = this.mBlockMaster.getWorkerId(NET_ADDRESS_1);
        BlockStoreLocation blockStoreLocation = new BlockStoreLocation("MEM", 0);
        BlockStoreLocationProto build = BlockStoreLocationProto.newBuilder().setTierAlias(blockStoreLocation.tierAlias()).setMediumType(blockStoreLocation.mediumType()).build();
        LocationBlockIdListEntry build2 = LocationBlockIdListEntry.newBuilder().setKey(build).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(1L, 2L)).build()).build();
        LocationBlockIdListEntry build3 = LocationBlockIdListEntry.newBuilder().setKey(build).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(3L, 4L)).build()).build();
        Assert.assertTrue(this.mBlockMaster.tryAcquireRegisterLease(GetRegisterLeasePRequest.newBuilder().setWorkerId(workerId).setBlockCount(r0.getBlockIdCount() + r0.getBlockIdCount()).build()).isPresent());
        RegisterWorkerPRequest build4 = RegisterWorkerPRequest.newBuilder().setWorkerId(workerId).addStorageTiers("MEM").putTotalBytesOnTiers("MEM", 1000L).putUsedBytesOnTiers("MEM", 0L).setOptions(RegisterWorkerPOptions.getDefaultInstance()).addCurrentBlocks(build2).addCurrentBlocks(build3).build();
        StreamObserver<RegisterWorkerPResponse> streamObserver = new StreamObserver<RegisterWorkerPResponse>() { // from class: alluxio.master.block.BlockMasterWorkerServiceHandlerTest.2
            public void onNext(RegisterWorkerPResponse registerWorkerPResponse) {
            }

            public void onError(Throwable th) {
            }

            public void onCompleted() {
            }
        };
        Assert.assertThrows(AssertionError.class, () -> {
            this.mHandler.registerWorker(build4, streamObserver);
        });
        this.mBlockMaster.releaseRegisterLease(workerId);
    }

    @Test
    public void registerLeaseExpired() {
        long workerId = this.mBlockMaster.getWorkerId(NET_ADDRESS_1);
        BlockStoreLocation blockStoreLocation = new BlockStoreLocation("MEM", 0);
        BlockStoreLocationProto build = BlockStoreLocationProto.newBuilder().setTierAlias(blockStoreLocation.tierAlias()).setMediumType(blockStoreLocation.mediumType()).build();
        LocationBlockIdListEntry build2 = LocationBlockIdListEntry.newBuilder().setKey(build).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(1L, 2L)).build()).build();
        Assert.assertTrue(this.mBlockMaster.tryAcquireRegisterLease(GetRegisterLeasePRequest.newBuilder().setWorkerId(workerId).setBlockCount(r0.getBlockIdCount()).build()).isPresent());
        SleepUtils.sleepMs(5000L);
        Assert.assertTrue(this.mBlockMaster.tryAcquireRegisterLease(GetRegisterLeasePRequest.newBuilder().setWorkerId(workerId + 1).setBlockCount(r0.getBlockIdCount()).build()).isPresent());
        RegisterWorkerPRequest build3 = RegisterWorkerPRequest.newBuilder().setWorkerId(workerId).addStorageTiers("MEM").putTotalBytesOnTiers("MEM", 1000L).putUsedBytesOnTiers("MEM", 0L).setOptions(RegisterWorkerPOptions.getDefaultInstance()).addCurrentBlocks(build2).build();
        final ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        this.mHandler.registerWorker(build3, new StreamObserver<RegisterWorkerPResponse>() { // from class: alluxio.master.block.BlockMasterWorkerServiceHandlerTest.3
            public void onNext(RegisterWorkerPResponse registerWorkerPResponse) {
            }

            public void onError(Throwable th) {
                concurrentLinkedDeque.offer(th);
            }

            public void onCompleted() {
            }
        });
        Assert.assertEquals(1L, concurrentLinkedDeque.size());
        Assert.assertThat(((Throwable) concurrentLinkedDeque.poll()).getMessage(), CoreMatchers.containsString("does not have a lease or the lease has expired."));
        this.mBlockMaster.releaseRegisterLease(workerId + 1);
    }

    @Test
    public void registerLeaseTurnedOff() throws Exception {
        initServiceHandler(false);
        long workerId = this.mBlockMaster.getWorkerId(NET_ADDRESS_1);
        BlockStoreLocation blockStoreLocation = new BlockStoreLocation("MEM", 0);
        RegisterWorkerPRequest build = RegisterWorkerPRequest.newBuilder().setWorkerId(workerId).addStorageTiers("MEM").putTotalBytesOnTiers("MEM", 1000L).putUsedBytesOnTiers("MEM", 0L).setOptions(RegisterWorkerPOptions.getDefaultInstance()).addCurrentBlocks(LocationBlockIdListEntry.newBuilder().setKey(BlockStoreLocationProto.newBuilder().setTierAlias(blockStoreLocation.tierAlias()).setMediumType(blockStoreLocation.mediumType()).build()).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(1L, 2L)).build()).build()).build();
        final ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        this.mHandler.registerWorker(build, new StreamObserver<RegisterWorkerPResponse>() { // from class: alluxio.master.block.BlockMasterWorkerServiceHandlerTest.4
            public void onNext(RegisterWorkerPResponse registerWorkerPResponse) {
            }

            public void onError(Throwable th) {
                concurrentLinkedDeque.offer(th);
            }

            public void onCompleted() {
            }
        });
        Assert.assertEquals(0L, concurrentLinkedDeque.size());
    }

    @Test
    public void workerHeartbeatFailsOnDuplicateBlockLocation() throws Exception {
        long workerId = this.mBlockMaster.getWorkerId(NET_ADDRESS_1);
        BlockStoreLocation blockStoreLocation = new BlockStoreLocation("MEM", 0);
        BlockStoreLocationProto build = BlockStoreLocationProto.newBuilder().setTierAlias(blockStoreLocation.tierAlias()).setMediumType(blockStoreLocation.mediumType()).build();
        LocationBlockIdListEntry build2 = LocationBlockIdListEntry.newBuilder().setKey(build).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(1L, 2L)).build()).build();
        BlockHeartbeatPRequest build3 = BlockHeartbeatPRequest.newBuilder().setWorkerId(workerId).addAddedBlocks(build2).addAddedBlocks(LocationBlockIdListEntry.newBuilder().setKey(build).setValue(BlockIdList.newBuilder().addAllBlockId(ImmutableList.of(3L, 4L)).build()).build()).build();
        StreamObserver<BlockHeartbeatPResponse> streamObserver = new StreamObserver<BlockHeartbeatPResponse>() { // from class: alluxio.master.block.BlockMasterWorkerServiceHandlerTest.5
            public void onNext(BlockHeartbeatPResponse blockHeartbeatPResponse) {
            }

            public void onError(Throwable th) {
            }

            public void onCompleted() {
            }
        };
        Assert.assertThrows(AssertionError.class, () -> {
            this.mHandler.blockHeartbeat(build3, streamObserver);
        });
    }
}
