From 0d593e01a1cc27505fe60c4b57bc5c2d7a2da645 Mon Sep 17 00:00:00 2001 From: elega <445092967@qq.com> Date: Tue, 3 Sep 2024 20:59:03 +0800 Subject: [PATCH] update --- .../stream/DefaultBlockWorkerClient.java | 8 ++ .../databuffer/NioHeapBufferPool.java | 74 ++++++++++ .../underfs/ObjectUnderFileSystem.java | 17 ++- .../src/main/java/alluxio/util/CRC64.java | 4 + .../alluxio/worker/block/BlockWorker.java | 2 +- .../master/file/DefaultFileSystemMaster.java | 5 +- .../worker/block/DefaultBlockWorker.java | 78 ++++++++--- .../grpc/BlockWorkerClientServiceHandler.java | 14 ++ .../alluxio/worker/block/NoopBlockWorker.java | 2 +- .../cli/fs/command/CRC64CheckCommand.java | 131 +++++++++++++++--- 10 files changed, 290 insertions(+), 45 deletions(-) create mode 100644 core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java b/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java index 95d5c16d255a..e404b4d8e987 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/DefaultBlockWorkerClient.java @@ -24,6 +24,8 @@ import alluxio.grpc.DataMessageMarshaller; import alluxio.grpc.DataMessageMarshallerProvider; import alluxio.grpc.FreeWorkerRequest; +import alluxio.grpc.GetBlockChecksumRequest; +import alluxio.grpc.GetBlockChecksumResponse; import alluxio.grpc.GrpcChannel; import alluxio.grpc.GrpcChannelBuilder; import alluxio.grpc.GrpcNetworkGroup; @@ -248,4 +250,10 @@ public void freeWorker() { public ListenableFuture load(LoadRequest request) { return mRpcFutureStub.load(request); } + + @Override + public ListenableFuture getBlockChecksum( + GetBlockChecksumRequest request) { + return mRpcFutureStub.getBlockChecksum(request); + } } diff --git a/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java b/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java new file mode 100644 index 000000000000..51998318e745 --- /dev/null +++ b/core/common/src/main/java/alluxio/network/protocol/databuffer/NioHeapBufferPool.java @@ -0,0 +1,74 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.network.protocol.databuffer; + +import alluxio.exception.runtime.ResourceExhaustedRuntimeException; +import alluxio.retry.RetryPolicy; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Map; +import java.util.TreeMap; + +/** + * Direct buffer pool. + */ +public class NioHeapBufferPool { + private static final TreeMap> BUF_POOL = new TreeMap<>(); + + /** + * @param length + * @return buffer + */ + public static synchronized ByteBuffer acquire(int length) { + Map.Entry> entry = BUF_POOL.ceilingEntry(length); + if (entry == null || entry.getValue().size() == 0) { + return ByteBuffer.allocate(length); + } + ByteBuffer buffer = entry.getValue().pop(); + buffer.clear(); + // the buffer probably is larger than the amount of capacity being requested + // need to set the limit explicitly + buffer.limit(length); + return buffer; + } + + /** + * @param length + * @param policy the retry policy to use + * @return buffer + */ + public static synchronized ByteBuffer acquire(int length, RetryPolicy policy) { + Error cause = null; + while (policy.attempt()) { + try { + return acquire(length); + } catch (OutOfMemoryError error) { + cause = error; + } + } + throw new ResourceExhaustedRuntimeException("Not enough heap memory allocated to buffer", + cause, false); + } + + /** + * @param buffer + */ + public static synchronized void release(ByteBuffer buffer) { + LinkedList bufList = BUF_POOL.get(buffer.capacity()); + if (bufList == null) { + bufList = new LinkedList<>(); + BUF_POOL.put(buffer.capacity(), bufList); + } + bufList.push(buffer); + } +} diff --git a/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java b/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java index 4cb64ecee818..77d3ecf1910a 100755 --- a/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java +++ b/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java @@ -12,6 +12,7 @@ package alluxio.underfs; import alluxio.AlluxioURI; +import alluxio.Constants; import alluxio.collections.Pair; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.PropertyKey; @@ -543,7 +544,7 @@ public UfsFileStatus getFileStatus(String path, GetFileStatusOptions options) th Map xAttr = null; if (details.getCrc64Checksum().isPresent()) { xAttr = new HashMap<>(); - xAttr.put("crc64", details.getCrc64Checksum().get().getBytes()); + xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes()); } ObjectPermissions permissions = getPermissions(); return @@ -567,11 +568,17 @@ public UfsStatus getStatus(String path) throws IOException { return getDirectoryStatus(path); } ObjectStatus details = getObjectStatus(stripPrefixIfPresent(path)); + ObjectPermissions permissions = getPermissions(); if (details != null) { - ObjectPermissions permissions = getPermissions(); - return new UfsFileStatus(path, details.getContentHash(), details.getContentLength(), - details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(), - permissions.getMode(), mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT)); + Map xAttr = null; + if (details.getCrc64Checksum().isPresent()) { + xAttr = new HashMap<>(); + xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes()); + } + return + new UfsFileStatus(path, details.getContentHash(), details.getContentLength(), + details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(), + permissions.getMode(), xAttr, mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT)); } return getDirectoryStatus(path); } diff --git a/core/common/src/main/java/alluxio/util/CRC64.java b/core/common/src/main/java/alluxio/util/CRC64.java index 4068320de0f6..69350d3c0153 100644 --- a/core/common/src/main/java/alluxio/util/CRC64.java +++ b/core/common/src/main/java/alluxio/util/CRC64.java @@ -1,8 +1,12 @@ package alluxio.util; +import java.nio.ByteBuffer; import java.util.zip.Checksum; public class CRC64 implements Checksum { + /* 64-bit CRC-ecma182 polynomial with these coefficients, but reversed: + 64, 62, 57, 55, 54, 53, 52, 47, 46, 45, 40, 39, 38, 37, 35, 33, 32, + 31, 29, 27, 24, 23, 22, 21, 19, 17, 13, 12, 10, 9, 7, 4, 1, 0 */ private final static long POLY = (long) 0xc96c5795d7870f42L; // ECMA-182 /* CRC64 calculation table. */ diff --git a/core/common/src/main/java/alluxio/worker/block/BlockWorker.java b/core/common/src/main/java/alluxio/worker/block/BlockWorker.java index 879db99dc0da..6a82bd6ab570 100644 --- a/core/common/src/main/java/alluxio/worker/block/BlockWorker.java +++ b/core/common/src/main/java/alluxio/worker/block/BlockWorker.java @@ -246,5 +246,5 @@ BlockReader createUfsBlockReader(long sessionId, long blockId, long offset, bool */ WorkerNetAddress getWorkerAddress(); - Map calculateBlockChecksum(); + Map calculateBlockChecksum(List blockIds); } diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index be85230a9ca8..eaa58ce570f0 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -1039,7 +1039,10 @@ private FileInfo getFileInfoFromUfs(AlluxioURI path) { UfsStatus ufsStatus; try { ufsStatus = ufs.getStatus(ufsPath); - return FileInfo.fromUfsStatus(ufsStatus); + FileInfo fi = FileInfo.fromUfsStatus(ufsStatus); + // Set a dummy TTL action to avoid NPE + fi.setTtlAction(TtlAction.DELETE); + return fi; } catch (FileNotFoundException e) { throw new RuntimeException("File not found"); } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index 64962dc67e06..de2795cfb99c 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -28,6 +28,7 @@ import alluxio.exception.AlluxioException; import alluxio.exception.ExceptionMessage; import alluxio.exception.runtime.AlluxioRuntimeException; +import alluxio.exception.runtime.BlockDoesNotExistRuntimeException; import alluxio.exception.runtime.ResourceExhaustedRuntimeException; import alluxio.exception.status.AlluxioStatusException; import alluxio.grpc.AsyncCacheRequest; @@ -46,6 +47,8 @@ import alluxio.metrics.MetricInfo; import alluxio.metrics.MetricKey; import alluxio.metrics.MetricsSystem; +import alluxio.network.protocol.databuffer.NioDirectBufferPool; +import alluxio.network.protocol.databuffer.NioHeapBufferPool; import alluxio.proto.dataserver.Protocol; import alluxio.retry.RetryUtils; import alluxio.security.user.ServerUserState; @@ -68,19 +71,27 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.Closer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.NotThreadSafe; @@ -131,6 +142,7 @@ public class DefaultBlockWorker extends AbstractWorker implements BlockWorker { private final FuseManager mFuseManager; protected WorkerNetAddress mAddress; + private final ExecutorService mChecksumCalculationThreadPool; /** * Constructs a default block worker. @@ -166,7 +178,11 @@ public DefaultBlockWorker(BlockMasterClientPool blockMasterClientPool, GrpcExecutors.CACHE_MANAGER_EXECUTOR, this, fsContext); mFuseManager = mResourceCloser.register(new FuseManager(fsContext)); mWhitelist = new PrefixList(Configuration.getList(PropertyKey.WORKER_WHITELIST)); - + mChecksumCalculationThreadPool = + ExecutorServiceFactories.fixedThreadPool("checksum-calculation-pool", 16) + .create(); + ((ThreadPoolExecutor) mChecksumCalculationThreadPool) + .setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); Metrics.registerGauges(this); } @@ -603,22 +619,50 @@ public void close() { } @Override - public Map calculateBlockChecksum() { - long blockId = 114514; -// BlockMeta bm = mBlockStore; - // Set option to avoid reading from UFS - // TODO: 64MB is too much - try { - BlockReader br = mBlockStore.createBlockReader( - -1, blockId, 0, false, Protocol.OpenUfsBlockOptions.getDefaultInstance()); - // Read this 1MB / 4MB every time and then combine - // Thread pool? - CRC64.fromBytes(br.read(0, br.getLength()).array()); - } catch (Exception e) { - throw new RuntimeException(e); + public Map calculateBlockChecksum(List blockIds) { + int chunkSize = 1024 * 1024 * 8; //8MB + HashMap result = new HashMap<>(); + List> futures = new ArrayList<>(); + for (long blockId: blockIds) { + Future future = mChecksumCalculationThreadPool.submit(()->{ + ByteBuffer bf = null; + try { + CRC64 crc64 = new CRC64(); + BlockReader br = mBlockStore.createBlockReader( + -1, blockId, 0, false, Protocol.OpenUfsBlockOptions.getDefaultInstance()); + bf = NioHeapBufferPool.acquire(chunkSize); + ByteBuf bb = Unpooled.wrappedBuffer(bf); + while (true) { + bb.clear(); + long bytesRead = br.transferTo(bb); + if (bytesRead < 0) { + break; + } + crc64.update(bf.array(), Math.toIntExact(bytesRead)); + } + result.put(blockId, + BlockChecksum.newBuilder() + .setBlockId(blockId).setBlockLength(br.getLength()) + .setChecksum(String.valueOf(crc64.getValue())).build()); + } catch (BlockDoesNotExistRuntimeException e) { + LOG.warn("Block {} not found during CRC calculation", blockId); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (bf != null) { + NioHeapBufferPool.release(bf); + } + } + }); + futures.add(future); + } + for (Future future: futures) { + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } } - // mBlockStore.createUfsBlockReader(sessionId, blockId, offset, positionShort, options); -// mBlockStore.getBlockStoreMeta(). - return null; + return result; } } diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockWorkerClientServiceHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockWorkerClientServiceHandler.java index 5162fbb7c246..48a55cf4c6d4 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockWorkerClientServiceHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockWorkerClientServiceHandler.java @@ -17,6 +17,7 @@ import alluxio.conf.PropertyKey; import alluxio.grpc.AsyncCacheRequest; import alluxio.grpc.AsyncCacheResponse; +import alluxio.grpc.BlockChecksum; import alluxio.grpc.BlockStatus; import alluxio.grpc.BlockWorkerGrpc; import alluxio.grpc.CacheRequest; @@ -27,6 +28,8 @@ import alluxio.grpc.CreateLocalBlockResponse; import alluxio.grpc.FreeWorkerRequest; import alluxio.grpc.FreeWorkerResponse; +import alluxio.grpc.GetBlockChecksumRequest; +import alluxio.grpc.GetBlockChecksumResponse; import alluxio.grpc.LoadRequest; import alluxio.grpc.LoadResponse; import alluxio.grpc.MoveBlockRequest; @@ -247,4 +250,15 @@ private AuthenticatedUserInfo getAuthenticatedUserInfo() { throw Status.UNAUTHENTICATED.withDescription(e.toString()).asRuntimeException(); } } + + @Override + public void getBlockChecksum( + GetBlockChecksumRequest request, + StreamObserver responseObserver) { + RpcUtils.call(LOG, () -> { + Map checksums = + mBlockWorker.calculateBlockChecksum(request.getBlockIdsList()); + return GetBlockChecksumResponse.newBuilder().putAllChecksum(checksums).build(); + }, "getBlockChecksum", "request=%s", responseObserver, request); + } } diff --git a/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java b/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java index 48b151bd82d8..7468eaffb12d 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java @@ -205,7 +205,7 @@ public void cleanupSession(long sessionId) { } @Override - public Map calculateBlockChecksum() { + public Map calculateBlockChecksum(List blockIds) { return Collections.emptyMap(); } } diff --git a/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java index 599b927d7913..e5a12f935267 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/CRC64CheckCommand.java @@ -1,60 +1,151 @@ package alluxio.cli.fs.command; +import alluxio.AlluxioURI; import alluxio.Constants; import alluxio.client.block.stream.BlockWorkerClient; +import alluxio.client.file.FileSystemContext; import alluxio.client.file.URIStatus; import alluxio.exception.AlluxioException; import alluxio.grpc.BlockChecksum; import alluxio.grpc.BlockLocation; +import alluxio.grpc.DeletePOptions; import alluxio.grpc.GetBlockChecksumRequest; import alluxio.grpc.GetBlockChecksumResponse; import alluxio.grpc.GetStatusPOptions; +import alluxio.master.block.BlockId; import alluxio.resource.CloseableResource; import alluxio.util.CRC64; import alluxio.wire.BlockLocationInfo; +import alluxio.wire.WorkerNetAddress; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.Nullable; +import org.apache.commons.cli.CommandLine; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.zip.Checksum; public class CRC64CheckCommand extends AbstractFileSystemCommand{ + public CRC64CheckCommand( + @Nullable FileSystemContext fsContext) { + super(fsContext); + } - void aaa() throws IOException, AlluxioException, ExecutionException, InterruptedException { - long crc64Value = 0; + @Override + protected void runPlainPath(AlluxioURI plainPath, CommandLine cl) + throws AlluxioException, IOException { + /* + CRC64 aaa = new CRC64(); + aaa.update("aaa".getBytes(), 3); + System.out.println(Long.toHexString(aaa.getValue())); + if (true) { + return; + } + + */ + + URIStatus ufsStatus = mFileSystem.getStatus( + plainPath, GetStatusPOptions.newBuilder().setDirectUfsAccess(true).build()); + if (ufsStatus.getXAttr() == null || !ufsStatus.getXAttr().containsKey(Constants.CRC64_KEY)) { + System.out.println("UFS does not store crc64"); +// return; + } - URIStatus status = mFileSystem.getStatus("FILE_PATH"); + long crc64Value = 0; + URIStatus status = mFileSystem.getStatus(plainPath); + if (status.getInAlluxioPercentage() != 100) { + System.out.println("Skipping the file because the in alluxio pct isnt 100"); + return; + } List bl = mFileSystem.getBlockLocations(status); - CloseableResource blockWorkerClient = - mFsContext.acquireBlockWorkerClient(bl.get(0).getLocations().get(0)); - ListenableFuture f = blockWorkerClient.get().getBlockChecksum(GetBlockChecksumRequest.getDefaultInstance()); - ListenableFuture> ff = Futures.allAsList(f); - //if (status.getInAlluxioPercentage() != 100) - Map combinedMap = ff.get().stream() - .flatMap(proto -> proto.getChecksumMap().entrySet().stream()) - .collect(Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue, - (oldValue, newValue) -> oldValue)); + Map> blockIdMap = new HashMap<>(); + for (BlockLocationInfo blockLocationInfo: bl) { + for (WorkerNetAddress address: blockLocationInfo.getLocations()) { + if (!blockIdMap.containsKey(address)) { + blockIdMap.put(address, new ArrayList<>()); + } + blockIdMap.get(address).add(blockLocationInfo.getBlockInfo().getBlockInfo().getBlockId()); + } + } + Map> futures = new HashMap<>(); + for (Map.Entry> entry: blockIdMap.entrySet()) { + try (CloseableResource blockWorkerClient + = mFsContext.acquireBlockWorkerClient(entry.getKey())) { + GetBlockChecksumRequest request = + GetBlockChecksumRequest.newBuilder().addAllBlockIds(entry.getValue()).addBlockIds(114514).build(); + futures.put(entry.getKey(), blockWorkerClient.get().getBlockChecksum(request)); + } + } + Map checksumMap = new HashMap<>(); + for (Map.Entry> entry: + futures.entrySet()) { + try { + GetBlockChecksumResponse response = entry.getValue().get(); + for (Map.Entry checksumEntry: response.getChecksumMap().entrySet()) { + long blockId = checksumEntry.getKey(); + BlockChecksum checksum = checksumEntry.getValue(); + if (checksumMap.containsKey(blockId)) { + BlockChecksum checksumFromMap = checksumMap.get(blockId); + if (checksumFromMap.getBlockLength() != checksum.getBlockLength() + || !Objects.equals(checksumFromMap.getChecksum(), checksum.getChecksum())) { + throw new RuntimeException("Block replica > 1 && the checksum does not match"); + } + } + checksumMap.put(blockId, checksum); + } + } catch (Exception e) { + throw new RuntimeException("rpc call failed " + entry.getKey(), e); + } + } for (long blockId : status.getBlockIds()) { - if (!combinedMap.containsKey(blockId)) { + if (!checksumMap.containsKey(blockId)) { throw new RuntimeException("block does not exist"); } - BlockChecksum bcs = combinedMap.get(blockId); + BlockChecksum bcs = checksumMap.get(blockId); crc64Value = CRC64.combine(crc64Value, Long.parseLong(bcs.getChecksum()), bcs.getBlockLength()); } - URIStatus ufsStatus = mFileSystem.getStatus( - "FILE_PATH", GetStatusPOptions.newBuilder().setDirectUfsAccess(true).build()); - + System.out.println("CRC64 value from workers: " + Long.toHexString(crc64Value)); long crc64ValueFromUfs = Long.parseLong(new String(ufsStatus.getXAttr().get(Constants.CRC64_KEY))); if (crc64Value != crc64ValueFromUfs) { - throw new RuntimeException("Mismatch!"); + System.out.println("Mismatch, data deleted from alluxio"); + mFileSystem.delete(plainPath, DeletePOptions.newBuilder().setAlluxioOnly(true).build()); + } + System.out.println("check passed"); + } + + @Override + public int run(CommandLine cl) throws AlluxioException, IOException { + String[] args = cl.getArgs(); + for (String dirArg : args) { + AlluxioURI path = new AlluxioURI(dirArg); + runPlainPath(path, cl); } + return 0; + } + + @Override + public String getCommandName() { + return "crc64check"; + } + + @Override + public String getUsage() { + return "foobar"; + } + + @Override + public String getDescription() { + return "barfoo"; } }