Skip to content

Commit

Permalink
[CELEBORN-796] Support globally disabling the thread-local cache in the…
Browse files Browse the repository at this point in the history
… shared PooledByteBufAllocator

### What changes were proposed in this pull request?

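As the title says: add a configuration option, `celeborn.network.memory.allocator.allowCache`, that globally disables the thread-local cache in the shared `PooledByteBufAllocator`.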

### Why are the changes needed?

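As the title says. Before this change, the shared allocator was always created with the thread-local cache allowed, and the caller's `allowCache` argument was ignored whenever the shared allocator was in use.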

### Does this PR introduce _any_ user-facing change?

Yes, the thread-local cache of the shared `PooledByteBufAllocator` can be disabled by setting `celeborn.network.memory.allocator.allowCache=false`.

### How was this patch tested?

Passes the GitHub Actions (GA) checks.

Closes #1716 from cfmcgrady/allow-cache.

Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Aug 11, 2023
1 parent 5462281 commit 6f1bb41
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@

/** Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. */
public class NettyUtils {
private static volatile PooledByteBufAllocator _allocator;
private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
new PooledByteBufAllocator[2];
private static ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
Expand Down Expand Up @@ -118,29 +119,35 @@ private static PooledByteBufAllocator createPooledByteBufAllocator(
allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
}

private static PooledByteBufAllocator getSharedPooledByteBufAllocator(
CelebornConf conf, AbstractSource source) {
synchronized (PooledByteBufAllocator.class) {
if (_allocator == null) {
// each core should have one arena to allocate memory
_allocator = createPooledByteBufAllocator(true, true, conf.networkAllocatorArenas());
if (source != null) {
new NettyMemoryMetrics(
_allocator,
"shared-pool",
conf.networkAllocatorVerboseMetric(),
source,
Collections.emptyMap());
}
/**
* Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache
* parameter value.
*/
public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
CelebornConf conf, AbstractSource source, boolean allowCache) {
final int index = allowCache ? 0 : 1;
if (_sharedPooledByteBufAllocator[index] == null) {
_sharedPooledByteBufAllocator[index] =
createPooledByteBufAllocator(true, allowCache, conf.networkAllocatorArenas());
if (source != null) {
new NettyMemoryMetrics(
_sharedPooledByteBufAllocator[index],
"shared-pool",
conf.networkAllocatorVerboseMetric(),
source,
Collections.emptyMap());
}
return _allocator;
}
return _sharedPooledByteBufAllocator[index];
}

public static PooledByteBufAllocator getPooledByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache) {
if (conf.getCelebornConf().networkShareMemoryAllocator()) {
return getSharedPooledByteBufAllocator(conf.getCelebornConf(), source);
return getSharedPooledByteBufAllocator(
conf.getCelebornConf(),
source,
allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache());
}
PooledByteBufAllocator allocator =
createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, conf.clientThreads());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def networkShareMemoryAllocator: Boolean = get(NETWORK_MEMORY_ALLOCATOR_SHARE)

def networkMemoryAllocatorAllowCache: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)

def networkAllocatorArenas: Int = get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
Runtime.getRuntime.availableProcessors(),
2))
Expand Down Expand Up @@ -1234,6 +1237,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

val NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.allowCache")
.categories("network")
.internal
.version("0.3.1")
.doc("When false, globally disable thread-local cache in the shared PooledByteBufAllocator.")
.booleanConf
.createWithDefault(true)

val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ public class ChannelsLimiter extends ChannelDuplexHandler
private final AtomicBoolean isPaused = new AtomicBoolean(false);
private final AtomicInteger needTrimChannels = new AtomicInteger(0);
private final long waitTrimInterval;
private final boolean allowCache;

public ChannelsLimiter(String moduleName, CelebornConf conf) {
this.moduleName = moduleName;
this.waitTrimInterval = conf.workerDirectMemoryTrimChannelWaitInterval();
this.allowCache = conf.networkMemoryAllocatorAllowCache();
MemoryManager memoryManager = MemoryManager.instance();
memoryManager.registerMemoryListener(this);
}
Expand Down Expand Up @@ -147,7 +149,9 @@ public void onResume(String moduleName) {

@Override
public void onTrim() {
trimCache();
if (allowCache) {
trimCache();
}
}

static class TrimCache {}
Expand Down

0 comments on commit 6f1bb41

Please sign in to comment.