Skip to content

Commit

Permalink
Add thread to perform pending cache maintenance every minute
Browse files Browse the repository at this point in the history
Signed-off-by: owenhalpert <[email protected]>
  • Loading branch information
owenhalpert committed Dec 20, 2024
1 parent dc369e6 commit 85b1782
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Documentation
### Maintenance
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239)
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
* Added Lucene BWC tests (#2313)[https://github.com/opensearch-project/k-NN/pull/2313]
* Upgrade jsonpath from 2.8.0 to 2.9.0[2325](https://github.com/opensearch-project/k-NN/pull/2325)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.util.ScheduledExecutor;
import org.opensearch.knn.plugin.stats.StatNames;

import java.io.Closeable;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private ScheduledExecutor cacheMaintainer;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;

Expand Down Expand Up @@ -87,6 +89,10 @@ private void initialize() {
}

private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
.recordStats()
.concurrencyLevel(1)
Expand All @@ -99,6 +105,16 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {

if (nativeMemoryCacheDTO.isExpirationLimited()) {
cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES);
Runnable cleanUp = () -> {
try {
cache.cleanUp();
} catch (Exception e) {
logger.error("Error cleaning up cache", e);
}
};
long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES))
.getMillis();
this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis);
}

cacheCapacityReached = new AtomicBoolean(false);
Expand Down Expand Up @@ -142,6 +158,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
@Override
public void close() {
executor.shutdown();
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}

/**
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/opensearch/knn/index/util/ScheduledExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.util;

import lombok.Getter;

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Executes a task periodically
*/
public class ScheduledExecutor implements Closeable {
final ScheduledExecutorService executor;
@Getter
private final Runnable task;

/**
* @param task task to be completed
* @param scheduleMillis time in milliseconds to wait before executing the task again
*/
public ScheduledExecutor(Runnable task, long scheduleMillis) {
this.task = task;
this.executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS);
}

@Override
public void close() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.util.ScheduledExecutor;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,10 +29,11 @@
* A thread-safe singleton cache that contains quantization states.
*/
@Log4j2
public class QuantizationStateCache {
public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
private Cache<String, QuantizationState> cache;
private ScheduledExecutor cacheMaintainer;
@Getter
private long maxCacheSizeInKB;
@Getter
Expand Down Expand Up @@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
}

private void buildCache() {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
try {
return ((QuantizationState) v).toByteArray().length;
Expand All @@ -71,6 +78,16 @@ private void buildCache() {
)
.removalListener(this::onRemoval)
.build();

Runnable cleanUp = () -> {
try {
cache.cleanUp();
} catch (Exception e) {
log.error("Error cleaning up cache", e);
}
};
long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMillis();
this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis);
}

synchronized void rebuildCache() {
Expand Down Expand Up @@ -129,4 +146,11 @@ private void updateEvictedDueToSizeAt() {
public void clear() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
public void clear() {
QuantizationStateCache.getInstance().clear();
}

public void close() throws IOException {
QuantizationStateCache.getInstance().close();
}
}
2 changes: 2 additions & 0 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.IndexService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.plugins.Plugin;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand Down Expand Up @@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
public void tearDown() throws Exception {
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/org/opensearch/knn/KNNTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
return false;
}

public void resetState() {
public void resetState() throws IOException {
// Reset all of the counters
for (KNNCounter knnCounter : KNNCounter.values()) {
knnCounter.set(0L);
Expand All @@ -83,6 +85,7 @@ public void resetState() {
// Clean up the cache
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
}

private void initKNNSettings() {
Expand Down
35 changes: 35 additions & 0 deletions src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.junit.Test;
import org.opensearch.knn.index.util.ScheduledExecutor;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class CacheMaintainerTests {
@Test
public void testCacheEviction() throws InterruptedException {
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();

ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000);

testCache.put("key1", "value1");
assertEquals(testCache.size(), 1);

Thread.sleep(1500);

executor.getTask().run();

assertEquals(testCache.size(), 0);

executor.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void tearDown() throws Exception {
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
NativeMemoryCacheManager.getInstance().close();
super.tearDown();
}

Expand Down Expand Up @@ -378,6 +379,7 @@ public void testCacheCapacity() {

nativeMemoryCacheManager.setCacheCapacityReached(false);
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
nativeMemoryCacheManager.close();
}

public void testGetIndicesCacheStats() throws IOException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
assertNull("State should be null", retrievedState);
}

public void testCacheEvictionDueToSize() {
public void testCacheEvictionDueToSize() throws IOException {
String fieldName = "evictionField";
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
int arrayLength = 112;
Expand Down Expand Up @@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
cache.addQuantizationState(fieldName, state);
cache.addQuantizationState(fieldName, state2);
cache.clear();
cache.close();
assertNotNull(cache.getEvictedDueToSizeAt());
}
}

0 comments on commit 85b1782

Please sign in to comment.