Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Sep 4, 2024
1 parent 0d593e0 commit ce74696
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
import alluxio.retry.RetryPolicy;
import alluxio.retry.RetryUtils;
import alluxio.security.user.UserState;
import alluxio.util.CommonUtils;

import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.netty.util.ResourceLeakDetector;
Expand All @@ -58,6 +62,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

Expand All @@ -80,6 +85,8 @@ public class DefaultBlockWorkerClient implements BlockWorkerClient {
private final BlockWorkerGrpc.BlockWorkerStub mStreamingAsyncStub;
private final BlockWorkerGrpc.BlockWorkerBlockingStub mRpcBlockingStub;
private final BlockWorkerGrpc.BlockWorkerFutureStub mRpcFutureStub;
/**
 * Upper bound on the number of block IDs sent in a single getBlockChecksum RPC; larger
 * requests are split into batches of this size.
 */
private final int mChecksumBatchSize;
/**
 * A getBlockChecksum RPC that takes longer than this many milliseconds is logged as slow.
 * NOTE(review): 12ms looks very low for a checksum RPC and the value is a per-instance field
 * rather than a constant or a configurable property -- confirm this is intended.
 */
private final int mSlowChecksumCalculationRequestThresholdMs = 12;

@Nullable
private final ResourceLeakTracker<DefaultBlockWorkerClient> mTracker;
Expand Down Expand Up @@ -129,6 +136,8 @@ public DefaultBlockWorkerClient(UserState userState, GrpcServerAddress address,
mRpcFutureStub = BlockWorkerGrpc.newFutureStub(mRpcChannel);
mAddress = address;
mRpcTimeoutMs = alluxioConf.getMs(PropertyKey.USER_RPC_RETRY_MAX_DURATION);
mChecksumBatchSize = alluxioConf.getInt(
PropertyKey.USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE);
mTracker = DETECTOR.track(this);
}

Expand Down Expand Up @@ -254,6 +263,49 @@ public ListenableFuture<LoadResponse> load(LoadRequest request) {
/**
 * Requests the checksums of the given blocks from the worker.
 *
 * <p>When the request holds no more block IDs than the configured batch size, a single RPC is
 * issued. Otherwise the block IDs are partitioned into batches that are sent one after another
 * (each batch is only issued once the previous one has completed) and the per-batch checksum
 * maps are merged into one response. Every RPC that exceeds the slow-request threshold is
 * logged at WARN level.
 *
 * @param request the request carrying the block IDs to checksum
 * @return a future that completes with the (merged) checksum response, or fails with the
 *         error of the first failing RPC
 */
@Override
public ListenableFuture<GetBlockChecksumResponse> getBlockChecksum(
    GetBlockChecksumRequest request) {
  // Non-batched mode. A non-positive batch size cannot be used for partitioning, so it is
  // treated as "no batching" instead of failing inside Lists.partition.
  if (mChecksumBatchSize <= 0 || request.getBlockIdsCount() <= mChecksumBatchSize) {
    return getBlockChecksumWithSlowLog(request);
  }

  // Batched mode: chain the RPCs so that only one batch is in flight at a time.
  GetBlockChecksumResponse.Builder responseBuilder = GetBlockChecksumResponse.newBuilder();
  ListenableFuture<GetBlockChecksumResponse> chainedCalls =
      Futures.immediateFuture(GetBlockChecksumResponse.getDefaultInstance());
  List<List<Long>> blockIdsPerBatch =
      Lists.partition(request.getBlockIdsList(), mChecksumBatchSize);
  for (List<Long> blockIdsOfBatch : blockIdsPerBatch) {
    chainedCalls = Futures.transformAsync(chainedCalls, (previousResult) -> {
      // Fold the previous batch's result in before issuing the next batch.
      responseBuilder.putAllChecksum(previousResult.getChecksumMap());
      GetBlockChecksumRequest requestOfBatch =
          GetBlockChecksumRequest.newBuilder().addAllBlockIds(blockIdsOfBatch).build();
      return getBlockChecksumWithSlowLog(requestOfBatch);
    }, MoreExecutors.directExecutor());
  }
  return Futures.transform(chainedCalls, (lastResult) -> {
    // The result of the final batch has not been merged by the loop above.
    responseBuilder.putAllChecksum(lastResult.getChecksumMap());
    return responseBuilder.build();
  }, MoreExecutors.directExecutor());
}

/**
 * Issues one getBlockChecksum RPC and logs a warning if it takes longer than the slow-request
 * threshold. The clock is started before the RPC is issued so the full call is measured.
 *
 * @param request the request to send as a single RPC
 * @return the future returned by the RPC stub
 */
private ListenableFuture<GetBlockChecksumResponse> getBlockChecksumWithSlowLog(
    GetBlockChecksumRequest request) {
  long startTs = CommonUtils.getCurrentMs();
  ListenableFuture<GetBlockChecksumResponse> future = mRpcFutureStub.getBlockChecksum(request);
  future.addListener(() -> {
    long timeElapsed = CommonUtils.getCurrentMs() - startTs;
    if (timeElapsed > mSlowChecksumCalculationRequestThresholdMs) {
      LOG.warn(
          "Slow checksum calculation RPC for {} blocks, address {}, time elapsed {}ms ",
          request.getBlockIdsCount(), mAddress, timeElapsed);
    }
  }, MoreExecutors.directExecutor());
  return future;
}
}
32 changes: 32 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -4297,6 +4297,22 @@ public String toString() {
.setDescription("The implementation of LocalBlockStore that can be instantiated.")
.setScope(Scope.WORKER)
.build();
/**
 * Data-size property limiting how many bytes per second a worker may read while calculating
 * block checksums. The default of -1 disables the limit. The worker converts the value to
 * whole kilobytes before building its rate limiter, hence the 1KB precision stated in the
 * description.
 */
public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD =
    dataSizeBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD)
        .setDefaultValue(-1)
        .setDescription("The throughput threshold per second to trigger the rate limit. "
            + "-1 means no limitation. The minimum precision is 1KB.")
        .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
        .setScope(Scope.WORKER)
        .build();
/**
 * Integer property (default 16, worker scope) giving the number of threads in the worker's
 * block checksum calculation pool.
 */
public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE =
intBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE)
.setDefaultValue(16)
.setDescription("The thread pool size for the worker block checksum calculation. "
+ "Each thread will take up at most 8MB (the chunksize) memory")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_CONTAINER_HOSTNAME =
stringBuilder(Name.WORKER_CONTAINER_HOSTNAME)
.setDescription("The container hostname if worker is running in a container.")
Expand Down Expand Up @@ -6411,6 +6427,16 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
/**
 * Integer property (client scope) giving the maximum number of block IDs the client places in
 * one checksum calculation RPC. The default of Integer.MAX_VALUE means requests are never
 * split into batches.
 */
