From ce746964be023ad11563a32c8a6823e20f7c2907 Mon Sep 17 00:00:00 2001 From: elega <445092967@qq.com> Date: Wed, 4 Sep 2024 23:22:24 +0800 Subject: [PATCH] update --- .../stream/DefaultBlockWorkerClient.java | 54 ++++- .../main/java/alluxio/conf/PropertyKey.java | 32 +++ .../databuffer/NioHeapBufferPool.java | 2 +- .../worker/block/DefaultBlockWorker.java | 33 ++- .../cli/fs/command/CRC64CheckCommand.java | 112 +-------- .../fs/command/CRC64CheckCommandUtils.java | 212 ++++++++++++++++++ 6 files changed, 331 insertions(+), 114 deletions(-) create mode 100644 shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommandUtils.java diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java b/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java index e404b4d8e987..5e7282544991 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java @@ -47,9 +47,13 @@ import alluxio.retry.RetryPolicy; import alluxio.retry.RetryUtils; import alluxio.security.user.UserState; +import alluxio.util.CommonUtils; +import com.google.common.collect.Lists; import com.google.common.io.Closer; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.netty.util.ResourceLeakDetector; @@ -58,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -80,6 +85,8 @@ public class DefaultBlockWorkerClient implements BlockWorkerClient { private final BlockWorkerGrpc.BlockWorkerStub mStreamingAsyncStub; private final BlockWorkerGrpc.BlockWorkerBlockingStub mRpcBlockingStub; private final 
BlockWorkerGrpc.BlockWorkerFutureStub mRpcFutureStub; + private final int mChecksumBatchSize; + private final int mSlowChecksumCalculationRequestThresholdMs = 12; @Nullable private final ResourceLeakTracker mTracker; @@ -129,6 +136,8 @@ public DefaultBlockWorkerClient(UserState userState, GrpcServerAddress address, mRpcFutureStub = BlockWorkerGrpc.newFutureStub(mRpcChannel); mAddress = address; mRpcTimeoutMs = alluxioConf.getMs(PropertyKey.USER_RPC_RETRY_MAX_DURATION); + mChecksumBatchSize = alluxioConf.getInt( + PropertyKey.USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE); mTracker = DETECTOR.track(this); } @@ -254,6 +263,49 @@ public ListenableFuture load(LoadRequest request) { @Override public ListenableFuture getBlockChecksum( GetBlockChecksumRequest request) { - return mRpcFutureStub.getBlockChecksum(request); + // Non-batched mode + if (request.getBlockIdsCount() < mChecksumBatchSize) { + long startTs = CommonUtils.getCurrentMs(); + ListenableFuture future = mRpcFutureStub.getBlockChecksum(request); + future.addListener(()->{ + long timeElapsed = CommonUtils.getCurrentMs() - startTs; + if (timeElapsed > mSlowChecksumCalculationRequestThresholdMs) { + LOG.warn( + "Slow checksum calculation RPC for {} blocks, address {}, time elapsed {}ms ", + request.getBlockIdsCount(), mAddress, timeElapsed); + } + }, MoreExecutors.directExecutor()); + return future; + } + + // Batched mode + GetBlockChecksumResponse.Builder responseBuilder = GetBlockChecksumResponse.newBuilder(); + ListenableFuture chainedCalls = + Futures.immediateFuture(GetBlockChecksumResponse.getDefaultInstance()); + List> blockIdsPerBatch = + Lists.partition(request.getBlockIdsList(), mChecksumBatchSize); + for (List blockIdsOfBatch : blockIdsPerBatch) { + chainedCalls = Futures.transformAsync(chainedCalls, (previousResult) -> { + responseBuilder.putAllChecksum(previousResult.getChecksumMap()); + GetBlockChecksumRequest requestOfBatch = + 
GetBlockChecksumRequest.newBuilder().addAllBlockIds(blockIdsOfBatch).build(); + ListenableFuture future = + mRpcFutureStub.getBlockChecksum(requestOfBatch); + long startTs = CommonUtils.getCurrentMs(); + future.addListener(()->{ + long timeElapsed = CommonUtils.getCurrentMs() - startTs; + if (timeElapsed > mSlowChecksumCalculationRequestThresholdMs) { + LOG.warn( + "Slow checksum calculation RPC for {} blocks, address {}, time elapsed {}ms ", + blockIdsOfBatch.size(), mAddress, timeElapsed); + } + }, MoreExecutors.directExecutor()); + return future; + }, MoreExecutors.directExecutor()); + } + return Futures.transform(chainedCalls, (lastResult) -> { + responseBuilder.putAllChecksum(lastResult.getChecksumMap()); + return responseBuilder.build(); + }, MoreExecutors.directExecutor()); } } diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 3a1fbf1c26e5..1a13b3ebf4c5 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -4297,6 +4297,22 @@ public String toString() { .setDescription("The implementation of LocalBlockStore that can be instantiated.") .setScope(Scope.WORKER) .build(); + public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD = + dataSizeBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD) + .setDefaultValue(-1) + .setDescription("The throughput threshold per second to trigger the rate limit. " + + "-1 means no limitation. The minimum precision is 1KB.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) + .setScope(Scope.WORKER) + .build(); + public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE = + intBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE) + .setDefaultValue(16) + .setDescription("The thread pool size for the worker block checksum calculation. 
" + + "Each thread will take up at most 8MB (the chunksize) memory") + .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) + .setScope(Scope.WORKER) + .build(); public static final PropertyKey WORKER_CONTAINER_HOSTNAME = stringBuilder(Name.WORKER_CONTAINER_HOSTNAME) .setDescription("The container hostname if worker is running in a container.") @@ -6411,6 +6427,16 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE = + intBuilder(Name.USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE) + .setDefaultValue(Integer.MAX_VALUE) + .setDescription("The batch size of block ids for the checksum calculation rpc. " + + "by default all block ids are in a single batch. Reduce this value if " + + "you see too many timeouts.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) + .setScope(Scope.CLIENT) + .build(); + public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT = enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class) @@ -8710,6 +8736,10 @@ public static final class Name { public static final String WORKER_BLOCK_HEARTBEAT_TIMEOUT_MS = "alluxio.worker.block.heartbeat.timeout"; public static final String WORKER_BLOCK_STORE_TYPE = "alluxio.worker.block.store.type"; + public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD = + "alluxio.worker.block.checksum.calculation.throughput.threshold"; + public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE = + "alluxio.worker.block.checksum.calculation.thread.pool.size"; public static final String WORKER_CONTAINER_HOSTNAME = "alluxio.worker.container.hostname"; public static final String WORKER_DATA_FOLDER = "alluxio.worker.data.folder"; @@ -9102,6 +9132,8 @@ public static final class Name { "alluxio.user.client.cache.include.mtime"; public static final String USER_CLIENT_REPORT_VERSION_ENABLED = 
"alluxio.user.client.report.version.enabled"; + public static final String USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE = + "alluxio.user.client.checksum.calculation.batch.size"; public static final String USER_CONF_CLUSTER_DEFAULT_ENABLED = "alluxio.user.conf.cluster.default.enabled"; public static final String USER_CONF_SYNC_INTERVAL = "alluxio.user.conf.sync.interval"; diff --git a/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java b/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java index 51998318e745..599e11b76fc7 100644 --- a/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java +++ b/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java @@ -20,7 +20,7 @@ import java.util.TreeMap; /** - * Direct buffer pool. + * Heap buffer pool. */ public class NioHeapBufferPool { private static final TreeMap> BUF_POOL = new TreeMap<>(); diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index de2795cfb99c..5e00b63b140b 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -53,6 +53,7 @@ import alluxio.retry.RetryUtils; import alluxio.security.user.ServerUserState; import alluxio.util.CRC64; +import alluxio.util.CommonUtils; import alluxio.util.executor.ExecutorServiceFactories; import alluxio.util.io.FileUtils; import alluxio.wire.FileInfo; @@ -71,6 +72,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.Closer; +import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; @@ -87,6 +89,7 @@ import java.util.HashSet; import 
java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -143,6 +146,7 @@ public class DefaultBlockWorker extends AbstractWorker implements BlockWorker { protected WorkerNetAddress mAddress; private final ExecutorService mChecksumCalculationThreadPool; + private final Optional mChecksumCalculationRateLimiter ; /** * Constructs a default block worker. @@ -178,11 +182,20 @@ public DefaultBlockWorker(BlockMasterClientPool blockMasterClientPool, GrpcExecutors.CACHE_MANAGER_EXECUTOR, this, fsContext); mFuseManager = mResourceCloser.register(new FuseManager(fsContext)); mWhitelist = new PrefixList(Configuration.getList(PropertyKey.WORKER_WHITELIST)); - mChecksumCalculationThreadPool = - ExecutorServiceFactories.fixedThreadPool("checksum-calculation-pool", 16) - .create(); - ((ThreadPoolExecutor) mChecksumCalculationThreadPool) - .setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + mChecksumCalculationThreadPool = ExecutorServiceFactories.fixedThreadPool( + "checksum-calculation-pool", + Configuration.getInt(PropertyKey.WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE)) + .create(); + long checksumThroughputThreshold = + Configuration.getBytes(PropertyKey.WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD); + if (checksumThroughputThreshold <= 0) { + mChecksumCalculationRateLimiter = Optional.empty(); + } else { + // The min precision is 1kb to avoid data overflow + mChecksumCalculationRateLimiter = + Optional.of(RateLimiter.create( + Math.max(Math.toIntExact(checksumThroughputThreshold / 1024), 1))); + } Metrics.registerGauges(this); } @@ -623,8 +636,8 @@ public Map calculateBlockChecksum(List blockIds) { int chunkSize = 1024 * 1024 * 8; //8MB HashMap result = new HashMap<>(); List> futures = new ArrayList<>(); - for (long blockId: blockIds) { - Future future = mChecksumCalculationThreadPool.submit(()->{ + for (long 
blockId : blockIds) { + Future future = mChecksumCalculationThreadPool.submit(() -> { ByteBuffer bf = null; try { CRC64 crc64 = new CRC64(); @@ -639,6 +652,10 @@ public Map calculateBlockChecksum(List blockIds) { break; } crc64.update(bf.array(), Math.toIntExact(bytesRead)); + int permits = Math.toIntExact(Math.max(1, bytesRead / 1024)); + if (mChecksumCalculationRateLimiter.isPresent()) { + mChecksumCalculationRateLimiter.get().acquire(permits); + } } result.put(blockId, BlockChecksum.newBuilder() @@ -656,7 +673,7 @@ public Map calculateBlockChecksum(List blockIds) { }); futures.add(future); } - for (Future future: futures) { + for (Future future : futures) { try { future.get(); } catch (Exception e) { diff --git a/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java index e5a12f935267..ebd2a572d2c4 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java @@ -1,39 +1,12 @@ package alluxio.cli.fs.command; import alluxio.AlluxioURI; -import alluxio.Constants; -import alluxio.client.block.stream.BlockWorkerClient; import alluxio.client.file.FileSystemContext; -import alluxio.client.file.URIStatus; import alluxio.exception.AlluxioException; -import alluxio.grpc.BlockChecksum; -import alluxio.grpc.BlockLocation; -import alluxio.grpc.DeletePOptions; -import alluxio.grpc.GetBlockChecksumRequest; -import alluxio.grpc.GetBlockChecksumResponse; -import alluxio.grpc.GetStatusPOptions; -import alluxio.master.block.BlockId; -import alluxio.resource.CloseableResource; -import alluxio.util.CRC64; -import alluxio.wire.BlockLocationInfo; -import alluxio.wire.WorkerNetAddress; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nullable; import org.apache.commons.cli.CommandLine; 
import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.zip.Checksum; public class CRC64CheckCommand extends AbstractFileSystemCommand{ public CRC64CheckCommand( @@ -45,83 +18,13 @@ public CRC64CheckCommand( protected void runPlainPath(AlluxioURI plainPath, CommandLine cl) throws AlluxioException, IOException { /* - CRC64 aaa = new CRC64(); - aaa.update("aaa".getBytes(), 3); - System.out.println(Long.toHexString(aaa.getValue())); - if (true) { - return; - } + System.out.println("Checking " + plainPath); + long crc64 = CRC64CheckCommandUtils.checkCRC64(mFsContext, mFileSystem, plainPath); + System.out.println("CRC64 check for file " + plainPath + " succeeded. " + + "CRC64: " + crc64); */ - - URIStatus ufsStatus = mFileSystem.getStatus( - plainPath, GetStatusPOptions.newBuilder().setDirectUfsAccess(true).build()); - if (ufsStatus.getXAttr() == null || !ufsStatus.getXAttr().containsKey(Constants.CRC64_KEY)) { - System.out.println("UFS does not store crc64"); -// return; - } - - long crc64Value = 0; - URIStatus status = mFileSystem.getStatus(plainPath); - if (status.getInAlluxioPercentage() != 100) { - System.out.println("Skipping the file because the in alluxio pct isnt 100"); - return; - } - List bl = mFileSystem.getBlockLocations(status); - Map> blockIdMap = new HashMap<>(); - for (BlockLocationInfo blockLocationInfo: bl) { - for (WorkerNetAddress address: blockLocationInfo.getLocations()) { - if (!blockIdMap.containsKey(address)) { - blockIdMap.put(address, new ArrayList<>()); - } - blockIdMap.get(address).add(blockLocationInfo.getBlockInfo().getBlockInfo().getBlockId()); - } - } - Map> futures = new HashMap<>(); - for (Map.Entry> entry: blockIdMap.entrySet()) { - try (CloseableResource blockWorkerClient - = 
mFsContext.acquireBlockWorkerClient(entry.getKey())) { - GetBlockChecksumRequest request = - GetBlockChecksumRequest.newBuilder().addAllBlockIds(entry.getValue()).addBlockIds(114514).build(); - futures.put(entry.getKey(), blockWorkerClient.get().getBlockChecksum(request)); - } - } - Map checksumMap = new HashMap<>(); - for (Map.Entry> entry: - futures.entrySet()) { - try { - GetBlockChecksumResponse response = entry.getValue().get(); - for (Map.Entry checksumEntry: response.getChecksumMap().entrySet()) { - long blockId = checksumEntry.getKey(); - BlockChecksum checksum = checksumEntry.getValue(); - if (checksumMap.containsKey(blockId)) { - BlockChecksum checksumFromMap = checksumMap.get(blockId); - if (checksumFromMap.getBlockLength() != checksum.getBlockLength() - || !Objects.equals(checksumFromMap.getChecksum(), checksum.getChecksum())) { - throw new RuntimeException("Block replica > 1 && the checksum does not match"); - } - } - checksumMap.put(blockId, checksum); - } - } catch (Exception e) { - throw new RuntimeException("rpc call failed " + entry.getKey(), e); - } - } - for (long blockId : status.getBlockIds()) { - if (!checksumMap.containsKey(blockId)) { - throw new RuntimeException("block does not exist"); - } - BlockChecksum bcs = checksumMap.get(blockId); - crc64Value = CRC64.combine(crc64Value, Long.parseLong(bcs.getChecksum()), bcs.getBlockLength()); - } - System.out.println("CRC64 value from workers: " + Long.toHexString(crc64Value)); - long crc64ValueFromUfs = - Long.parseLong(new String(ufsStatus.getXAttr().get(Constants.CRC64_KEY))); - if (crc64Value != crc64ValueFromUfs) { - System.out.println("Mismatch, data deleted from alluxio"); - mFileSystem.delete(plainPath, DeletePOptions.newBuilder().setAlluxioOnly(true).build()); - } - System.out.println("check passed"); + System.out.println(CRC64CheckCommandUtils.calculateAlluxioCRC64(mFsContext, mFileSystem, plainPath)); } @Override @@ -141,11 +44,12 @@ public String getCommandName() { @Override public 
String getUsage() { - return "foobar"; + return "crc64check ..."; } @Override public String getDescription() { - return "barfoo"; + return "Does the CRC check on a given alluxio path. The UFS must support CRC64 checksum and " + + "the file must be fully cached on alluxio."; } } diff --git a/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommandUtils.java b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommandUtils.java new file mode 100644 index 000000000000..800fe9cc468f --- /dev/null +++ b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommandUtils.java @@ -0,0 +1,212 @@ +package alluxio.cli.fs.command; + +import alluxio.AlluxioURI; +import alluxio.Constants; +import alluxio.client.block.stream.BlockWorkerClient; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.grpc.BlockChecksum; +import alluxio.grpc.DeletePOptions; +import alluxio.grpc.GetBlockChecksumRequest; +import alluxio.grpc.GetBlockChecksumResponse; +import alluxio.grpc.GetStatusPOptions; +import alluxio.master.block.BlockId; +import alluxio.resource.CloseableResource; +import alluxio.util.CRC64; +import alluxio.wire.BlockLocationInfo; +import alluxio.wire.WorkerNetAddress; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +public class CRC64CheckCommandUtils { + private static final Logger LOG = LoggerFactory.getLogger(CRC64CheckCommandUtils.class); + + /** + * Gets the CRC64 checksum from UFS for a given path + * + * @param 
fileSystem the file system + * @param plainPath the file path + * @return the CRC64, if exists + */ + public static long getUfsCRC64( + FileSystem fileSystem, AlluxioURI plainPath) + throws IOException, AlluxioException { + URIStatus ufsStatus = fileSystem.getStatus( + plainPath, GetStatusPOptions.newBuilder().setDirectUfsAccess(true).build()); + if (ufsStatus.getXAttr() == null || !ufsStatus.getXAttr().containsKey(Constants.CRC64_KEY)) { + throw new UnsupportedOperationException("The ufs does not support CRC64 checksum"); + } + return Long.parseLong(new String(ufsStatus.getXAttr().get(Constants.CRC64_KEY))); + } + + /** + * Calculates the alluxio CRC64 value + * + * @param fsContext the file system context + * @param fileSystem the file system + * @param plainPath the file path + * @return the alluxio side CRC64, if the file is fully cached in alluxio and consistent + */ + public static long calculateAlluxioCRC64( + FileSystemContext fsContext, FileSystem fileSystem, AlluxioURI plainPath) + throws IOException, AlluxioException { + URIStatus status = fileSystem.getStatus(plainPath); + if (status.isFolder()) { + throw new IllegalStateException("The path is a folder"); + } + if (status.getInAlluxioPercentage() != 100) { + throw new IllegalStateException("The file is not cached in alluxio"); + } + List blockLocationInfoList = fileSystem.getBlockLocations(status); + Map> blockIdsOnWorkers = new HashMap<>(); + for (BlockLocationInfo blockLocationInfo : blockLocationInfoList) { + // One block can be persisted on multiple workers if passive replication is enabled, + // even if multi replica is enabled. 
+ // If a block has multiple replications, we require all of them to have the same CRC64 value, + // but tolerant with the possible missing of some replications on workers, + // as long as at least 1 copy exists, + for (WorkerNetAddress address : blockLocationInfo.getLocations()) { + if (!blockIdsOnWorkers.containsKey(address)) { + blockIdsOnWorkers.put(address, new ArrayList<>()); + } + blockIdsOnWorkers.get(address) + .add(blockLocationInfo.getBlockInfo().getBlockInfo().getBlockId()); + } + } + + // RPC all workers that contain blocks of the file + // If a worker contains too many blocks, the RPC will be sent by batches. + // Details are in DefaultBlockWorkerClient + Map> rpcFutures = new HashMap<>(); + for (Map.Entry> entry : blockIdsOnWorkers.entrySet()) { + try (CloseableResource blockWorkerClient + = fsContext.acquireBlockWorkerClient(entry.getKey())) { + GetBlockChecksumRequest request = + GetBlockChecksumRequest.newBuilder().addAllBlockIds(entry.getValue()).build(); + rpcFutures.put(entry.getKey(), blockWorkerClient.get().getBlockChecksum(request)); + } + } + + // Collect the results; + Map checksumMap = new HashMap<>(); + for (Map.Entry> entry : + rpcFutures.entrySet()) { + try { + GetBlockChecksumResponse response = entry.getValue().get(); + for (Map.Entry checksumEntry : response.getChecksumMap().entrySet()) { + long blockId = checksumEntry.getKey(); + BlockChecksum checksum = checksumEntry.getValue(); + if (checksumMap.containsKey(blockId)) { + BlockChecksum checksumFromMap = checksumMap.get(blockId); + if (checksumFromMap.getBlockLength() != checksum.getBlockLength() + || !Objects.equals(checksumFromMap.getChecksum(), checksum.getChecksum())) { + throw new RuntimeException( + "Block " + blockId + + " have multiple replicas across the workers but their checksum does not match" + ); + } + } + checksumMap.put(blockId, checksum); + } + } catch (Exception e) { + throw new RuntimeException("Rpc call failed for worker: " + entry.getKey(), e); + } + } + + long 
crc64Value = 0; + // Calculate the crc64 checksum + for (long blockId : status.getBlockIds()) { + if (!checksumMap.containsKey(blockId)) { + Optional> addresses = blockLocationInfoList.stream().filter( + it -> it.getBlockInfo().getBlockInfo().getBlockId() == blockId) + .findFirst().map(BlockLocationInfo::getLocations); + if (addresses.isPresent()) { + throw new RuntimeException("Block " + blockId + " does not exist on any worker. " + + "Master indicates the block is on the following workers: " + + StringUtils.join(addresses.get(), ",")); + } + throw new RuntimeException("Block " + blockId + " does not exist and cannot be found " + + "from master block locations."); + } + BlockChecksum bcs = checksumMap.get(blockId); + crc64Value = + CRC64.combine(crc64Value, Long.parseLong(bcs.getChecksum()), bcs.getBlockLength()); + } + return crc64Value; + } + + /** + * Calculates the alluxio CRC for a given block + * + * @param fsContext the file system context + * @param blockId the block id to calculate + * @param workerNetAddresses the worker addresses to calculate the blocks + * @return a map of block checksum; If the block does not exist on the worker, it will + * not show up in the map. 
+ */ + public static Map calculateAlluxioCRCForBlock( + FileSystemContext fsContext, long blockId, List workerNetAddresses + ) throws IOException { + Map results = new HashMap<>(); + Map> rpcFutures = new HashMap<>(); + for (WorkerNetAddress address : workerNetAddresses) { + try (CloseableResource blockWorkerClient + = fsContext.acquireBlockWorkerClient(address)) { + GetBlockChecksumRequest request = + GetBlockChecksumRequest.newBuilder().addBlockIds(blockId).build(); + rpcFutures.put(address, blockWorkerClient.get().getBlockChecksum(request)); + } + } + for (Map.Entry> + entry : rpcFutures.entrySet()) { + try { + Map map = entry.getValue().get().getChecksumMap(); + if (!map.containsKey(blockId)) { + LOG.warn("Block {} does not exist on worker {}", blockId, entry.getKey()); + } else { + results.put(entry.getKey(), map.get(blockId)); + } + } catch (Exception e) { + LOG.error("Error calling RPC on worker {}", entry.getKey(), e); + } + } + return results; + } + + /** + * Checks if the CRC64 checksum is consistent between alluxio and UFS + * + * @param fsContext the file system context + * @param fileSystem the file system + * @param plainPath the file path + * @return the crc64 value, if the check passes, otherwise an exception is thrown + */ + public static long checkCRC64( + FileSystemContext fsContext, FileSystem fileSystem, AlluxioURI plainPath) + throws AlluxioException, IOException { + long ufsCRC = getUfsCRC64(fileSystem, plainPath); + long alluxioCRC = calculateAlluxioCRC64(fsContext, fileSystem, plainPath); + if (ufsCRC != alluxioCRC) { + throw new RuntimeException("CRC check of file " + plainPath + " failed. " + + "ufs CRC: " + Long.toHexString(ufsCRC) + " alluxio CRC: " + + Long.toHexString(alluxioCRC)); + } + return ufsCRC; + } +}