Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Sep 3, 2024
1 parent c711326 commit 0d593e0
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.DataMessageMarshallerProvider;
import alluxio.grpc.FreeWorkerRequest;
import alluxio.grpc.GetBlockChecksumRequest;
import alluxio.grpc.GetBlockChecksumResponse;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcNetworkGroup;
Expand Down Expand Up @@ -248,4 +250,10 @@ public void freeWorker() {
public ListenableFuture<LoadResponse> load(LoadRequest request) {
return mRpcFutureStub.load(request);
}

@Override
public ListenableFuture<GetBlockChecksumResponse> getBlockChecksum(
GetBlockChecksumRequest request) {
return mRpcFutureStub.getBlockChecksum(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.network.protocol.databuffer;

import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.retry.RetryPolicy;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

/**
* Direct buffer pool.
*/
public class NioHeapBufferPool {
private static final TreeMap<Integer, LinkedList<ByteBuffer>> BUF_POOL = new TreeMap<>();

/**
* @param length
* @return buffer
*/
public static synchronized ByteBuffer acquire(int length) {
Map.Entry<Integer, LinkedList<ByteBuffer>> entry = BUF_POOL.ceilingEntry(length);
if (entry == null || entry.getValue().size() == 0) {
return ByteBuffer.allocate(length);
}
ByteBuffer buffer = entry.getValue().pop();
buffer.clear();
// the buffer probably is larger than the amount of capacity being requested
// need to set the limit explicitly
buffer.limit(length);
return buffer;
}

/**
* @param length
* @param policy the retry policy to use
* @return buffer
*/
public static synchronized ByteBuffer acquire(int length, RetryPolicy policy) {
Error cause = null;
while (policy.attempt()) {
try {
return acquire(length);
} catch (OutOfMemoryError error) {
cause = error;
}
}
throw new ResourceExhaustedRuntimeException("Not enough heap memory allocated to buffer",
cause, false);
}

/**
* @param buffer
*/
public static synchronized void release(ByteBuffer buffer) {
LinkedList<ByteBuffer> bufList = BUF_POOL.get(buffer.capacity());
if (bufList == null) {
bufList = new LinkedList<>();
BUF_POOL.put(buffer.capacity(), bufList);
}
bufList.push(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.underfs;

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -543,7 +544,7 @@ public UfsFileStatus getFileStatus(String path, GetFileStatusOptions options) th
Map<String, byte[]> xAttr = null;
if (details.getCrc64Checksum().isPresent()) {
xAttr = new HashMap<>();
xAttr.put("crc64", details.getCrc64Checksum().get().getBytes());
xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes());
}
ObjectPermissions permissions = getPermissions();
return
Expand All @@ -567,11 +568,17 @@ public UfsStatus getStatus(String path) throws IOException {
return getDirectoryStatus(path);
}
ObjectStatus details = getObjectStatus(stripPrefixIfPresent(path));
ObjectPermissions permissions = getPermissions();
if (details != null) {
ObjectPermissions permissions = getPermissions();
return new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(),
permissions.getMode(), mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
Map<String, byte[]> xAttr = null;
if (details.getCrc64Checksum().isPresent()) {
xAttr = new HashMap<>();
xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes());
}
return
new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(),
permissions.getMode(), xAttr, mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
}
return getDirectoryStatus(path);
}
Expand Down
4 changes: 4 additions & 0 deletions core/common/src/main/java/alluxio/util/CRC64.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package alluxio.util;

import java.nio.ByteBuffer;
import java.util.zip.Checksum;

public class CRC64 implements Checksum {
/* 64-bit CRC-ecma182 polynomial with these coefficients, but reversed:
64, 62, 57, 55, 54, 53, 52, 47, 46, 45, 40, 39, 38, 37, 35, 33, 32,
31, 29, 27, 24, 23, 22, 21, 19, 17, 13, 12, 10, 9, 7, 4, 1, 0 */
private final static long POLY = (long) 0xc96c5795d7870f42L; // ECMA-182

/* CRC64 calculation table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,5 +246,5 @@ BlockReader createUfsBlockReader(long sessionId, long blockId, long offset, bool
*/
WorkerNetAddress getWorkerAddress();

Map<Long, BlockChecksum> calculateBlockChecksum();
Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,10 @@ private FileInfo getFileInfoFromUfs(AlluxioURI path) {
UfsStatus ufsStatus;
try {
ufsStatus = ufs.getStatus(ufsPath);
return FileInfo.fromUfsStatus(ufsStatus);
FileInfo fi = FileInfo.fromUfsStatus(ufsStatus);
// Set a dummy TTL action to avoid NPE
fi.setTtlAction(TtlAction.DELETE);
return fi;
} catch (FileNotFoundException e) {
throw new RuntimeException("File not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.BlockDoesNotExistRuntimeException;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.AsyncCacheRequest;
Expand All @@ -46,6 +47,8 @@
import alluxio.metrics.MetricInfo;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.NioDirectBufferPool;
import alluxio.network.protocol.databuffer.NioHeapBufferPool;
import alluxio.proto.dataserver.Protocol;
import alluxio.retry.RetryUtils;
import alluxio.security.user.ServerUserState;
Expand All @@ -68,19 +71,27 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -131,6 +142,7 @@ public class DefaultBlockWorker extends AbstractWorker implements BlockWorker {
private final FuseManager mFuseManager;

protected WorkerNetAddress mAddress;
private final ExecutorService mChecksumCalculationThreadPool;

/**
* Constructs a default block worker.
Expand Down Expand Up @@ -166,7 +178,11 @@ public DefaultBlockWorker(BlockMasterClientPool blockMasterClientPool,
GrpcExecutors.CACHE_MANAGER_EXECUTOR, this, fsContext);
mFuseManager = mResourceCloser.register(new FuseManager(fsContext));
mWhitelist = new PrefixList(Configuration.getList(PropertyKey.WORKER_WHITELIST));

mChecksumCalculationThreadPool =
ExecutorServiceFactories.fixedThreadPool("checksum-calculation-pool", 16)
.create();
((ThreadPoolExecutor) mChecksumCalculationThreadPool)
.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Metrics.registerGauges(this);
}

Expand Down Expand Up @@ -603,22 +619,50 @@ public void close() {
}

@Override
public Map<Long, BlockChecksum> calculateBlockChecksum() {
long blockId = 114514;
// BlockMeta bm = mBlockStore;
// Set option to avoid reading from UFS
// TODO: 64MB is too much
try {
BlockReader br = mBlockStore.createBlockReader(
-1, blockId, 0, false, Protocol.OpenUfsBlockOptions.getDefaultInstance());
// Read this 1MB / 4MB every time and then combine
// Thread pool?
CRC64.fromBytes(br.read(0, br.getLength()).array());
} catch (Exception e) {
throw new RuntimeException(e);
public Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds) {
int chunkSize = 1024 * 1024 * 8; //8MB
HashMap<Long, BlockChecksum> result = new HashMap<>();
List<Future<?>> futures = new ArrayList<>();
for (long blockId: blockIds) {
Future<?> future = mChecksumCalculationThreadPool.submit(()->{
ByteBuffer bf = null;
try {
CRC64 crc64 = new CRC64();
BlockReader br = mBlockStore.createBlockReader(
-1, blockId, 0, false, Protocol.OpenUfsBlockOptions.getDefaultInstance());
bf = NioHeapBufferPool.acquire(chunkSize);
ByteBuf bb = Unpooled.wrappedBuffer(bf);
while (true) {
bb.clear();
long bytesRead = br.transferTo(bb);
if (bytesRead < 0) {
break;
}
crc64.update(bf.array(), Math.toIntExact(bytesRead));
}
result.put(blockId,
BlockChecksum.newBuilder()
.setBlockId(blockId).setBlockLength(br.getLength())
.setChecksum(String.valueOf(crc64.getValue())).build());
} catch (BlockDoesNotExistRuntimeException e) {
LOG.warn("Block {} not found during CRC calculation", blockId);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (bf != null) {
NioHeapBufferPool.release(bf);
}
}
});
futures.add(future);
}
for (Future<?> future: futures) {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// mBlockStore.createUfsBlockReader(sessionId, blockId, offset, positionShort, options);
// mBlockStore.getBlockStoreMeta().
return null;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import alluxio.conf.PropertyKey;
import alluxio.grpc.AsyncCacheRequest;
import alluxio.grpc.AsyncCacheResponse;
import alluxio.grpc.BlockChecksum;
import alluxio.grpc.BlockStatus;
import alluxio.grpc.BlockWorkerGrpc;
import alluxio.grpc.CacheRequest;
Expand All @@ -27,6 +28,8 @@
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.grpc.FreeWorkerRequest;
import alluxio.grpc.FreeWorkerResponse;
import alluxio.grpc.GetBlockChecksumRequest;
import alluxio.grpc.GetBlockChecksumResponse;
import alluxio.grpc.LoadRequest;
import alluxio.grpc.LoadResponse;
import alluxio.grpc.MoveBlockRequest;
Expand Down Expand Up @@ -247,4 +250,15 @@ private AuthenticatedUserInfo getAuthenticatedUserInfo() {
throw Status.UNAUTHENTICATED.withDescription(e.toString()).asRuntimeException();
}
}

@Override
public void getBlockChecksum(
GetBlockChecksumRequest request,
StreamObserver<GetBlockChecksumResponse> responseObserver) {
RpcUtils.call(LOG, () -> {
Map<Long, BlockChecksum> checksums =
mBlockWorker.calculateBlockChecksum(request.getBlockIdsList());
return GetBlockChecksumResponse.newBuilder().putAllChecksum(checksums).build();
}, "getBlockChecksum", "request=%s", responseObserver, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void cleanupSession(long sessionId) {
}

@Override
public Map<Long, BlockChecksum> calculateBlockChecksum() {
public Map<Long, BlockChecksum> calculateBlockChecksum(List<Long> blockIds) {
return Collections.emptyMap();
}
}
Loading

0 comments on commit 0d593e0

Please sign in to comment.