diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java index 69951e18784..4c941e864cb 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java @@ -42,7 +42,8 @@ /** Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. */ public class NettyUtils { - private static volatile PooledByteBufAllocator _allocator; + private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = + new PooledByteBufAllocator[2]; private static ConcurrentHashMap allocatorsIndex = JavaUtils.newConcurrentHashMap(); /** Creates a new ThreadFactory which prefixes each thread with the given name. */ @@ -118,29 +119,35 @@ private static PooledByteBufAllocator createPooledByteBufAllocator( allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads()); } - private static PooledByteBufAllocator getSharedPooledByteBufAllocator( - CelebornConf conf, AbstractSource source) { - synchronized (PooledByteBufAllocator.class) { - if (_allocator == null) { - // each core should have one arena to allocate memory - _allocator = createPooledByteBufAllocator(true, true, conf.networkAllocatorArenas()); - if (source != null) { - new NettyMemoryMetrics( - _allocator, - "shared-pool", - conf.networkAllocatorVerboseMetric(), - source, - Collections.emptyMap()); - } + /** + * Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache + * parameter value. + */ + public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator( + CelebornConf conf, AbstractSource source, boolean allowCache) { + final int index = allowCache ? 0 : 1; + if (_sharedPooledByteBufAllocator[index] == null) { + _sharedPooledByteBufAllocator[index] = + createPooledByteBufAllocator(true, allowCache, conf.networkAllocatorArenas()); + if (source != null) { + new NettyMemoryMetrics( + _sharedPooledByteBufAllocator[index], + "shared-pool", + conf.networkAllocatorVerboseMetric(), + source, + Collections.emptyMap()); } - return _allocator; } + return _sharedPooledByteBufAllocator[index]; } public static PooledByteBufAllocator getPooledByteBufAllocator( TransportConf conf, AbstractSource source, boolean allowCache) { if (conf.getCelebornConf().networkShareMemoryAllocator()) { - return getSharedPooledByteBufAllocator(conf.getCelebornConf(), source); + return getSharedPooledByteBufAllocator( + conf.getCelebornConf(), + source, + allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache()); } PooledByteBufAllocator allocator = createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, conf.clientThreads()); diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 381c9da27d9..5128bb69b8f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -460,6 +460,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def networkShareMemoryAllocator: Boolean = get(NETWORK_MEMORY_ALLOCATOR_SHARE) + def networkMemoryAllocatorAllowCache: Boolean = + get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE) + def networkAllocatorArenas: Int = get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max( Runtime.getRuntime.availableProcessors(), 2)) @@ -1232,6 +1235,15 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("10s") + val NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE: ConfigEntry[Boolean] = + buildConf("celeborn.network.memory.allocator.allowCache") + .categories("network") + .internal + .version("0.3.1") + .doc("When false, globally disable thread-local cache in the shared PooledByteBufAllocator.") + .booleanConf + .createWithDefault(true) + val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] = buildConf("celeborn.network.memory.allocator.share") .categories("network") diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java index 050b28166cc..1d633d1055b 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java @@ -42,10 +42,12 @@ public class ChannelsLimiter extends ChannelDuplexHandler private final AtomicBoolean isPaused = new AtomicBoolean(false); private final AtomicInteger needTrimChannels = new AtomicInteger(0); private final long waitTrimInterval; + private final boolean allowCache; public ChannelsLimiter(String moduleName, CelebornConf conf) { this.moduleName = moduleName; this.waitTrimInterval = conf.workerDirectMemoryTrimChannelWaitInterval(); + this.allowCache = conf.networkMemoryAllocatorAllowCache(); MemoryManager memoryManager = MemoryManager.instance(); memoryManager.registerMemoryListener(this); } @@ -147,7 +149,9 @@ public void onResume(String moduleName) { @Override public void onTrim() { - trimCache(); + if (allowCache) { + trimCache(); + } } static class TrimCache {}