From 0087e42da696844de98c2475f9d96b62e9969ea2 Mon Sep 17 00:00:00 2001 From: Bin Fan Date: Mon, 14 Aug 2023 08:41:39 -0700 Subject: [PATCH] Address comments --- dora/core/server/master/pom.xml | 5 + .../alluxio/master/block/BlockMasterTest.java | 1110 +++++++++++++++++ dora/tests/pom.xml | 7 - 3 files changed, 1115 insertions(+), 7 deletions(-) create mode 100644 dora/core/server/master/src/test/java/alluxio/master/block/BlockMasterTest.java diff --git a/dora/core/server/master/pom.xml b/dora/core/server/master/pom.xml index 0fd0c1a7e66d..ea87ab0cbd28 100644 --- a/dora/core/server/master/pom.xml +++ b/dora/core/server/master/pom.xml @@ -75,6 +75,11 @@ alluxio-job-client ${project.version} + + org.alluxio + alluxio-stress-shell + ${project.version} + org.apache.commons commons-lang3 diff --git a/dora/core/server/master/src/test/java/alluxio/master/block/BlockMasterTest.java b/dora/core/server/master/src/test/java/alluxio/master/block/BlockMasterTest.java new file mode 100644 index 000000000000..40a70160266f --- /dev/null +++ b/dora/core/server/master/src/test/java/alluxio/master/block/BlockMasterTest.java @@ -0,0 +1,1110 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.block; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import alluxio.Constants; +import alluxio.client.block.options.GetWorkerReportOptions; +import alluxio.clock.ManualClock; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.exception.BlockInfoException; +import alluxio.exception.ExceptionMessage; +import alluxio.exception.status.NotFoundException; +import alluxio.grpc.BuildVersion; +import alluxio.grpc.Command; +import alluxio.grpc.CommandType; +import alluxio.grpc.ConfigProperty; +import alluxio.grpc.DecommissionWorkerPOptions; +import alluxio.grpc.RegisterWorkerPOptions; +import alluxio.grpc.RegisterWorkerPRequest; +import alluxio.grpc.RegisterWorkerPResponse; +import alluxio.grpc.StorageList; +import alluxio.grpc.WorkerLostStorageInfo; +import alluxio.heartbeat.HeartbeatContext; +import alluxio.heartbeat.HeartbeatScheduler; +import alluxio.heartbeat.ManuallyScheduleHeartbeat; +import alluxio.master.AlwaysPrimaryPrimarySelector; +import alluxio.master.CoreMasterContext; +import alluxio.master.MasterRegistry; +import alluxio.master.MasterTestUtils; +import alluxio.master.WorkerState; +import alluxio.master.block.meta.MasterWorkerInfo; +import alluxio.master.journal.JournalSystem; +import alluxio.master.journal.noop.NoopJournalSystem; +import alluxio.master.metrics.MetricsMaster; +import alluxio.master.metrics.MetricsMasterFactory; +import alluxio.metrics.Metric; +import alluxio.proto.meta.Block; +import alluxio.util.ThreadFactoryUtils; +import alluxio.util.executor.ExecutorServiceFactories; +import alluxio.wire.BlockInfo; +import alluxio.wire.BlockLocation; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; +import alluxio.worker.block.BlockStoreLocation; +import alluxio.worker.block.RegisterStreamer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.grpc.stub.StreamObserver; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * Unit tests for {@link BlockMaster}. + */ +public class BlockMasterTest { + public static final long CAPACITY = 20L * 1024 * 1024 * 1024; // 20GB + private static final WorkerNetAddress NET_ADDRESS_1 = new WorkerNetAddress().setHost("localhost") + .setRpcPort(80).setDataPort(81).setWebPort(82); + private static final WorkerNetAddress NET_ADDRESS_2 = new WorkerNetAddress().setHost("localhost") + .setRpcPort(83).setDataPort(84).setWebPort(85); + + private static final List NO_BLOCKS = ImmutableList.of(); + private static final Map> NO_BLOCKS_ON_LOCATION + = ImmutableMap.of(); + private static final Map NO_LOST_STORAGE = ImmutableMap.of(); + public static final Map> LOST_STORAGE = + ImmutableMap.of("MEM", ImmutableList.of()); + public static final List EMPTY_CONFIG = ImmutableList.of(); + public static final int BATCH_SIZE = 1000; + + public static final BuildVersion OLD_VERSION = BuildVersion.newBuilder().setVersion("1.0.0") + .setRevision("foobar").build(); + public static final BuildVersion NEW_VERSION = BuildVersion.newBuilder().setVersion("1.1.0") + .setRevision("foobaz").build(); + + private BlockMaster mBlockMaster; + private MasterRegistry mRegistry; + private ManualClock mClock; + private ExecutorService mExecutorService; + private MetricsMaster mMetricsMaster; + private List mMetrics; + + /** Rule to create a new temporary folder during each test. */ + @Rule + public TemporaryFolder mTestFolder = new TemporaryFolder(); + + /** The exception expected to be thrown. */ + @Rule + public ExpectedException mThrown = ExpectedException.none(); + + @ClassRule + public static ManuallyScheduleHeartbeat sManuallySchedule = new ManuallyScheduleHeartbeat( + HeartbeatContext.MASTER_LOST_WORKER_DETECTION); + + /** + * Sets up the dependencies before a test runs. + */ + @Before + public void before() throws Exception { + // set a large value of PropertyKey.MASTER_LOST_WORKER_DELETION_TIMEOUT_MS + // to prevent worker to be deleted after it is lost + Configuration.set(PropertyKey.MASTER_LOST_WORKER_DELETION_TIMEOUT_MS, Integer.MAX_VALUE); + Configuration.set(PropertyKey.MASTER_TIERED_STORE_GLOBAL_LEVELS, 3); + Configuration.set(PropertyKey.Template.MASTER_TIERED_STORE_GLOBAL_LEVEL_ALIAS.format(0), + Constants.MEDIUM_MEM); + Configuration.set(PropertyKey.Template.MASTER_TIERED_STORE_GLOBAL_LEVEL_ALIAS.format(1), + Constants.MEDIUM_SSD); + Configuration.set(PropertyKey.Template.MASTER_TIERED_STORE_GLOBAL_LEVEL_ALIAS.format(2), + Constants.MEDIUM_HDD); + mRegistry = new MasterRegistry(); + mMetrics = Lists.newArrayList(); + JournalSystem journalSystem = new NoopJournalSystem(); + CoreMasterContext masterContext = MasterTestUtils.testMasterContext( + new NoopJournalSystem(), null, new AlwaysPrimaryPrimarySelector() + ); + mMetricsMaster = new MetricsMasterFactory().create(mRegistry, masterContext); + mClock = new ManualClock(); + mExecutorService = + Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("TestBlockMaster-%d", true)); + mBlockMaster = new DefaultBlockMaster(mMetricsMaster, masterContext, mClock, + ExecutorServiceFactories.constantExecutorServiceFactory(mExecutorService)); + mRegistry.add(BlockMaster.class, mBlockMaster); + mRegistry.start(true); + } + + /** + * Stops the master after a test ran. + */ + @After + public void after() throws Exception { + mRegistry.stop(); + } + + @Test + public void buildVersion() throws Exception { + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + + // Sequence to simulate worker upgrade and downgrade, + // with or without buildVersion in registerWorkerPOptions + BuildVersion[] buildVersions = new BuildVersion[]{ + null, + BuildVersion.newBuilder().setVersion("1.0.0") + .setRevision("foobar").build(), + BuildVersion.newBuilder().setVersion("1.1.0") + .setRevision("fizzbuzz").build(), + null, + }; + + for (BuildVersion bv : buildVersions) { + RegisterWorkerPOptions options = (bv == null) + ? RegisterWorkerPOptions.getDefaultInstance() + : RegisterWorkerPOptions.newBuilder().setBuildVersion(bv).build(); + + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + options); + + BuildVersion actual = mBlockMaster.getWorker(worker1).getBuildVersion(); + assertEquals(bv == null ? "" : bv.getVersion(), actual.getVersion()); + assertEquals(bv == null ? "" : bv.getRevision(), actual.getRevision()); + } + } + + @Test + public void countBytes() throws Exception { + // Register two workers + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + long worker2 = mBlockMaster.getWorkerId(NET_ADDRESS_2); + List tiers = Arrays.asList(Constants.MEDIUM_MEM, Constants.MEDIUM_SSD); + Map worker1TotalBytesOnTiers = + ImmutableMap.of(Constants.MEDIUM_MEM, 10L, Constants.MEDIUM_SSD, 20L); + Map worker2TotalBytesOnTiers = + ImmutableMap.of(Constants.MEDIUM_MEM, 1000L, Constants.MEDIUM_SSD, 2000L); + Map worker1UsedBytesOnTiers = + ImmutableMap.of(Constants.MEDIUM_MEM, 1L, Constants.MEDIUM_SSD, 2L); + Map worker2UsedBytesOnTiers = + ImmutableMap.of(Constants.MEDIUM_MEM, 100L, Constants.MEDIUM_SSD, 200L); + mBlockMaster.workerRegister(worker1, tiers, worker1TotalBytesOnTiers, worker1UsedBytesOnTiers, + NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, RegisterWorkerPOptions.getDefaultInstance()); + mBlockMaster.workerRegister(worker2, tiers, worker2TotalBytesOnTiers, worker2UsedBytesOnTiers, + NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, RegisterWorkerPOptions.getDefaultInstance()); + + // Check that byte counts are summed correctly. + assertEquals(3030, mBlockMaster.getCapacityBytes()); + assertEquals(303L, mBlockMaster.getUsedBytes()); + assertEquals(ImmutableMap.of(Constants.MEDIUM_MEM, 1010L, Constants.MEDIUM_SSD, 2020L), + mBlockMaster.getTotalBytesOnTiers()); + assertEquals(ImmutableMap.of(Constants.MEDIUM_MEM, 101L, Constants.MEDIUM_SSD, 202L), + mBlockMaster.getUsedBytesOnTiers()); + } + + @Test + public void detectLostWorker() throws Exception { + // Register a worker. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Advance the block master's clock by an hour so that worker appears lost. + mClock.setTimeMs(System.currentTimeMillis() + Constants.HOUR_MS); + + // Run the lost worker detector. + HeartbeatScheduler.execute(HeartbeatContext.MASTER_LOST_WORKER_DETECTION); + + // Make sure the worker is detected as lost. + List info = mBlockMaster.getLostWorkersInfoList(); + assertEquals(worker1, Iterables.getOnlyElement(info).getId()); + } + + @Test + public void decommissionWorker() throws Exception { + // Register a worker. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Decommission worker + DecommissionWorkerPOptions options = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .build(); + mBlockMaster.decommissionWorker(options); + + // Make sure the worker is decommissioned. + int decommissionedCount = mBlockMaster.getDecommissionedWorkerCount(); + int liveCount = mBlockMaster.getWorkerCount(); + int lostCount = mBlockMaster.getLostWorkerCount(); + assertEquals(1, decommissionedCount); + assertEquals(0, liveCount); + assertEquals(0, lostCount); + } + + @Test + public void decommissionLostWorker() throws Exception { + // Register a worker. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Advance the block master's clock by an hour so that worker appears lost. + mClock.setTimeMs(System.currentTimeMillis() + Constants.HOUR_MS); + + // Run the lost worker detector. + HeartbeatScheduler.execute(HeartbeatContext.MASTER_LOST_WORKER_DETECTION); + + // Make sure the worker is detected as lost. + List info = mBlockMaster.getLostWorkersInfoList(); + assertEquals(worker1, Iterables.getOnlyElement(info).getId()); + + // Decommission worker + DecommissionWorkerPOptions options = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .build(); + mBlockMaster.decommissionWorker(options); + + // Make sure the worker is decommissioned. + int decommissionedCount = mBlockMaster.getDecommissionedWorkerCount(); + int liveCount = mBlockMaster.getWorkerCount(); + int lostCount = mBlockMaster.getLostWorkerCount(); + assertEquals(1, decommissionedCount); + assertEquals(0, liveCount); + assertEquals(0, lostCount); + } + + @Test + public void decommissionCommitUpgradeRegister() throws Exception { + long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1); + RegisterWorkerPOptions options = RegisterWorkerPOptions.newBuilder() + .setBuildVersion(OLD_VERSION).build(); + mBlockMaster.workerRegister(workerId, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + options); + List liveWorkerInfo = mBlockMaster.getWorkerInfoList(); + List allWorkerInfo = mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerInfo.size()); + assertEquals(1, allWorkerInfo.size()); + WorkerInfo w = liveWorkerInfo.get(0); + assertEquals(WorkerState.LIVE.toString(), w.getState()); + assertEquals(OLD_VERSION.getVersion(), w.getVersion()); + assertEquals(OLD_VERSION.getRevision(), w.getRevision()); + + // Decommission the worker + DecommissionWorkerPOptions decomReq = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .setCanRegisterAgain(true) + .build(); + mBlockMaster.decommissionWorker(decomReq); + List liveWorkersAfterDecom = mBlockMaster.getWorkerInfoList(); + assertEquals(0, liveWorkersAfterDecom.size()); + List allWorkersAfterDecom = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, allWorkersAfterDecom.size()); + WorkerInfo decomWorker = allWorkersAfterDecom.get(0); + assertEquals(WorkerState.DECOMMISSIONED.toString(), decomWorker.getState()); + assertEquals(OLD_VERSION.getVersion(), decomWorker.getVersion()); + assertEquals(OLD_VERSION.getRevision(), decomWorker.getRevision()); + + // After decommissioned, the worker can still heartbeat to the master + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 0L); + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null, memUsage, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeat.getCommandType()); + + // The leftover operations on the worker can still commit blocks to the master + long blockId = 1L; + long blockLength = 100L; + mBlockMaster.commitBlock(workerId, blockLength, "MEM", "MEM", blockId, blockLength); + // The block can be found on the master + BlockInfo blockInfo = mBlockMaster.getBlockInfo(blockId); + assertNotNull(blockInfo); + assertEquals(blockInfo.getLength(), blockLength); + // Although the block can successfully commit, the available locations do not include + // the decommissioned worker, so clients will not read from that worker for that block + assertEquals(0, blockInfo.getLocations().size()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + Map memUsageWithBlock = ImmutableMap.of(Constants.MEDIUM_MEM, blockLength); + List memBlockList = ImmutableList.of(blockId); + Block.BlockLocation memTier = Block.BlockLocation.newBuilder() + .setTier("MEM").setMediumType("MEM").setWorkerId(workerId).build(); + alluxio.grpc.Command heartBeatAgain = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeatAgain.getCommandType()); + + // The worker registers again with a higher version + RegisterWorkerPOptions upgradedWorker = RegisterWorkerPOptions.newBuilder() + .setBuildVersion(NEW_VERSION).build(); + mBlockMaster.workerRegister(workerId, + ImmutableList.of(Constants.MEDIUM_MEM), + memUsageWithBlock, + memUsageWithBlock, + ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, + upgradedWorker); + List liveWorkerAfterRestart = mBlockMaster.getWorkerInfoList(); + List allWorkerAfterRestart = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerAfterRestart.size()); + assertEquals(1, allWorkerAfterRestart.size()); + WorkerInfo restartedWorker = liveWorkerAfterRestart.get(0); + assertEquals(WorkerState.LIVE.toString(), restartedWorker.getState()); + assertEquals(NEW_VERSION.getVersion(), restartedWorker.getVersion()); + assertEquals(NEW_VERSION.getRevision(), restartedWorker.getRevision()); + MasterWorkerInfo upgradedWorkerInfo = mBlockMaster.getWorker(workerId); + assertEquals(1, upgradedWorkerInfo.getBlockCount()); + BlockInfo blockInfoCheckAgain = mBlockMaster.getBlockInfo(blockId); + assertNotNull(blockInfoCheckAgain); + assertEquals(blockInfoCheckAgain.getLength(), blockLength); + // The block can be found on the decommissioned worker once the worker registers + // again after the upgrade + assertEquals(1, blockInfoCheckAgain.getLocations().size()); + BlockLocation locCheckAgain = blockInfoCheckAgain.getLocations().get(0); + assertEquals(workerId, locCheckAgain.getWorkerId()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + alluxio.grpc.Command heartBeatAfterUpgrade = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Nothing, heartBeatAfterUpgrade.getCommandType()); + } + + @Test + public void decommissionCommitUpgradeStreamRegister() throws Exception { + long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1); + BlockMasterWorkerServiceHandler handler = new BlockMasterWorkerServiceHandler(mBlockMaster); + Queue errors = + streamRegisterWorkerWithVersion(handler, workerId, 0L, ImmutableList.of(), OLD_VERSION); + assertEquals(0, errors.size()); + + List liveWorkerInfo = mBlockMaster.getWorkerInfoList(); + List allWorkerInfo = mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerInfo.size()); + assertEquals(1, allWorkerInfo.size()); + WorkerInfo w = liveWorkerInfo.get(0); + assertEquals(WorkerState.LIVE.toString(), w.getState()); + assertEquals(OLD_VERSION.getVersion(), w.getVersion()); + assertEquals(OLD_VERSION.getRevision(), w.getRevision()); + + // Decommission the worker + DecommissionWorkerPOptions decomReq = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .setCanRegisterAgain(true) + .build(); + mBlockMaster.decommissionWorker(decomReq); + List liveWorkersAfterDecom = mBlockMaster.getWorkerInfoList(); + assertEquals(0, liveWorkersAfterDecom.size()); + List allWorkersAfterDecom = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, allWorkersAfterDecom.size()); + WorkerInfo decomWorker = allWorkersAfterDecom.get(0); + assertEquals(WorkerState.DECOMMISSIONED.toString(), decomWorker.getState()); + assertEquals(OLD_VERSION.getVersion(), decomWorker.getVersion()); + assertEquals(OLD_VERSION.getRevision(), decomWorker.getRevision()); + + // After decommissioned, the worker can still heartbeat to the master + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 0L); + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null, memUsage, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeat.getCommandType()); + + // The leftover operations on the worker can still commit blocks to the master + long blockId = 1L; + long blockLength = 100L; + mBlockMaster.commitBlock(workerId, blockLength, "MEM", "MEM", blockId, blockLength); + // The block can be found on the master + BlockInfo blockInfo = mBlockMaster.getBlockInfo(blockId); + assertNotNull(blockInfo); + assertEquals(blockInfo.getLength(), blockLength); + // Although the block can successfully commit, the available locations do not include + // the decommissioned worker, so clients will not read from that worker for that block + assertEquals(0, blockInfo.getLocations().size()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + Map memUsageWithBlock = ImmutableMap.of(Constants.MEDIUM_MEM, blockLength); + List memBlockList = ImmutableList.of(blockId); + Block.BlockLocation memTier = Block.BlockLocation.newBuilder() + .setTier("MEM").setMediumType("MEM").setWorkerId(workerId).build(); + alluxio.grpc.Command heartBeatAgain = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeatAgain.getCommandType()); + + // The worker registers again with a higher version + errors = streamRegisterWorkerWithVersion(handler, workerId, blockLength, + ImmutableList.of(blockId), NEW_VERSION); + assertEquals(0, errors.size()); + List liveWorkerAfterRestart = mBlockMaster.getWorkerInfoList(); + List allWorkerAfterRestart = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerAfterRestart.size()); + assertEquals(1, allWorkerAfterRestart.size()); + WorkerInfo restartedWorker = liveWorkerAfterRestart.get(0); + assertEquals(WorkerState.LIVE.toString(), restartedWorker.getState()); + assertEquals(NEW_VERSION.getVersion(), restartedWorker.getVersion()); + assertEquals(NEW_VERSION.getRevision(), restartedWorker.getRevision()); + MasterWorkerInfo upgradedWorkerInfo = mBlockMaster.getWorker(workerId); + assertEquals(1, upgradedWorkerInfo.getBlockCount()); + BlockInfo blockInfoCheckAgain = mBlockMaster.getBlockInfo(blockId); + assertNotNull(blockInfoCheckAgain); + assertEquals(blockInfoCheckAgain.getLength(), blockLength); + // The block can be found on the decommissioned worker once the worker registers + // again after the upgrade + assertEquals(1, blockInfoCheckAgain.getLocations().size()); + BlockLocation locCheckAgain = blockInfoCheckAgain.getLocations().get(0); + assertEquals(workerId, locCheckAgain.getWorkerId()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + alluxio.grpc.Command heartBeatAfterUpgrade = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Nothing, heartBeatAfterUpgrade.getCommandType()); + } + + @Test + public void decommissionRemoveUpgradeStreamRegister() throws Exception { + long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1); + BlockMasterWorkerServiceHandler handler = new BlockMasterWorkerServiceHandler(mBlockMaster); + + // Sequence to simulate worker upgrade and downgrade, + // with or without buildVersion in registerWorkerPOptions + Queue errors = streamRegisterWorkerWithVersion(handler, workerId, 0L, + ImmutableList.of(), OLD_VERSION); + assertEquals(0, errors.size()); + List liveWorkerInfo = mBlockMaster.getWorkerInfoList(); + List allWorkerInfo = mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerInfo.size()); + assertEquals(1, allWorkerInfo.size()); + WorkerInfo w = liveWorkerInfo.get(0); + assertEquals(WorkerState.LIVE.toString(), w.getState()); + assertEquals(OLD_VERSION.getVersion(), w.getVersion()); + assertEquals(OLD_VERSION.getRevision(), w.getRevision()); + + // Prepare a block for removal + long blockId = 1L; + long blockLength = 100L; + mBlockMaster.commitBlock(workerId, blockLength, "MEM", "MEM", blockId, blockLength); + + // Decommission the worker + DecommissionWorkerPOptions decomReq = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .setCanRegisterAgain(true) + .build(); + mBlockMaster.decommissionWorker(decomReq); + List liveWorkersAfterDecom = mBlockMaster.getWorkerInfoList(); + assertEquals(0, liveWorkersAfterDecom.size()); + List allWorkersAfterDecom = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, allWorkersAfterDecom.size()); + WorkerInfo decomWorker = allWorkersAfterDecom.get(0); + assertEquals(WorkerState.DECOMMISSIONED.toString(), decomWorker.getState()); + assertEquals(OLD_VERSION.getVersion(), decomWorker.getVersion()); + assertEquals(OLD_VERSION.getRevision(), decomWorker.getRevision()); + + // After decommissioned, the worker can still heartbeat to the master + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 0L); + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null, memUsage, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeat.getCommandType()); + + // Remove the block from the master and workers + mBlockMaster.removeBlocks(ImmutableList.of(blockId), true); + Exception e = assertThrows(BlockInfoException.class, () -> { + BlockInfo shouldNotExist = mBlockMaster.getBlockInfo(blockId); + }); + assertTrue(e.getMessage().contains(ExceptionMessage.BLOCK_META_NOT_FOUND.getMessage(blockId))); + + // Heartbeat to the master again, the master does nothing about the block + Map memUsageWithBlock = ImmutableMap.of(Constants.MEDIUM_MEM, blockLength); + List memBlockList = ImmutableList.of(blockId); + Block.BlockLocation memTier = Block.BlockLocation.newBuilder() + .setTier("MEM").setMediumType("MEM").setWorkerId(workerId).build(); + alluxio.grpc.Command heartBeatAgain = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeatAgain.getCommandType()); + + // The worker registers again with a higher version + errors = streamRegisterWorkerWithVersion(handler, workerId, blockLength, + ImmutableList.of(blockId), NEW_VERSION); + assertEquals(0, errors.size()); + List liveWorkerAfterRestart = mBlockMaster.getWorkerInfoList(); + List allWorkerAfterRestart = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerAfterRestart.size()); + assertEquals(1, allWorkerAfterRestart.size()); + WorkerInfo restartedWorker = liveWorkerAfterRestart.get(0); + assertEquals(WorkerState.LIVE.toString(), restartedWorker.getState()); + assertEquals(NEW_VERSION.getVersion(), restartedWorker.getVersion()); + assertEquals(NEW_VERSION.getRevision(), restartedWorker.getRevision()); + MasterWorkerInfo upgradedWorkerInfo = mBlockMaster.getWorker(workerId); + // The block should not be recognized and therefore the master will want to remove that block + assertEquals(0, upgradedWorkerInfo.getBlockCount()); + assertEquals(1, upgradedWorkerInfo.getToRemoveBlockCount()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + alluxio.grpc.Command heartBeatAfterUpgrade = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Free, heartBeatAfterUpgrade.getCommandType()); + assertEquals(ImmutableList.of(blockId), heartBeatAfterUpgrade.getDataList()); + } + + @Test + public void decommissionRemoveUpgradeRegister() throws Exception { + long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1); + + // Sequence to simulate worker upgrade and downgrade, + // with or without buildVersion in registerWorkerPOptions + RegisterWorkerPOptions options = RegisterWorkerPOptions.newBuilder() + .setBuildVersion(OLD_VERSION).build(); + + mBlockMaster.workerRegister(workerId, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + options); + List liveWorkerInfo = mBlockMaster.getWorkerInfoList(); + List allWorkerInfo = mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerInfo.size()); + assertEquals(1, allWorkerInfo.size()); + WorkerInfo w = liveWorkerInfo.get(0); + assertEquals(WorkerState.LIVE.toString(), w.getState()); + assertEquals(OLD_VERSION.getVersion(), w.getVersion()); + assertEquals(OLD_VERSION.getRevision(), w.getRevision()); + + // Prepare a block for removal + long blockId = 1L; + long blockLength = 100L; + mBlockMaster.commitBlock(workerId, blockLength, "MEM", "MEM", blockId, blockLength); + + // Decommission the worker + DecommissionWorkerPOptions decomReq = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .setCanRegisterAgain(true) + .build(); + mBlockMaster.decommissionWorker(decomReq); + List liveWorkersAfterDecom = mBlockMaster.getWorkerInfoList(); + assertEquals(0, liveWorkersAfterDecom.size()); + List allWorkersAfterDecom = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, allWorkersAfterDecom.size()); + WorkerInfo decomWorker = allWorkersAfterDecom.get(0); + assertEquals(WorkerState.DECOMMISSIONED.toString(), decomWorker.getState()); + assertEquals(OLD_VERSION.getVersion(), decomWorker.getVersion()); + assertEquals(OLD_VERSION.getRevision(), decomWorker.getRevision()); + + // After decommissioned, the worker can still heartbeat to the master + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 0L); + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null, memUsage, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeat.getCommandType()); + + // Remove the block from the master and workers + mBlockMaster.removeBlocks(ImmutableList.of(blockId), true); + Exception e = assertThrows(BlockInfoException.class, () -> { + BlockInfo shouldNotExist = mBlockMaster.getBlockInfo(blockId); + }); + assertTrue(e.getMessage().contains(ExceptionMessage.BLOCK_META_NOT_FOUND.getMessage(blockId))); + + // Heartbeat to the master again, the master does nothing about the block + Map memUsageWithBlock = ImmutableMap.of(Constants.MEDIUM_MEM, blockLength); + List memBlockList = ImmutableList.of(blockId); + Block.BlockLocation memTier = Block.BlockLocation.newBuilder() + .setTier("MEM").setMediumType("MEM").setWorkerId(workerId).build(); + alluxio.grpc.Command heartBeatAgain = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Decommissioned, heartBeatAgain.getCommandType()); + + // The worker registers again with a higher version + RegisterWorkerPOptions upgradedWorker = RegisterWorkerPOptions.newBuilder() + .setBuildVersion(NEW_VERSION).build(); + mBlockMaster.workerRegister(workerId, + ImmutableList.of(Constants.MEDIUM_MEM), + memUsageWithBlock, + memUsageWithBlock, + ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, + upgradedWorker); + List liveWorkerAfterRestart = mBlockMaster.getWorkerInfoList(); + List allWorkerAfterRestart = + mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(1, liveWorkerAfterRestart.size()); + assertEquals(1, allWorkerAfterRestart.size()); + WorkerInfo restartedWorker = liveWorkerAfterRestart.get(0); + assertEquals(WorkerState.LIVE.toString(), restartedWorker.getState()); + assertEquals(NEW_VERSION.getVersion(), restartedWorker.getVersion()); + assertEquals(NEW_VERSION.getRevision(), restartedWorker.getRevision()); + MasterWorkerInfo upgradedWorkerInfo = mBlockMaster.getWorker(workerId); + // The block should not be recognized and therefore the master will want to remove that block + assertEquals(0, upgradedWorkerInfo.getBlockCount()); + assertEquals(1, upgradedWorkerInfo.getToRemoveBlockCount()); + + // Heartbeat to the master again, the master does not remove the block incorrectly + alluxio.grpc.Command heartBeatAfterUpgrade = mBlockMaster.workerHeartbeat(workerId, null, + memUsageWithBlock, memBlockList, ImmutableMap.of(memTier, memBlockList), + NO_LOST_STORAGE, mMetrics); + assertEquals(CommandType.Free, heartBeatAfterUpgrade.getCommandType()); + assertEquals(ImmutableList.of(blockId), heartBeatAfterUpgrade.getDataList()); + } + + public static Queue streamRegisterWorkerWithVersion( + BlockMasterWorkerServiceHandler handler, + long workerId, long blockSize, List blockList, BuildVersion version) { + List requests = generateRegisterStreamForWorkerWithVersion( + workerId, blockSize, blockList, version); + Queue errorQueue = new ConcurrentLinkedQueue<>(); + sendStreamToMaster(handler, requests, getErrorCapturingResponseObserver(errorQueue)); + return errorQueue; + } + + public static List generateRegisterStreamForWorkerWithVersion( + long workerId, long blockSize, List blockList, BuildVersion version) { + Map> blockMap = new HashMap<>(); + BlockStoreLocation mem = new BlockStoreLocation("MEM", 0, "MEM"); + blockMap.put(mem, blockList); + + // We just use the RegisterStreamer to generate the batch of requests + RegisterStreamer registerStreamer = new RegisterStreamer(null, + workerId, ImmutableList.of("MEM"), + ImmutableMap.of("MEM", CAPACITY), // capacity + ImmutableMap.of("MEM", blockSize * blockList.size()), // usage + blockMap, LOST_STORAGE, EMPTY_CONFIG, version); + + // Get chunks from the RegisterStreamer + return ImmutableList.copyOf(registerStreamer); + } + + public static StreamObserver getErrorCapturingResponseObserver( + Queue errorQueue) { + return new StreamObserver() { + @Override + public void onNext(RegisterWorkerPResponse response) {} + + @Override + public void onError(Throwable t) { + errorQueue.offer(t); + } + + @Override + public void onCompleted() {} + }; + } + + public static void sendStreamToMaster(BlockMasterWorkerServiceHandler handler, + List requestChunks, + StreamObserver responseObserver) { + StreamObserver requestObserver = + handler.registerWorkerStream(responseObserver); + for (RegisterWorkerPRequest chunk : requestChunks) { + requestObserver.onNext(chunk); + } + requestObserver.onCompleted(); + } + + @Test + public void streamRegDecommissionUpgradeStreamReg() throws Exception { + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + + // Sequence to simulate worker upgrade and downgrade, + // with or without buildVersion in registerWorkerPOptions + BuildVersion oldVersion = BuildVersion.newBuilder().setVersion("1.0.0") + .setRevision("abc").build(); + BuildVersion newVersion = BuildVersion.newBuilder().setVersion("1.1.0") + .setRevision("def").build(); + + BlockMasterWorkerServiceHandler handler = new BlockMasterWorkerServiceHandler(mBlockMaster); + Queue errors = streamRegisterWorkerWithVersion(handler, worker1, 64 * Constants.MB, + ImmutableList.of(), oldVersion); + assertEquals(0, errors.size()); + + List availableWorkerList = mBlockMaster.getWorkerInfoList(); + assertEquals(1, availableWorkerList.size()); + assertEquals(1, mBlockMaster.getWorkerCount()); + assertEquals(0, mBlockMaster.getLostWorkerCount()); + assertEquals(0, mBlockMaster.getDecommissionedWorkerCount()); + assertEquals(oldVersion.getVersion(), availableWorkerList.get(0).getVersion()); + assertEquals(oldVersion.getRevision(), availableWorkerList.get(0).getRevision()); + + // Decommission the worker + DecommissionWorkerPOptions decomReq = DecommissionWorkerPOptions.newBuilder() + .setWorkerHostname(NET_ADDRESS_1.getHost()).setWorkerWebPort(NET_ADDRESS_1.getWebPort()) + .setCanRegisterAgain(true) + .build(); + mBlockMaster.decommissionWorker(decomReq); + assertEquals(0, mBlockMaster.getWorkerCount()); + assertEquals(0, mBlockMaster.getLostWorkerCount()); + assertEquals(1, mBlockMaster.getDecommissionedWorkerCount()); + List workerReport = mBlockMaster.getWorkerReport(createGetWorkerReportOptions()); + assertEquals(oldVersion.getVersion(), workerReport.get(0).getVersion()); + assertEquals(oldVersion.getRevision(), workerReport.get(0).getRevision()); + + // Worker is restarted with a newer version + errors = streamRegisterWorkerWithVersion(handler, worker1, 64 * Constants.MB, + ImmutableList.of(), newVersion); + assertEquals(0, errors.size()); + assertEquals(1, mBlockMaster.getWorkerCount()); + assertEquals(0, mBlockMaster.getLostWorkerCount()); + assertEquals(0, mBlockMaster.getDecommissionedWorkerCount()); + List availableWorkerListNow = mBlockMaster.getWorkerInfoList(); + assertEquals(newVersion.getVersion(), availableWorkerListNow.get(0).getVersion()); + assertEquals(newVersion.getRevision(), availableWorkerListNow.get(0).getRevision()); + } + + private GetWorkerReportOptions createGetWorkerReportOptions() { + GetWorkerReportOptions getReportOptions = GetWorkerReportOptions.defaults(); + getReportOptions.setFieldRange(GetWorkerReportOptions.WorkerInfoField.ALL); + getReportOptions.setWorkerRange(GetWorkerReportOptions.WorkerRange.ALL); + return getReportOptions; + } + + @Test + public void autoDeleteTimeoutWorker() throws Exception { + + // In default configuration the lost worker will never be deleted. So set a short timeout + Configuration.set(PropertyKey.MASTER_LOST_WORKER_DELETION_TIMEOUT_MS, 1000); + // Register a worker. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Advance the block master's clock by an hour so that worker can be deleted. + mClock.setTimeMs(System.currentTimeMillis() + Constants.HOUR_MS); + + // Run the lost worker detector. + HeartbeatScheduler.execute(HeartbeatContext.MASTER_LOST_WORKER_DETECTION); + + // Make sure the worker has been deleted. + List info = mBlockMaster.getLostWorkersInfoList(); + assertEquals(0, mBlockMaster.getLostWorkersInfoList().size()); + assertThrows(NotFoundException.class, () -> mBlockMaster.getWorker(worker1)); + assertEquals(0, mBlockMaster.getWorkerCount()); + } + + @Test + public void workerReregisterRemembersLostWorker() throws Exception { + // Register a worker. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Advance the block master's clock by an hour so that the worker appears lost. + mClock.setTimeMs(System.currentTimeMillis() + Constants.HOUR_MS); + + // Run the lost worker detector. + HeartbeatScheduler.execute(HeartbeatContext.MASTER_LOST_WORKER_DETECTION); + + // Reregister the worker using its original worker id. + mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, + ImmutableList.of(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 10L), + NO_BLOCKS_ON_LOCATION, + NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Check that there are no longer any lost workers and there is a live worker. + assertEquals(1, mBlockMaster.getWorkerCount()); + assertEquals(0, mBlockMaster.getLostWorkersInfoList().size()); + } + + @Test + public void removeBlockTellsWorkersToRemoveTheBlock() throws Exception { + // Create a worker with a block. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + long blockId = 1L; + mBlockMaster.workerRegister(worker1, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + mBlockMaster.commitBlock(worker1, 50L, + Constants.MEDIUM_MEM, Constants.MEDIUM_MEM, blockId, 20L); + + // Remove the block + mBlockMaster.removeBlocks(Arrays.asList(1L), /*delete=*/false); + + // Check that the worker heartbeat tells the worker to remove the block. + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 0L); + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(worker1, null, memUsage, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(ImmutableList.of(1L), heartBeat.getDataList()); + } + + @Test + public void registerCleansUpOrphanedBlocks() throws Exception { + // Create a worker with unknown blocks. + long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1); + List orphanedBlocks = Arrays.asList(1L, 2L); + Map memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 10L); + + Block.BlockLocation blockLoc = Block.BlockLocation.newBuilder() + .setWorkerId(workerId).setTier(Constants.MEDIUM_MEM) + .setMediumType(Constants.MEDIUM_MEM).build(); + mBlockMaster.workerRegister(workerId, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + memUsage, ImmutableMap.of(blockLoc, orphanedBlocks), NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Check that the worker heartbeat tells the worker to remove the blocks. + alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null, + memUsage, NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertEquals(orphanedBlocks, + heartBeat.getDataList().stream().sorted().collect(Collectors.toList())); + } + + @Test + public void workerHeartbeatUpdatesMemoryCount() throws Exception { + // Create a worker. + long worker = mBlockMaster.getWorkerId(NET_ADDRESS_1); + Map initialUsedBytesOnTiers = ImmutableMap.of(Constants.MEDIUM_MEM, 50L); + mBlockMaster.workerRegister(worker, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + initialUsedBytesOnTiers, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Update used bytes with a worker heartbeat. + Map newUsedBytesOnTiers = ImmutableMap.of(Constants.MEDIUM_MEM, 50L); + mBlockMaster.workerHeartbeat(worker, null, newUsedBytesOnTiers, + NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + + WorkerInfo workerInfo = Iterables.getOnlyElement(mBlockMaster.getWorkerInfoList()); + assertEquals(50, workerInfo.getUsedBytes()); + } + + @Test + public void workerHeartbeatUpdatesRemovedBlocks() throws Exception { + // Create a worker. + long worker = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + long blockId = 1L; + mBlockMaster.commitBlock(worker, 50L, Constants.MEDIUM_MEM, + Constants.MEDIUM_MEM, blockId, 20L); + + // Indicate that blockId is removed on the worker. + mBlockMaster.workerHeartbeat(worker, null, + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), + ImmutableList.of(blockId), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics); + assertTrue(mBlockMaster.getBlockInfo(blockId).getLocations().isEmpty()); + } + + @Test + public void workerHeartbeatUpdatesAddedBlocks() throws Exception { + // Create two workers. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + long worker2 = mBlockMaster.getWorkerId(NET_ADDRESS_2); + mBlockMaster.workerRegister(worker2, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + // Commit blockId to worker1. + long blockId = 1L; + mBlockMaster.commitBlock(worker1, 50L, Constants.MEDIUM_MEM, + Constants.MEDIUM_MEM, blockId, 20L); + + // Send a heartbeat from worker2 saying that it's added blockId. + List addedBlocks = ImmutableList.of(blockId); + Block.BlockLocation blockOnWorker2 = Block.BlockLocation.newBuilder() + .setWorkerId(worker2).setTier(Constants.MEDIUM_MEM) + .setMediumType(Constants.MEDIUM_MEM).build(); + mBlockMaster.workerHeartbeat(worker2, null, + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS, + ImmutableMap.of(blockOnWorker2, addedBlocks), + NO_LOST_STORAGE, mMetrics); + + // The block now has two locations. + assertEquals(2, mBlockMaster.getBlockInfo(blockId).getLocations().size()); + } + + @Test + public void workerHeartbeatUpdatesLostStorage() throws Exception { + // Create two workers. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + mBlockMaster.workerRegister(worker1, Arrays.asList(Constants.MEDIUM_MEM, Constants.MEDIUM_SSD), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L, Constants.MEDIUM_SSD, 200L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L, Constants.MEDIUM_SSD, 0L), + NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + long worker2 = mBlockMaster.getWorkerId(NET_ADDRESS_2); + mBlockMaster.workerRegister(worker2, Arrays.asList(Constants.MEDIUM_MEM, Constants.MEDIUM_HDD), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L, Constants.MEDIUM_HDD, 300L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L, Constants.MEDIUM_HDD, 0L), + NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + + Map lostStorageOnWorker1 = new HashMap<>(); + lostStorageOnWorker1.put(Constants.MEDIUM_SSD, StorageList.newBuilder() + .addAllStorage(Arrays.asList("/ssd/one", "/ssd/two")).build()); + Map lostStorageOnWorker2 = new HashMap<>(); + lostStorageOnWorker2.put(Constants.MEDIUM_HDD, + StorageList.newBuilder().addStorage("/hdd/one").build()); + + mBlockMaster.workerHeartbeat(worker1, + ImmutableMap.of(Constants.MEDIUM_MEM, 100L, Constants.MEDIUM_SSD, 0L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L, Constants.MEDIUM_SSD, 0L), NO_BLOCKS, + NO_BLOCKS_ON_LOCATION, lostStorageOnWorker1, mMetrics); + mBlockMaster.workerHeartbeat(worker2, + ImmutableMap.of(Constants.MEDIUM_MEM, 100L, Constants.MEDIUM_HDD, 200L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L, Constants.MEDIUM_HDD, 0L), NO_BLOCKS, + NO_BLOCKS_ON_LOCATION, lostStorageOnWorker2, mMetrics); + + // Two workers have lost storage paths + assertEquals(2, mBlockMaster.getWorkerLostStorage().size()); + int lostStorageNum = 0; + for (WorkerLostStorageInfo info : mBlockMaster.getWorkerLostStorage()) { + for (StorageList list : info.getLostStorageMap().values()) { + lostStorageNum += list.getStorageList().size(); + } + } + assertEquals(3, lostStorageNum); + } + + @Test + public void unknownWorkerHeartbeatTriggersRegisterRequest() { + Command heartBeat = mBlockMaster.workerHeartbeat(0, null, null, null, null, null, mMetrics); + assertEquals(Command.newBuilder().setCommandType(CommandType.Register).build(), heartBeat); + } + + @Test + public void stopTerminatesExecutorService() throws Exception { + mBlockMaster.stop(); + assertTrue(mExecutorService.isTerminated()); + } + + @Test + public void getBlockInfo() throws Exception { + // Create a worker with a block. + long worker1 = mBlockMaster.getWorkerId(NET_ADDRESS_1); + long blockId = 1L; + long blockLength = 20L; + mBlockMaster.workerRegister(worker1, Arrays.asList(Constants.MEDIUM_MEM), + ImmutableMap.of(Constants.MEDIUM_MEM, 100L), + ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, + RegisterWorkerPOptions.getDefaultInstance()); + mBlockMaster.commitBlock(worker1, 50L, Constants.MEDIUM_MEM, + Constants.MEDIUM_MEM, blockId, blockLength); + + BlockLocation blockLocation = new BlockLocation() + .setTierAlias(Constants.MEDIUM_MEM) + .setWorkerAddress(NET_ADDRESS_1) + .setWorkerId(worker1) + .setMediumType(Constants.MEDIUM_MEM); + BlockInfo expectedBlockInfo = new BlockInfo() + .setBlockId(1L) + .setLength(20L) + .setLocations(ImmutableList.of(blockLocation)); + assertEquals(expectedBlockInfo, mBlockMaster.getBlockInfo(blockId)); + } + + @Test + public void getNewContainerId() throws Exception { + final int total = 10_000; + long containerIdReservationSize = Configuration.getInt( + PropertyKey.MASTER_CONTAINER_ID_RESERVATION_SIZE); + + final int parallelNum = 5; + ExecutorService containerIdFetcher = Executors.newFixedThreadPool(parallelNum); + AtomicInteger times = new AtomicInteger(0); + assertEquals(0L, mBlockMaster.getNewContainerId()); + + CyclicBarrier barrier = new CyclicBarrier(parallelNum); + + for (int count = 0; count < parallelNum; count++) { + containerIdFetcher.submit(() -> { + try { + barrier.await(); + while (times.incrementAndGet() < total) { + mBlockMaster.getNewContainerId(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + containerIdFetcher.shutdown(); + containerIdFetcher.awaitTermination(6, TimeUnit.SECONDS); + mBlockMaster.close(); + + long journaledNextContainerId = mBlockMaster.getJournaledNextContainerId(); + + assertTrue(journaledNextContainerId >= total); + assertTrue(journaledNextContainerId <= total + containerIdReservationSize); + } + + @Test + public void stop() throws Exception { + mRegistry.stop(); + assertTrue(mExecutorService.isShutdown()); + assertTrue(mExecutorService.isTerminated()); + } +} diff --git a/dora/tests/pom.xml b/dora/tests/pom.xml index ec7e3ee8c50e..e8bc82d1e101 100644 --- a/dora/tests/pom.xml +++ b/dora/tests/pom.xml @@ -209,13 +209,6 @@ ${project.version} test - - org.alluxio - alluxio-job-server - ${project.version} - test-jar - test - org.alluxio alluxio-underfs-hdfs