From e63960cc1ea556892f3e33869b92e1ce48ea004c Mon Sep 17 00:00:00 2001 From: MiniSho <42286868+MiniSho@users.noreply.github.com> Date: Tue, 17 Dec 2024 13:29:47 +0800 Subject: [PATCH] cp --- .../plan/planner/LocalExecutionPlanner.java | 9 +++++ .../LoadTsFileAnalyzeSchemaMemoryBlock.java | 12 +++--- .../LoadTsFileDataCacheMemoryBlock.java | 38 +++++-------------- .../load/memory/LoadTsFileMemoryManager.java | 10 ++++- 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 1f02e3bb81f1..b643047dcfef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -262,6 +262,15 @@ public synchronized void reserveFromFreeMemoryForOperators( public synchronized void releaseToFreeMemoryForOperators(final long memoryInBytes) { freeMemoryForOperators += memoryInBytes; + + if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) { + LOGGER.error( + "The free memory {} is more than allocated memory {}, last released memory: {}", + freeMemoryForOperators, + ALLOCATE_MEMORY_FOR_OPERATORS, + memoryInBytes); + freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS; + } } public long getAllocateMemoryForOperators() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java index c7add4b446fc..0b2555f67d3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java @@ -45,13 +45,15 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory } @Override - public boolean hasEnoughMemory(long memoryTobeAddedInBytes) { + public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) { return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= totalMemorySizeInBytes; } @Override - public void addMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(memoryInBytes); + public synchronized void addMemoryUsage(long memoryInBytes) { + if (memoryUsageInBytes.addAndGet(memoryInBytes) > totalMemorySizeInBytes) { + LOGGER.warn("{} has exceed total memory size", this); + } MetricService.getInstance() .getOrCreateGauge( @@ -63,7 +65,7 @@ public void addMemoryUsage(long memoryInBytes) { } @Override - public void reduceMemoryUsage(long memoryInBytes) { + public synchronized void reduceMemoryUsage(long memoryInBytes) { if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) { LOGGER.warn("{} has reduce memory usage to negative", this); } @@ -78,7 +80,7 @@ public void reduceMemoryUsage(long memoryInBytes) { } @Override - protected void releaseAllMemory() { + protected synchronized void releaseAllMemory() { if (memoryUsageInBytes.get() != 0) { LOGGER.warn( "Try to release memory from a memory block {} which has not released all memory", this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java index e0709cece9e6..e620984038d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java @@ -31,15 +31,9 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class); private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1 MB - private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power of 2 - private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES = - Math.max( - MINIMUM_MEMORY_SIZE_IN_BYTES, - LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4); private final AtomicLong limitedMemorySizeInBytes; private final AtomicLong memoryUsageInBytes; - private final AtomicInteger askForMemoryCount; private final AtomicInteger referenceCount; LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) { @@ -54,7 +48,6 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc this.limitedMemorySizeInBytes = new AtomicLong(initialLimitedMemorySizeInBytes); this.memoryUsageInBytes = new AtomicLong(0L); - this.askForMemoryCount = new AtomicInteger(1); this.referenceCount = new AtomicInteger(0); } @@ -64,29 +57,17 @@ public boolean hasEnoughMemory(long memoryTobeAddedInBytes) { } @Override - public void addMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(memoryInBytes); - - askForMemoryCount.getAndUpdate( - count -> { - if ((count & (count - 1)) == 0) { - // count is a power of 2 - long actuallyAllocateMemorySizeInBytes = - MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES); - limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes); - if (actuallyAllocateMemorySizeInBytes < EACH_ASK_MEMORY_SIZE_IN_BYTES) { - return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1; - } else { - return 1; - } - } - return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1; - }); + public synchronized void addMemoryUsage(long memoryInBytes) { + if (memoryUsageInBytes.addAndGet(memoryInBytes) > limitedMemorySizeInBytes.get()) { + LOGGER.warn("{} has exceed total memory size", this); + } } @Override - public void reduceMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(-memoryInBytes); + public synchronized void reduceMemoryUsage(long memoryInBytes) { + if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) { + LOGGER.warn("{} has reduce memory usage to negative", this); + } } @Override @@ -113,6 +94,7 @@ public boolean doShrink(long shrinkMemoryInBytes) { return false; } + MEMORY_MANAGER.releaseToQuery(shrinkMemoryInBytes); limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes); return true; } @@ -140,8 +122,6 @@ public String toString() { + limitedMemorySizeInBytes.get() + ", memoryUsageInBytes=" + memoryUsageInBytes.get() - + ", askForMemoryCount=" - + askForMemoryCount.get() + '}'; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index be6e8dcef973..97b8d7f68b0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager { private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0); private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock; - private synchronized void forceAllocatedFromQuery(long sizeInBytes) + private synchronized void forceAllocateFromQuery(long sizeInBytes) throws LoadRuntimeOutOfMemoryException { for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) { // allocate memory from queryEngine @@ -82,6 +82,12 @@ public synchronized long tryAllocateFromQuery(long sizeInBytes) { } public synchronized void releaseToQuery(long sizeInBytes) { + if (usedMemorySizeInBytes.get() < sizeInBytes) { + LOGGER.error( + "Load: Attempting to release more memory ({}) than allocated ({})", + sizeInBytes, + usedMemorySizeInBytes.get()); + } usedMemorySizeInBytes.addAndGet(-sizeInBytes); QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes); this.notifyAll(); @@ -90,7 +96,7 @@ public synchronized void releaseToQuery(long sizeInBytes) { public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock( long sizeInBytes) throws LoadRuntimeOutOfMemoryException { try { - forceAllocatedFromQuery(sizeInBytes); + forceAllocateFromQuery(sizeInBytes); } catch (LoadRuntimeOutOfMemoryException e) { if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) { return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);