public static final PropertyKey USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE =
intBuilder(Name.USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE)
.setDefaultValue(Integer.MAX_VALUE)
.setDescription("The batch size of block ids for the checksum calculation rpc. "
+ "by default all block ids are in a single batch. Reduce this value if "
+ "you see too many timeouts.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.CLIENT)
.build();


public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT =
enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class)
Expand Down Expand Up @@ -8710,6 +8736,10 @@ public static final class Name {
public static final String WORKER_BLOCK_HEARTBEAT_TIMEOUT_MS =
"alluxio.worker.block.heartbeat.timeout";
public static final String WORKER_BLOCK_STORE_TYPE = "alluxio.worker.block.store.type";
// Key name of the worker-side throughput limit for block checksum calculation.
public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD =
"alluxio.worker.block.checksum.calculation.throughput.threshold";
// Key name of the worker-side thread pool size for block checksum calculation.
public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE =
"alluxio.worker.block.checksum.calculation.thread.pool.size";
public static final String WORKER_CONTAINER_HOSTNAME =
"alluxio.worker.container.hostname";
public static final String WORKER_DATA_FOLDER = "alluxio.worker.data.folder";
Expand Down Expand Up @@ -9102,6 +9132,8 @@ public static final class Name {
"alluxio.user.client.cache.include.mtime";
public static final String USER_CLIENT_REPORT_VERSION_ENABLED =
"alluxio.user.client.report.version.enabled";
// Key name of the client-side batch size for checksum calculation RPCs. Spelled with the
// same "alluxio.user.client." prefix as the neighbouring client keys.
public static final String USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE =
    "alluxio.user.client.checksum.calculation.batch.size";
public static final String USER_CONF_CLUSTER_DEFAULT_ENABLED =
"alluxio.user.conf.cluster.default.enabled";
public static final String USER_CONF_SYNC_INTERVAL = "alluxio.user.conf.sync.interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.TreeMap;

/**
* Heap buffer pool.
*/
public class NioHeapBufferPool {
private static final TreeMap<Integer, LinkedList<ByteBuffer>> BUF_POOL = new TreeMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import alluxio.retry.RetryUtils;
import alluxio.security.user.ServerUserState;
import alluxio.util.CRC64;
import alluxio.util.CommonUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.io.FileUtils;
import alluxio.wire.FileInfo;
Expand All @@ -71,6 +72,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
Expand All @@ -87,6 +89,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -143,6 +146,7 @@ public class DefaultBlockWorker extends AbstractWorker implements BlockWorker {

protected WorkerNetAddress mAddress;
private final ExecutorService mChecksumCalculationThreadPool;
private final Optional<RateLimiter> mChecksumCalculationRateLimiter ;

/**
* Constructs a default block worker.
Expand Down Expand Up @@ -178,11 +182,20 @@ public DefaultBlockWorker(BlockMasterClientPool blockMasterClientPool,
GrpcExecutors.CACHE_MANAGER_EXECUTOR, this, fsContext);
mFuseManager = mResourceCloser.register(new FuseManager(fsContext));
mWhitelist = new PrefixList(Configuration.getList(PropertyKey.WORKER_WHITELIST));
mChecksumCalculationThreadPool =
ExecutorServiceFactories.fixedThreadPool("checksum-calculation-pool", 16)
.create();
((ThreadPoolExecutor) mChecksumCalculationThreadPool)
.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
mChecksumCalculationThreadPool = ExecutorServiceFactories.fixedThreadPool(
"checksum-calculation-pool",
Configuration.getInt(PropertyKey.WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE))
.create();
long checksumThroughputThreshold =
Configuration.getBytes(PropertyKey.WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD);
if (checksumThroughputThreshold <= 0) {
mChecksumCalculationRateLimiter = Optional.empty();
} else {
// The min precision is 1kb to avoid data overflow
mChecksumCalculationRateLimiter =
Optional.of(RateLimiter.create(
Math.max(Math.toIntExact(checksumThroughputThreshold / 1024), 1)));
}
Metrics.registerGauges(this);
}

Expand Down Expand Up @@ -623,8 +636,8 @@ public Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds) {
int chunkSize = 1024 * 1024 * 8; //8MB
HashMap<Long, BlockChecksum> result = new HashMap<>();
List<Future<?>> futures = new ArrayList<>();
for (long blockId: blockIds) {
Future<?> future = mChecksumCalculationThreadPool.submit(()->{
for (long blockId : blockIds) {
Future<?> future = mChecksumCalculationThreadPool.submit(() -> {
ByteBuffer bf = null;
try {
CRC64 crc64 = new CRC64();
Expand All @@ -639,6 +652,10 @@ public Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds) {
break;
}
crc64.update(bf.array(), Math.toIntExact(bytesRead));
int permits = Math.toIntExact(Math.max(1, bytesRead / 1024));
if (mChecksumCalculationRateLimiter.isPresent()) {
mChecksumCalculationRateLimiter.get().acquire(permits);
}
}
result.put(blockId,
BlockChecksum.newBuilder()
Expand All @@ -656,7 +673,7 @@ public Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds) {
});
futures.add(future);
}
for (Future<?> future: futures) {
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
Expand Down
112 changes: 8 additions & 104 deletions shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,12 @@
package alluxio.cli.fs.command;

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.exception.AlluxioException;
import alluxio.grpc.BlockChecksum;
import alluxio.grpc.BlockLocation;
import alluxio.grpc.DeletePOptions;
import alluxio.grpc.GetBlockChecksumRequest;
import alluxio.grpc.GetBlockChecksumResponse;
import alluxio.grpc.GetStatusPOptions;
import alluxio.master.block.BlockId;
import alluxio.resource.CloseableResource;
import alluxio.util.CRC64;
import alluxio.wire.BlockLocationInfo;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.zip.Checksum;

public class CRC64CheckCommand extends AbstractFileSystemCommand{
public CRC64CheckCommand(
Expand All @@ -45,83 +18,13 @@ public CRC64CheckCommand(
/**
 * Runs the CRC64 check for a single, already-expanded Alluxio path.
 *
 * <p>As written, the body (1) reads the file status directly from the UFS and looks for the
 * CRC64 extended attribute, (2) skips files that are not 100% in Alluxio, (3) asks every
 * worker holding a block of the file for the block checksums, (4) combines the per-block
 * checksums into a file-level CRC64 and compares it with the UFS value, deleting the Alluxio
 * copy on mismatch, and (5) finally prints the value returned by
 * {@code CRC64CheckCommandUtils.calculateAlluxioCRC64}.
 *
 * <p>NOTE(review): steps (1)-(4) look like an experimental inline implementation that overlaps
 * with the utility call in step (5) -- confirm which of the two is meant to remain.
 *
 * @param plainPath the Alluxio path to check
 * @param cl the parsed command line (not read in this body)
 * @throws AlluxioException declared by the method signature
 * @throws IOException declared by the method signature
 */
protected void runPlainPath(AlluxioURI plainPath, CommandLine cl)
throws AlluxioException, IOException {
/*
CRC64 aaa = new CRC64();
aaa.update("aaa".getBytes(), 3);
System.out.println(Long.toHexString(aaa.getValue()));
if (true) {
return;
}
System.out.println("Checking " + plainPath);
long crc64 = CRC64CheckCommandUtils.checkCRC64(mFsContext, mFileSystem, plainPath);
System.out.println("CRC64 check for file " + plainPath + " succeeded. "
+ "CRC64: " + crc64);
*/

// Status fetched with direct UFS access so the extended attributes come from the UFS.
URIStatus ufsStatus = mFileSystem.getStatus(
plainPath, GetStatusPOptions.newBuilder().setDirectUfsAccess(true).build());
// NOTE(review): the early return below is commented out, so execution continues even when
// the xattr map is null or has no CRC64 entry; the later ufsStatus.getXAttr().get(...) call
// would then fail -- confirm whether the return should be restored.
if (ufsStatus.getXAttr() == null || !ufsStatus.getXAttr().containsKey(Constants.CRC64_KEY)) {
System.out.println("UFS does not store crc64");
// return;
}

long crc64Value = 0;
URIStatus status = mFileSystem.getStatus(plainPath);
// Only fully cached files are checked.
if (status.getInAlluxioPercentage() != 100) {
System.out.println("Skipping the file because the in alluxio pct isnt 100");
return;
}
// Group the file's block IDs by every worker address that holds a replica.
List<BlockLocationInfo> bl = mFileSystem.getBlockLocations(status);
Map<WorkerNetAddress, List<Long>> blockIdMap = new HashMap<>();
for (BlockLocationInfo blockLocationInfo: bl) {
for (WorkerNetAddress address: blockLocationInfo.getLocations()) {
if (!blockIdMap.containsKey(address)) {
blockIdMap.put(address, new ArrayList<>());
}
blockIdMap.get(address).add(blockLocationInfo.getBlockInfo().getBlockInfo().getBlockId());
}
}
// Issue one checksum request per worker and keep the returned futures.
// NOTE(review): each request also carries the hard-coded block ID 114514, which does not come
// from the file's block list -- looks like a debugging leftover; confirm.
// NOTE(review): the worker client resource is closed by the try-with-resources before the
// future is awaited below -- confirm the future stays valid after the client is released.
Map<WorkerNetAddress, ListenableFuture<GetBlockChecksumResponse>> futures = new HashMap<>();
for (Map.Entry<WorkerNetAddress, List<Long>> entry: blockIdMap.entrySet()) {
try (CloseableResource<BlockWorkerClient> blockWorkerClient
= mFsContext.acquireBlockWorkerClient(entry.getKey())) {
GetBlockChecksumRequest request =
GetBlockChecksumRequest.newBuilder().addAllBlockIds(entry.getValue()).addBlockIds(114514).build();
futures.put(entry.getKey(), blockWorkerClient.get().getBlockChecksum(request));
}
}
// Collect the per-block checksums; when a block is reported by more than one worker the
// replicas must agree on both length and checksum.
Map<Long, BlockChecksum> checksumMap = new HashMap<>();
for (Map.Entry<WorkerNetAddress, ListenableFuture<GetBlockChecksumResponse>> entry:
futures.entrySet()) {
try {
GetBlockChecksumResponse response = entry.getValue().get();
for (Map.Entry<Long, BlockChecksum> checksumEntry: response.getChecksumMap().entrySet()) {
long blockId = checksumEntry.getKey();
BlockChecksum checksum = checksumEntry.getValue();
if (checksumMap.containsKey(blockId)) {
BlockChecksum checksumFromMap = checksumMap.get(blockId);
if (checksumFromMap.getBlockLength() != checksum.getBlockLength()
|| !Objects.equals(checksumFromMap.getChecksum(), checksum.getChecksum())) {
throw new RuntimeException("Block replica > 1 && the checksum does not match");
}
}
checksumMap.put(blockId, checksum);
}
} catch (Exception e) {
// Any failure (including the replica mismatch thrown above) is rethrown with the worker
// address attached.
throw new RuntimeException("rpc call failed " + entry.getKey(), e);
}
}
// Fold the block checksums into one file-level value, in the order of status.getBlockIds().
for (long blockId : status.getBlockIds()) {
if (!checksumMap.containsKey(blockId)) {
throw new RuntimeException("block does not exist");
}
BlockChecksum bcs = checksumMap.get(blockId);
crc64Value = CRC64.combine(crc64Value, Long.parseLong(bcs.getChecksum()), bcs.getBlockLength());
}
System.out.println("CRC64 value from workers: " + Long.toHexString(crc64Value));
// The UFS value is stored as bytes of a decimal string in the CRC64 xattr.
long crc64ValueFromUfs =
Long.parseLong(new String(ufsStatus.getXAttr().get(Constants.CRC64_KEY)));
if (crc64Value != crc64ValueFromUfs) {
System.out.println("Mismatch, data deleted from alluxio");
mFileSystem.delete(plainPath, DeletePOptions.newBuilder().setAlluxioOnly(true).build());
}
// NOTE(review): "check passed" is printed unconditionally, i.e. also right after a mismatch
// was reported and the Alluxio copy deleted -- confirm.
System.out.println("check passed");
System.out.println(CRC64CheckCommandUtils.calculateAlluxioCRC64(mFsContext, mFileSystem, plainPath));
}

@Override
Expand All @@ -141,11 +44,12 @@ public String getCommandName() {

/**
 * @return the usage string of the crc64check command
 */
@Override
public String getUsage() {
  // The former "foobar" placeholder return made this statement unreachable.
  return "crc64check <path> ...";
}

/**
 * @return a human-readable description of what the crc64check command does and requires
 */
@Override
public String getDescription() {
  // The former "barfoo" placeholder return made this statement unreachable.
  return "Does the CRC check on a given alluxio path. The UFS must support CRC64 checksum and "
      + "the file must be fully cached on alluxio.";
}
}
Loading

0 comments on commit ce74696

Please sign in to comment.