From 99b27a42830f692d28e04ae4b10668e41c025d7e Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Tue, 3 Dec 2024 13:13:00 -0800 Subject: [PATCH 01/15] Add thread to perform pending cache maintenance every minute Signed-off-by: owenhalpert --- CHANGELOG.md | 1 + .../memory/NativeMemoryCacheManager.java | 19 ++++++++ .../knn/index/util/ScheduledExecutor.java | 46 +++++++++++++++++++ .../QuantizationStateCache.java | 26 ++++++++++- .../QuantizationStateCacheManager.java | 4 ++ .../opensearch/knn/KNNSingleNodeTestCase.java | 2 + .../java/org/opensearch/knn/KNNTestCase.java | 5 +- .../knn/index/CacheMaintainerTests.java | 35 ++++++++++++++ .../memory/NativeMemoryCacheManagerTests.java | 2 + .../QuantizationStateCacheTests.java | 4 +- 10 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java create mode 100644 src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a5fda2a2..465fd8053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Documentation ### Maintenance * Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236) +* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239) * Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278] * Added Lucene BWC tests (#2313)[https://github.com/opensearch-project/k-NN/pull/2313] * Upgrade jsonpath from 2.8.0 to 2.9.0[2325](https://github.com/opensearch-project/k-NN/pull/2325) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index b8aecc5a5..28d25fbf6 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -23,6 +23,7 @@ import org.opensearch.knn.common.exception.OutOfNativeMemoryException; import org.opensearch.knn.common.featureflags.KNNFeatureFlags; import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.util.ScheduledExecutor; import org.opensearch.knn.plugin.stats.StatNames; import java.io.Closeable; @@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable { private Cache cache; private Deque accessRecencyQueue; private final ExecutorService executor; + private ScheduledExecutor cacheMaintainer; private AtomicBoolean cacheCapacityReached; private long maxWeight; @@ -87,6 +89,10 @@ private void initialize() { } private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { + if (cacheMaintainer != null) { + cacheMaintainer.close(); + } + CacheBuilder cacheBuilder = CacheBuilder.newBuilder() .recordStats() .concurrencyLevel(1) @@ -99,6 +105,16 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { if (nativeMemoryCacheDTO.isExpirationLimited()) { cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES); + Runnable cleanUp = () -> { + try { + cache.cleanUp(); + } catch (Exception e) { + logger.error("Error cleaning up cache", e); + } + }; + long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) + .getMillis(); + this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis); } cacheCapacityReached = new AtomicBoolean(false); @@ -142,6 +158,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa @Override public void close() { executor.shutdown(); + if (cacheMaintainer != null) { + cacheMaintainer.close(); + } } /** diff --git a/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java b/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java new file mode 100644 index 000000000..85af705db --- /dev/null +++ b/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index.util; + +import lombok.Getter; + +import java.io.Closeable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Executes a task periodically + */ +public class ScheduledExecutor implements Closeable { + final ScheduledExecutorService executor; + @Getter + private final Runnable task; + + /** + * @param task task to be completed + * @param scheduleMillis time in milliseconds to wait before executing the task again + */ + public ScheduledExecutor(Runnable task, long scheduleMillis) { + this.task = task; + this.executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + +} diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index f057026b9..fd74e7060 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -15,7 +15,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.util.ScheduledExecutor; +import java.io.Closeable; import java.io.IOException; import java.time.Instant; import java.util.concurrent.TimeUnit; @@ -27,10 +29,11 @@ * A thread-safe singleton cache that contains quantization states. */ @Log4j2 -public class QuantizationStateCache { +public class QuantizationStateCache implements Closeable { private static volatile QuantizationStateCache instance; private Cache cache; + private ScheduledExecutor cacheMaintainer; @Getter private long maxCacheSizeInKB; @Getter @@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() { } private void buildCache() { + if (cacheMaintainer != null) { + cacheMaintainer.close(); + } + this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> { try { return ((QuantizationState) v).toByteArray().length; @@ -71,6 +78,16 @@ private void buildCache() { ) .removalListener(this::onRemoval) .build(); + + Runnable cleanUp = () -> { + try { + cache.cleanUp(); + } catch (Exception e) { + log.error("Error cleaning up cache", e); + } + }; + long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMillis(); + this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis); } synchronized void rebuildCache() { @@ -129,4 +146,11 @@ private void updateEvictedDueToSizeAt() { public void clear() { cache.invalidateAll(); } + + @Override + public void close() throws IOException { + if (cacheMaintainer != null) { + cacheMaintainer.close(); + } + } } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java index 932d5cde0..193abed80 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java @@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) { public void clear() { QuantizationStateCache.getInstance().clear(); } + + public void close() throws IOException { + QuantizationStateCache.getInstance().close(); + } } diff --git a/src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java b/src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java index 41cd4e8a5..2ec9ce6b5 100644 --- a/src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java +++ b/src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java @@ -35,6 +35,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.index.IndexService; +import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager; import org.opensearch.plugins.Plugin; import org.opensearch.core.rest.RestStatus; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() { public void tearDown() throws Exception { NativeMemoryCacheManager.getInstance().invalidateAll(); NativeMemoryCacheManager.getInstance().close(); + QuantizationStateCacheManager.getInstance().close(); NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close(); NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close(); NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close(); diff --git a/src/test/java/org/opensearch/knn/KNNTestCase.java b/src/test/java/org/opensearch/knn/KNNTestCase.java index 21b3298be..376692f26 100644 --- a/src/test/java/org/opensearch/knn/KNNTestCase.java +++ b/src/test/java/org/opensearch/knn/KNNTestCase.java @@ -24,8 +24,10 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() { return false; } - public void resetState() { + public void resetState() throws IOException { // Reset all of the counters for (KNNCounter knnCounter : KNNCounter.values()) { knnCounter.set(0L); @@ -83,6 +85,7 @@ public void resetState() { // Clean up the cache NativeMemoryCacheManager.getInstance().invalidateAll(); NativeMemoryCacheManager.getInstance().close(); + QuantizationStateCacheManager.getInstance().close(); } private void initKNNSettings() { diff --git a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java new file mode 100644 index 000000000..fa230c68a --- /dev/null +++ b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.junit.Test; +import org.opensearch.knn.index.util.ScheduledExecutor; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class CacheMaintainerTests { + @Test + public void testCacheEviction() throws InterruptedException { + Cache testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(); + + ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000); + + testCache.put("key1", "value1"); + assertEquals(testCache.size(), 1); + + Thread.sleep(1500); + + executor.getTask().run(); + + assertEquals(testCache.size(), 0); + + executor.close(); + } +} diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 5fe41c88c..2433f0265 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -41,6 +41,7 @@ public void tearDown() throws Exception { Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build(); clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings); client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get(); + NativeMemoryCacheManager.getInstance().close(); super.tearDown(); } @@ -378,6 +379,7 @@ public void testCacheCapacity() { nativeMemoryCacheManager.setCacheCapacityReached(false); assertFalse(nativeMemoryCacheManager.isCacheCapacityReached()); + nativeMemoryCacheManager.close(); } public void testGetIndicesCacheStats() throws IOException, ExecutionException { diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index e5381aec7..88fe21d90 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -16,6 +16,7 @@ import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() { assertNull("State should be null", retrievedState); } - public void testCacheEvictionDueToSize() { + public void testCacheEvictionDueToSize() throws IOException { String fieldName = "evictionField"; // States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache int arrayLength = 112; @@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() { cache.addQuantizationState(fieldName, state); cache.addQuantizationState(fieldName, state2); cache.clear(); + cache.close(); assertNotNull(cache.getEvictedDueToSizeAt()); } } From 651b63e1fcd625bd4f530977842f8426a1a742c9 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 20 Dec 2024 14:13:19 -0800 Subject: [PATCH 02/15] Modify QSC maintainer to use QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES Signed-off-by: owenhalpert Signed-off-by: owenhalpert --- .../models/quantizationState/QuantizationStateCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index fd74e7060..045c5192c 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -86,7 +86,7 @@ private void buildCache() { log.error("Error cleaning up cache", e); } }; - long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMillis(); + long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES)).getMillis(); this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis); } From 1e8762f1f1b555e745014c59c80592fce01ad91d Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 20 Dec 2024 14:13:19 -0800 Subject: [PATCH 03/15] Modify QSC maintainer to use QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES Signed-off-by: owenhalpert --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 465fd8053..346c8781c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Documentation ### Maintenance * Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236) -* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239) +* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2308](https://github.com/opensearch-project/k-NN/pull/2308) * Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278] * Added Lucene BWC tests (#2313)[https://github.com/opensearch-project/k-NN/pull/2313] * Upgrade jsonpath from 2.8.0 to 2.9.0[2325](https://github.com/opensearch-project/k-NN/pull/2325) From 93b03452e04fe83889b72f0ac572255960f38f8d Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Mon, 23 Dec 2024 11:47:05 -0800 Subject: [PATCH 04/15] Implement Closeable in QSCManager, document exception swallowing Signed-off-by: owenhalpert --- .../opensearch/knn/index/memory/NativeMemoryCacheManager.java | 1 + .../models/quantizationState/QuantizationStateCache.java | 1 + .../quantizationState/QuantizationStateCacheManager.java | 4 +++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 28d25fbf6..e61a6ee77 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -109,6 +109,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { try { cache.cleanUp(); } catch (Exception e) { + // Exceptions from Guava shouldn't halt the executor logger.error("Error cleaning up cache", e); } }; diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 045c5192c..74fe2e0d2 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -83,6 +83,7 @@ private void buildCache() { try { cache.cleanUp(); } catch (Exception e) { + // Exceptions from Guava shouldn't halt the executor log.error("Error cleaning up cache", e); } }; diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java index 193abed80..63282029a 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java @@ -9,10 +9,11 @@ import lombok.NoArgsConstructor; import org.opensearch.knn.index.codec.KNN990Codec.KNN990QuantizationStateReader; +import java.io.Closeable; import java.io.IOException; @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class QuantizationStateCacheManager { +public final class QuantizationStateCacheManager implements Closeable { private static volatile QuantizationStateCacheManager instance; @@ -80,6 +81,7 @@ public void clear() { QuantizationStateCache.getInstance().clear(); } + @Override public void close() throws IOException { QuantizationStateCache.getInstance().close(); } From 00cd299765c9779dbf2669be26a47e44ae6b592b Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Mon, 23 Dec 2024 11:47:05 -0800 Subject: [PATCH 05/15] Make ScheduledExecutor accept external ExecutorService to prevent unbounded thread creation Signed-off-by: owenhalpert --- .../knn/index/memory/NativeMemoryCacheManager.java | 2 +- .../opensearch/knn/index/util/ScheduledExecutor.java | 5 ++--- .../quantizationState/QuantizationStateCache.java | 3 ++- .../opensearch/knn/index/CacheMaintainerTests.java | 11 ++++++++--- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index e61a6ee77..d487fb6c5 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -115,7 +115,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { }; long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) .getMillis(); - this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis); + this.cacheMaintainer = new ScheduledExecutor(Executors.newSingleThreadScheduledExecutor(), cleanUp, scheduleMillis); } cacheCapacityReached = new AtomicBoolean(false); diff --git a/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java b/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java index 85af705db..0fcd1214b 100644 --- a/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java +++ b/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java @@ -8,7 +8,6 @@ import lombok.Getter; import java.io.Closeable; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -24,9 +23,9 @@ public class ScheduledExecutor implements Closeable { * @param task task to be completed * @param scheduleMillis time in milliseconds to wait before executing the task again */ - public ScheduledExecutor(Runnable task, long scheduleMillis) { + public ScheduledExecutor(ScheduledExecutorService executor, Runnable task, long scheduleMillis) { this.task = task; - this.executor = Executors.newSingleThreadScheduledExecutor(); + this.executor = executor; executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS); } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 74fe2e0d2..4760421d6 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.time.Instant; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.opensearch.knn.index.KNNSettings.QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES; @@ -88,7 +89,7 @@ private void buildCache() { } }; long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES)).getMillis(); - this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis); + this.cacheMaintainer = new ScheduledExecutor(Executors.newSingleThreadScheduledExecutor(), cleanUp, scheduleMillis); } synchronized void rebuildCache() { diff --git a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java index fa230c68a..9007ca01d 100644 --- a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java +++ b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java @@ -10,6 +10,7 @@ import org.junit.Test; import org.opensearch.knn.index.util.ScheduledExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -19,17 +20,21 @@ public class CacheMaintainerTests { public void testCacheEviction() throws InterruptedException { Cache testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(); - ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000); + ScheduledExecutor cacheMaintainer = new ScheduledExecutor( + Executors.newSingleThreadScheduledExecutor(), + testCache::cleanUp, + 60 * 1000 + ); testCache.put("key1", "value1"); assertEquals(testCache.size(), 1); Thread.sleep(1500); - executor.getTask().run(); + cacheMaintainer.getTask().run(); assertEquals(testCache.size(), 0); - executor.close(); + cacheMaintainer.close(); } } From 0fba1c7eb922a86b4975234f0876b5d67fc311e4 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Mon, 6 Jan 2025 15:35:14 -0800 Subject: [PATCH 06/15] Use OpenSearch threadpool to perform cache maintenance Signed-off-by: owenhalpert --- .../memory/NativeMemoryCacheManager.java | 54 ++++++++++--------- .../knn/index/util/ScheduledExecutor.java | 45 ---------------- .../org/opensearch/knn/plugin/KNNPlugin.java | 4 ++ .../QuantizationStateCache.java | 36 ++++++++----- .../knn/index/CacheMaintainerTests.java | 40 -------------- 5 files changed, 57 insertions(+), 122 deletions(-) delete mode 100644 src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java delete mode 100644 src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index d487fb6c5..31bbd0947 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -16,6 +16,7 @@ import com.google.common.cache.CacheStats; import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalNotification; +import lombok.Setter; import org.apache.commons.lang.Validate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,8 +24,9 @@ import org.opensearch.knn.common.exception.OutOfNativeMemoryException; import org.opensearch.knn.common.featureflags.KNNFeatureFlags; import org.opensearch.knn.index.KNNSettings; -import org.opensearch.knn.index.util.ScheduledExecutor; import org.opensearch.knn.plugin.stats.StatNames; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.Scheduler.Cancellable; import java.io.Closeable; import java.util.Deque; @@ -32,11 +34,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -52,9 +50,11 @@ public class NativeMemoryCacheManager implements Closeable { private Cache cache; private Deque accessRecencyQueue; private final ExecutorService executor; - private ScheduledExecutor cacheMaintainer; private AtomicBoolean cacheCapacityReached; private long maxWeight; + @Setter + private static ThreadPool threadPool; + private Cancellable maintenanceTask; NativeMemoryCacheManager() { this.executor = Executors.newSingleThreadExecutor(); @@ -89,10 +89,6 @@ private void initialize() { } private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { - if (cacheMaintainer != null) { - cacheMaintainer.close(); - } - CacheBuilder cacheBuilder = CacheBuilder.newBuilder() .recordStats() .concurrencyLevel(1) @@ -105,22 +101,12 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { if (nativeMemoryCacheDTO.isExpirationLimited()) { cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES); - Runnable cleanUp = () -> { - try { - cache.cleanUp(); - } catch (Exception e) { - // Exceptions from Guava shouldn't halt the executor - logger.error("Error cleaning up cache", e); - } - }; - long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) - .getMillis(); - this.cacheMaintainer = new ScheduledExecutor(Executors.newSingleThreadScheduledExecutor(), cleanUp, scheduleMillis); } cacheCapacityReached = new AtomicBoolean(false); accessRecencyQueue = new ConcurrentLinkedDeque<>(); cache = cacheBuilder.build(); + startMaintenance(cache); } /** @@ -159,8 +145,8 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa @Override public void close() { executor.shutdown(); - if (cacheMaintainer != null) { - cacheMaintainer.close(); + if (maintenanceTask != null) { + maintenanceTask.cancel(); } } @@ -469,4 +455,24 @@ private Float getSizeAsPercentage(long size) { } return 100 * size / (float) cbLimit; } + + public void startMaintenance(Cache cacheInstance) { + if (maintenanceTask != null) { + maintenanceTask.cancel(); + } + + Runnable cleanUp = () -> { + try { + cacheInstance.cleanUp(); + } catch (Exception e) { + logger.error("Error cleaning up cache", e); + } + }; + + TimeValue interval = KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES); + + if (threadPool != null) { + maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); + } + } } diff --git a/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java b/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java deleted file mode 100644 index 0fcd1214b..000000000 --- a/src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.knn.index.util; - -import lombok.Getter; - -import java.io.Closeable; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Executes a task periodically - */ -public class ScheduledExecutor implements Closeable { - final ScheduledExecutorService executor; - @Getter - private final Runnable task; - - /** - * @param task task to be completed - * @param scheduleMillis time in milliseconds to wait before executing the task again - */ - public ScheduledExecutor(ScheduledExecutorService executor, Runnable task, long scheduleMillis) { - this.task = task; - this.executor = executor; - executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS); - } - - @Override - public void close() { - executor.shutdown(); - try { - if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - -} diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index d27f502e1..7fb880f19 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.knn.index.KNNCircuitBreaker; +import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider; import org.opensearch.knn.index.util.KNNClusterUtil; import org.opensearch.knn.index.query.KNNQueryBuilder; @@ -79,6 +80,7 @@ import org.opensearch.knn.plugin.transport.UpdateModelMetadataTransportAction; import org.opensearch.knn.plugin.transport.UpdateModelGraveyardAction; import org.opensearch.knn.plugin.transport.UpdateModelGraveyardTransportAction; +import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCache; import org.opensearch.knn.training.TrainingJobClusterStateListener; import org.opensearch.knn.training.TrainingJobRunner; import org.opensearch.knn.training.VectorReader; @@ -201,6 +203,8 @@ public Collection createComponents( ModelCache.initialize(ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService); TrainingJobRunner.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance()); TrainingJobClusterStateListener.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService); + QuantizationStateCache.setThreadPool(threadPool); + NativeMemoryCacheManager.setThreadPool(threadPool); KNNCircuitBreaker.getInstance().initialize(threadPool, clusterService, client); KNNQueryBuilder.initialize(ModelDao.OpenSearchKNNModelDao.getInstance()); KNNWeight.initialize(ModelDao.OpenSearchKNNModelDao.getInstance()); diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 4760421d6..0ea2b9a80 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -11,16 +11,17 @@ import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalNotification; import lombok.Getter; +import lombok.Setter; import lombok.extern.log4j.Log4j2; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.knn.index.KNNSettings; -import org.opensearch.knn.index.util.ScheduledExecutor; +import org.opensearch.threadpool.Scheduler.Cancellable; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; import java.time.Instant; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.opensearch.knn.index.KNNSettings.QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES; @@ -34,11 +35,13 @@ public class QuantizationStateCache implements Closeable { private static volatile QuantizationStateCache instance; private Cache cache; - private ScheduledExecutor cacheMaintainer; @Getter private long maxCacheSizeInKB; @Getter private Instant evictedDueToSizeAt; + private Cancellable maintenanceTask; + @Setter + private static ThreadPool threadPool; @VisibleForTesting QuantizationStateCache() { @@ -62,10 +65,6 @@ static QuantizationStateCache getInstance() { } private void buildCache() { - if (cacheMaintainer != null) { - cacheMaintainer.close(); - } - this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> { try { return ((QuantizationState) v).toByteArray().length; @@ -79,17 +78,27 @@ private void buildCache() { ) .removalListener(this::onRemoval) .build(); + startMaintenance(cache); + } + + public void startMaintenance(Cache cacheInstance) { + if (maintenanceTask != null) { + maintenanceTask.cancel(); + } Runnable cleanUp = () -> { try { - cache.cleanUp(); + cacheInstance.cleanUp(); } catch (Exception e) { - // Exceptions from Guava shouldn't halt the executor log.error("Error cleaning up cache", e); } }; - long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES)).getMillis(); - this.cacheMaintainer = new ScheduledExecutor(Executors.newSingleThreadScheduledExecutor(), cleanUp, scheduleMillis); + + TimeValue interval = KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES); + + if (threadPool != null) { + maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); + } } synchronized void rebuildCache() { @@ -151,8 +160,9 @@ public void clear() { @Override public void close() throws IOException { - if (cacheMaintainer != null) { - cacheMaintainer.close(); + if (maintenanceTask != null) { + maintenanceTask.cancel(); } } + } diff --git a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java b/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java deleted file mode 100644 index 9007ca01d..000000000 --- a/src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.knn.index; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.junit.Test; -import org.opensearch.knn.index.util.ScheduledExecutor; - -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; - -public class CacheMaintainerTests { - @Test - public void testCacheEviction() throws InterruptedException { - Cache testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(); - - ScheduledExecutor cacheMaintainer = new ScheduledExecutor( - Executors.newSingleThreadScheduledExecutor(), - testCache::cleanUp, - 60 * 1000 - ); - - testCache.put("key1", "value1"); - assertEquals(testCache.size(), 1); - - Thread.sleep(1500); - - cacheMaintainer.getTask().run(); - - assertEquals(testCache.size(), 0); - - cacheMaintainer.close(); - } -} From de7e191880985b83cae0f74821532de60e7b4fb1 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Tue, 7 Jan 2025 16:23:22 -0800 Subject: [PATCH 07/15] Fix wildcard imports and access modifiers Signed-off-by: owenhalpert --- .../knn/index/memory/NativeMemoryCacheManager.java | 8 ++++++-- .../models/quantizationState/QuantizationStateCache.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 31bbd0947..fd2cf0ec8 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -34,7 +34,11 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -456,7 +460,7 @@ private Float getSizeAsPercentage(long size) { return 100 * size / (float) cbLimit; } - public void startMaintenance(Cache cacheInstance) { + private void startMaintenance(Cache cacheInstance) { if (maintenanceTask != null) { maintenanceTask.cancel(); } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 0ea2b9a80..b0fa6f912 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -81,7 +81,7 @@ private void buildCache() { startMaintenance(cache); } - public void startMaintenance(Cache cacheInstance) { + private void startMaintenance(Cache cacheInstance) { if (maintenanceTask != null) { maintenanceTask.cancel(); } From 34188914a61227ca1dd89bc6ae1391217f5377f2 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Thu, 9 Jan 2025 14:12:08 -0800 Subject: [PATCH 08/15] Provide threadpool for cache unit tests to draw from Signed-off-by: owenhalpert --- .../index/memory/NativeMemoryCacheManager.java | 12 ++++++------ .../QuantizationStateCache.java | 12 ++++++------ .../memory/NativeMemoryCacheManagerTests.java | 18 ++++++++++++++++++ .../QuantizationStateCacheTests.java | 18 ++++++++++++++++++ 4 files changed, 48 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index fd2cf0ec8..a597d0719 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -50,14 +50,14 @@ public class NativeMemoryCacheManager implements Closeable { private static final Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class); private static NativeMemoryCacheManager INSTANCE; + @Setter + private static ThreadPool threadPool; private Cache cache; private Deque accessRecencyQueue; private final ExecutorService executor; private AtomicBoolean cacheCapacityReached; private long maxWeight; - @Setter - private static ThreadPool threadPool; private Cancellable maintenanceTask; NativeMemoryCacheManager() { @@ -110,7 +110,9 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { cacheCapacityReached = new AtomicBoolean(false); accessRecencyQueue = new ConcurrentLinkedDeque<>(); cache = cacheBuilder.build(); - startMaintenance(cache); + if (threadPool != null) { + startMaintenance(cache); + } } /** @@ -475,8 +477,6 @@ private void startMaintenance(Cache cacheInstanc TimeValue interval = KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES); - if (threadPool != null) { - maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); - } + maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); } } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index b0fa6f912..09df65992 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -34,14 +34,14 @@ public class QuantizationStateCache implements Closeable { private static volatile QuantizationStateCache instance; + @Setter + private static ThreadPool threadPool; private Cache cache; @Getter private long maxCacheSizeInKB; @Getter private Instant evictedDueToSizeAt; private Cancellable maintenanceTask; - @Setter - private static ThreadPool threadPool; @VisibleForTesting QuantizationStateCache() { @@ -78,7 +78,9 @@ private void buildCache() { ) .removalListener(this::onRemoval) .build(); - startMaintenance(cache); + if (threadPool != null) { + startMaintenance(cache); + } } private void startMaintenance(Cache cacheInstance) { @@ -96,9 +98,7 @@ private void startMaintenance(Cache cacheInstance) { TimeValue interval = KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES); - if (threadPool != null) { - maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); - } + maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT); } synchronized void rebuildCache() { diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 2433f0265..49d0dd4a7 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -12,6 +12,8 @@ package org.opensearch.knn.index.memory; import com.google.common.cache.CacheStats; +import org.junit.After; +import org.junit.Before; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.common.settings.Settings; import org.opensearch.knn.common.exception.OutOfNativeMemoryException; @@ -20,6 +22,7 @@ import org.opensearch.knn.plugin.KNNPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -34,6 +37,21 @@ public class NativeMemoryCacheManagerTests extends OpenSearchSingleNodeTestCase { + private ThreadPool threadPool; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new ThreadPool(Settings.builder().put("node.name", "NativeMemoryCacheManagerTests").build()); + NativeMemoryCacheManager.setThreadPool(threadPool); + } + + @After + public void shutdown() throws Exception { + super.tearDown(); + terminate(threadPool); + } + @Override public void tearDown() throws Exception { // Clear out persistent metadata diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index 88fe21d90..b8a429b92 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -7,6 +7,8 @@ import com.google.common.collect.ImmutableSet; import lombok.SneakyThrows; +import org.junit.After; +import org.junit.Before; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -15,6 +17,7 @@ import org.opensearch.knn.KNNTestCase; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -29,6 +32,21 @@ public class QuantizationStateCacheTests extends KNNTestCase { + private ThreadPool threadPool; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new ThreadPool(Settings.builder().put("node.name", "QuantizationStateCacheTests").build()); + QuantizationStateCache.setThreadPool(threadPool); + } + + @After + public void shutdown() throws Exception { + super.tearDown(); + terminate(threadPool); + } + @SneakyThrows public void testSingleThreadedAddAndRetrieve() { String fieldName = "singleThreadField"; From 3a99cff33e1d9f8d684103f0f1f6991e57f2dffb Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Thu, 9 Jan 2025 20:25:21 -0800 Subject: [PATCH 09/15] Log warning when threadpool for cache maintenance is null Signed-off-by: owenhalpert --- .../opensearch/knn/index/memory/NativeMemoryCacheManager.java | 3 +++ .../models/quantizationState/QuantizationStateCache.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index a597d0719..e700ee27c 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -110,8 +110,11 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { cacheCapacityReached = new AtomicBoolean(false); accessRecencyQueue = new ConcurrentLinkedDeque<>(); cache = cacheBuilder.build(); + if (threadPool != null) { startMaintenance(cache); + } else { + logger.warn("ThreadPool is null during NativeMemoryCacheManager initialization. Scheduled maintenance will not be started."); } } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 09df65992..2102c0c7c 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -78,8 +78,11 @@ private void buildCache() { ) .removalListener(this::onRemoval) .build(); + if (threadPool != null) { startMaintenance(cache); + } else { + log.warn("ThreadPool is null during QuantizationStateCache initialization. Scheduled maintenance will not be started."); } } From 66db564daa850b892acddc2ce71a41a3b8eeadc4 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Thu, 9 Jan 2025 22:07:18 -0800 Subject: [PATCH 10/15] Fix branch for Github Action Signed-off-by: owenhalpert --- .../opensearch/knn/index/memory/NativeMemoryCacheManager.java | 2 +- .../models/quantizationState/QuantizationStateCache.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index e700ee27c..7b1920be0 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -114,7 +114,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { if (threadPool != null) { startMaintenance(cache); } else { - logger.warn("ThreadPool is null during NativeMemoryCacheManager initialization. Scheduled maintenance will not be started."); + logger.warn("ThreadPool is null during NativeMemoryCacheManager initialization. Maintenance will not start."); } } diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 2102c0c7c..332c55f0a 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -82,7 +82,7 @@ private void buildCache() { if (threadPool != null) { startMaintenance(cache); } else { - log.warn("ThreadPool is null during QuantizationStateCache initialization. Scheduled maintenance will not be started."); + log.warn("ThreadPool is null during QuantizationStateCache initialization. Maintenance will not start."); } } From e08885d96e852e34fdb5c9252d7f9f88c432b4ec Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 10 Jan 2025 13:45:18 -0800 Subject: [PATCH 11/15] Add unit tests for maintenance scheduling Signed-off-by: owenhalpert --- .../memory/NativeMemoryCacheManager.java | 2 ++ .../QuantizationStateCache.java | 1 + .../memory/NativeMemoryCacheManagerTests.java | 16 +++++++++++++ .../QuantizationStateCacheTests.java | 24 +++++++++++++++++++ 4 files changed, 43 insertions(+) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 7b1920be0..44f06182e 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -16,6 +16,7 @@ import com.google.common.cache.CacheStats; import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalNotification; +import lombok.Getter; import lombok.Setter; import org.apache.commons.lang.Validate; import org.apache.logging.log4j.LogManager; @@ -58,6 +59,7 @@ public class NativeMemoryCacheManager implements Closeable { private final ExecutorService executor; private AtomicBoolean cacheCapacityReached; private long maxWeight; + @Getter private Cancellable maintenanceTask; NativeMemoryCacheManager() { diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 332c55f0a..6ef247403 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -41,6 +41,7 @@ public class QuantizationStateCache implements Closeable { private long maxCacheSizeInKB; @Getter private Instant evictedDueToSizeAt; + @Getter private Cancellable maintenanceTask; @VisibleForTesting diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 49d0dd4a7..8a46a781e 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -22,6 +22,7 @@ import org.opensearch.knn.plugin.KNNPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -70,6 +71,8 @@ protected Collection> getPlugins() { public void testRebuildCache() throws ExecutionException, InterruptedException { NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager(); + Cancellable task1 = nativeMemoryCacheManager.getMaintenanceTask(); + assertNotNull(task1); // Put entry in cache and check that the weight matches int size = 10; @@ -84,6 +87,9 @@ public void testRebuildCache() throws ExecutionException, InterruptedException { // Sleep for a second or two so that the executor can invalidate all entries Thread.sleep(2000); + assertTrue(task1.isCancelled()); + assertNotNull(nativeMemoryCacheManager.getMaintenanceTask()); + assertEquals(0, nativeMemoryCacheManager.getCacheSizeInKilobytes()); nativeMemoryCacheManager.close(); } @@ -484,6 +490,16 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException { nativeMemoryCacheManager.close(); } + public void testMaintenanceScheduled() { + NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager(); + Cancellable maintenanceTask = nativeMemoryCacheManager.getMaintenanceTask(); + + assertNotNull(maintenanceTask); + + nativeMemoryCacheManager.close(); + assertTrue(maintenanceTask.isCancelled()); + } + private static class TestNativeMemoryAllocation implements NativeMemoryAllocation { int size; diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index b8a429b92..e509cfbb7 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -17,6 +17,7 @@ import org.opensearch.knn.KNNTestCase; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -467,4 +468,27 @@ public void testCacheEvictionDueToSize() throws IOException { cache.close(); assertNotNull(cache.getEvictedDueToSizeAt()); } + + public void testMaintenanceScheduled() throws Exception { + QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + Scheduler.Cancellable maintenanceTask = quantizationStateCache.getMaintenanceTask(); + + assertNotNull(maintenanceTask); + + quantizationStateCache.close(); + assertTrue(maintenanceTask.isCancelled()); + } + + public void testMaintenanceWithRebuild() throws Exception { + QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + Scheduler.Cancellable task1 = quantizationStateCache.getMaintenanceTask(); + assertNotNull(task1); + + quantizationStateCache.rebuildCache(); + + Scheduler.Cancellable task2 = quantizationStateCache.getMaintenanceTask(); + assertTrue(task1.isCancelled()); + assertNotNull(task2); + quantizationStateCache.close(); + } } From 239af2aa6515b24b88fcc5142f4eb88d6cd635d4 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 10 Jan 2025 14:10:36 -0800 Subject: [PATCH 12/15] spotlessApply Signed-off-by: owenhalpert --- jni/external/faiss | 2 +- jni/external/nmslib | 2 +- .../models/quantizationState/QuantizationStateCacheTests.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jni/external/faiss b/jni/external/faiss index 1f42e815d..441df6789 160000 --- a/jni/external/faiss +++ b/jni/external/faiss @@ -1 +1 @@ -Subproject commit 1f42e815db7754297e3b4467763352b829b6cde0 +Subproject commit 441df678905a3093ce10cbca71dcbf52fa550109 diff --git a/jni/external/nmslib b/jni/external/nmslib index a2d6624e1..73c0658c0 160000 --- a/jni/external/nmslib +++ b/jni/external/nmslib @@ -1 +1 @@ -Subproject commit a2d6624e1315402662025debfdd614b505d9c3ef +Subproject commit 73c0658c0c003262e36b3ed4c05da3e4d4d113c5 diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index e509cfbb7..1a3c56e9a 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -470,7 +470,7 @@ public void testCacheEvictionDueToSize() throws IOException { } public void testMaintenanceScheduled() throws Exception { - QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); Scheduler.Cancellable maintenanceTask = quantizationStateCache.getMaintenanceTask(); assertNotNull(maintenanceTask); @@ -480,7 +480,7 @@ public void testMaintenanceScheduled() throws Exception { } public void testMaintenanceWithRebuild() throws Exception { - QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); Scheduler.Cancellable task1 = quantizationStateCache.getMaintenanceTask(); assertNotNull(task1); From 4489f6d916d7d028a05cc8ba5f7621b474548c01 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 10 Jan 2025 14:21:16 -0800 Subject: [PATCH 13/15] Revert "spotlessApply" This reverts commit 66829ea7e0a3227ac25e53d22b3c0885ca5bd912. Signed-off-by: owenhalpert --- jni/external/faiss | 2 +- jni/external/nmslib | 2 +- .../models/quantizationState/QuantizationStateCacheTests.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jni/external/faiss b/jni/external/faiss index 441df6789..1f42e815d 160000 --- a/jni/external/faiss +++ b/jni/external/faiss @@ -1 +1 @@ -Subproject commit 441df678905a3093ce10cbca71dcbf52fa550109 +Subproject commit 1f42e815db7754297e3b4467763352b829b6cde0 diff --git a/jni/external/nmslib b/jni/external/nmslib index 73c0658c0..a2d6624e1 160000 --- a/jni/external/nmslib +++ b/jni/external/nmslib @@ -1 +1 @@ -Subproject commit 73c0658c0c003262e36b3ed4c05da3e4d4d113c5 +Subproject commit a2d6624e1315402662025debfdd614b505d9c3ef diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index 1a3c56e9a..e509cfbb7 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -470,7 +470,7 @@ public void testCacheEvictionDueToSize() throws IOException { } public void testMaintenanceScheduled() throws Exception { - QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); Scheduler.Cancellable maintenanceTask = quantizationStateCache.getMaintenanceTask(); assertNotNull(maintenanceTask); @@ -480,7 +480,7 @@ public void testMaintenanceScheduled() throws Exception { } public void testMaintenanceWithRebuild() throws Exception { - QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); Scheduler.Cancellable task1 = quantizationStateCache.getMaintenanceTask(); assertNotNull(task1); From 71d97596a4167ff49e0261857635abc29a949c62 Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 10 Jan 2025 14:22:44 -0800 Subject: [PATCH 14/15] spotlessApply spotlessApply Signed-off-by: owenhalpert --- .../models/quantizationState/QuantizationStateCacheTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index e509cfbb7..1a3c56e9a 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -470,7 +470,7 @@ public void testCacheEvictionDueToSize() throws IOException { } public void testMaintenanceScheduled() throws Exception { - QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); Scheduler.Cancellable maintenanceTask = quantizationStateCache.getMaintenanceTask(); assertNotNull(maintenanceTask); @@ -480,7 +480,7 @@ public void testMaintenanceScheduled() throws Exception { } public void testMaintenanceWithRebuild() throws Exception { - QuantizationStateCache quantizationStateCache= new QuantizationStateCache(); + QuantizationStateCache quantizationStateCache = new QuantizationStateCache(); Scheduler.Cancellable task1 = quantizationStateCache.getMaintenanceTask(); assertNotNull(task1); From b135f2648bd9fdcf021a1de544515854d612910b Mon Sep 17 00:00:00 2001 From: owenhalpert Date: Fri, 10 Jan 2025 14:36:45 -0800 Subject: [PATCH 15/15] Document maintenance method Signed-off-by: owenhalpert --- .../knn/index/memory/NativeMemoryCacheManager.java | 7 +++++++ .../models/quantizationState/QuantizationStateCache.java | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 44f06182e..76e94ee66 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -467,6 +467,13 @@ private Float getSizeAsPercentage(long size) { return 100 * size / (float) cbLimit; } + /** + * Starts the scheduled maintenance for the cache. Without this thread calling cleanUp(), the Guava cache only + * performs maintenance operations (such as evicting expired entries) when the cache is accessed. This + * ensures that the cache is also cleaned up based on the configured expiry time. + * @see Guava Cache Guide + * @param cacheInstance cache on which to call cleanUp() + */ private void startMaintenance(Cache cacheInstance) { if (maintenanceTask != null) { maintenanceTask.cancel(); diff --git a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java index 6ef247403..d2b99fef0 100644 --- a/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java +++ b/src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java @@ -87,6 +87,13 @@ private void buildCache() { } } + /** + * Starts the scheduled maintenance for the cache. Without this thread calling cleanUp(), the Guava cache only + * performs maintenance operations (such as evicting expired entries) when the cache is accessed. This + * ensures that the cache is also cleaned up based on the configured expiry time. + * @see Guava Cache Guide + * @param cacheInstance cache on which to call cleanUp() + */ private void startMaintenance(Cache cacheInstance) { if (maintenanceTask != null) { maintenanceTask.cancel();