Add thread to periodically perform pending cache maintenance #2308

Open
wants to merge 4 commits into
base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Documentation
### Maintenance
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2308](https://github.com/opensearch-project/k-NN/pull/2308)
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
* Added Lucene BWC tests (#2313)[https://github.com/opensearch-project/k-NN/pull/2313]
* Upgrade jsonpath from 2.8.0 to 2.9.0[2325](https://github.com/opensearch-project/k-NN/pull/2325)
@@ -23,6 +23,7 @@
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.util.ScheduledExecutor;
import org.opensearch.knn.plugin.stats.StatNames;

import java.io.Closeable;
@@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private ScheduledExecutor cacheMaintainer;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;

@@ -87,6 +89,10 @@ private void initialize() {
}

private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
.recordStats()
.concurrencyLevel(1)
@@ -99,6 +105,17 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {

if (nativeMemoryCacheDTO.isExpirationLimited()) {
cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES);
Runnable cleanUp = () -> {
try {
cache.cleanUp();
} catch (Exception e) {
// Exceptions from Guava shouldn't halt the executor
logger.error("Error cleaning up cache", e);
Member


Why swallow exception here?

Author

@owenhalpert owenhalpert Dec 23, 2024


Any exceptions from Guava cache operations would otherwise halt our scheduled executor. If an exception occurs here, it would be from Guava's internals rather than our logic, so per Dooyong's suggestion above we can log it and continue scheduling cleanup tasks.

This way we can ensure the cache maintenance keeps running even if individual cleanup attempts fail, and we can monitor Guava errors in the logs.
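The behavior behind this reply can be checked in isolation. The following is a minimal sketch using only the JDK, not code from this PR: the class and method names are hypothetical, and a thrown `IllegalStateException` stands in for a failing `cache.cleanUp()` call. It relies on the documented contract of `scheduleAtFixedRate`, which suppresses all subsequent executions once one execution throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
public class PeriodicTaskFailureDemo {

    /** Schedules the task every 10 ms for roughly 300 ms, then stops the executor. */
    static void runBriefly(Runnable task) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** The task throws and nothing catches it: later executions are suppressed. */
    static int countRunsWithoutCatch() {
        AtomicInteger runs = new AtomicInteger();
        runBriefly(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated cleanup failure");
        });
        return runs.get();
    }

    /** Same failing work, wrapped in try/catch: the schedule stays alive. */
    static int countRunsWithCatch() {
        AtomicInteger runs = new AtomicInteger();
        runBriefly(() -> {
            try {
                runs.incrementAndGet();
                throw new IllegalStateException("simulated cleanup failure");
            } catch (Exception e) {
                // A real task would log the error here; the demo simply continues.
            }
        });
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs without catch: " + countRunsWithoutCatch());
        System.out.println("runs with catch:    " + countRunsWithCatch());
    }
}
```

Without the catch block the counter stops at one run, and the failure is only visible to a caller that inspects the returned `ScheduledFuture`. With the catch block the task keeps firing, which is the trade-off discussed in this thread.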

}
};
long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES))
.getMillis();
this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis);
}

cacheCapacityReached = new AtomicBoolean(false);
@@ -142,6 +159,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
@Override
public void close() {
executor.shutdown();
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}

/**
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.util;

import lombok.Getter;

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Executes a task periodically
*/
public class ScheduledExecutor implements Closeable {
final ScheduledExecutorService executor;
@Getter
private final Runnable task;

/**
* @param task task to be completed
* @param scheduleMillis time in milliseconds to wait before executing the task again
*/
public ScheduledExecutor(Runnable task, long scheduleMillis) {
this.task = task;
this.executor = Executors.newSingleThreadScheduledExecutor();
Collaborator

@shatejas shatejas Dec 24, 2024


This seems to create a new thread for each instance. I'd like to understand the thought process behind this.

I understand that this is used for the caches, so the number of threads won't grow exponentially. But since it's in the util package and the name is generic, any misuse could create an unbounded number of threads, one per new instance. Can we be a bit more defensive here?

The suggestion is to refactor so that a fixed number of threads handles cache cleanup across all caches.
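One way to read this suggestion is sketched below. It is an illustration only, not what the PR implements: `SharedCacheMaintainer` and its `schedule` method are hypothetical names, and the single daemon thread is an assumption about what "fixed number of threads" could mean here. Every cache registers its cleanup task with the same scheduler and keeps the returned `ScheduledFuture` so it can cancel only its own task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the PR.
public final class SharedCacheMaintainer {

    // One daemon thread serves every registered cleanup task (assumed design).
    private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "cache-maintainer");
        thread.setDaemon(true);
        return thread;
    });

    private SharedCacheMaintainer() {}

    /** Registers a periodic task; cancel the returned future to stop only that task. */
    public static ScheduledFuture<?> schedule(Runnable task, long periodMillis) {
        Runnable guarded = () -> {
            try {
                task.run();
            } catch (Exception e) {
                // A real implementation would log here so the schedule survives failures.
            }
        };
        return SCHEDULER.scheduleAtFixedRate(guarded, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        // Two stand-ins for the cleanup tasks of two different caches.
        ScheduledFuture<?> first = schedule(() -> threadNames.add(Thread.currentThread().getName()), 10);
        ScheduledFuture<?> second = schedule(() -> threadNames.add(Thread.currentThread().getName()), 10);
        Thread.sleep(200);
        first.cancel(false);
        second.cancel(false);
        System.out.println("threads used: " + threadNames);
    }
}
```

Under this design, rebuilding a cache would cancel and re-register its future instead of closing and recreating an executor, so the thread count stays constant no matter how many instances are created. The cost is that one slow cleanup delays the others, since they share a thread.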

executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS);
}

@Override
public void close() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

}
@@ -15,7 +15,9 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.util.ScheduledExecutor;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
@@ -27,10 +29,11 @@
* A thread-safe singleton cache that contains quantization states.
*/
@Log4j2
public class QuantizationStateCache {
public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
private Cache<String, QuantizationState> cache;
private ScheduledExecutor cacheMaintainer;
@Getter
private long maxCacheSizeInKB;
@Getter
@@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
}

private void buildCache() {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
try {
return ((QuantizationState) v).toByteArray().length;
@@ -71,6 +78,17 @@ private void buildCache() {
)
.removalListener(this::onRemoval)
.build();

Runnable cleanUp = () -> {
try {
cache.cleanUp();
} catch (Exception e) {
// Exceptions from Guava shouldn't halt the executor
log.error("Error cleaning up cache", e);
}
};
long scheduleMillis = ((TimeValue) KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES)).getMillis();
this.cacheMaintainer = new ScheduledExecutor(cleanUp, scheduleMillis);
}

synchronized void rebuildCache() {
@@ -129,4 +147,11 @@ private void updateEvictedDueToSizeAt() {
public void clear() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}
}
@@ -9,10 +9,11 @@
import lombok.NoArgsConstructor;
import org.opensearch.knn.index.codec.KNN990Codec.KNN990QuantizationStateReader;

import java.io.Closeable;
import java.io.IOException;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class QuantizationStateCacheManager {
public final class QuantizationStateCacheManager implements Closeable {

private static volatile QuantizationStateCacheManager instance;

@@ -79,4 +80,9 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
public void clear() {
QuantizationStateCache.getInstance().clear();
}

@Override
public void close() throws IOException {
QuantizationStateCache.getInstance().close();
}
}
2 changes: 2 additions & 0 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
@@ -35,6 +35,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.IndexService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.plugins.Plugin;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
public void tearDown() throws Exception {
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();
5 changes: 4 additions & 1 deletion src/test/java/org/opensearch/knn/KNNTestCase.java
@@ -24,8 +24,10 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
return false;
}

public void resetState() {
public void resetState() throws IOException {
// Reset all of the counters
for (KNNCounter knnCounter : KNNCounter.values()) {
knnCounter.set(0L);
@@ -83,6 +85,7 @@ public void resetState() {
// Clean up the cache
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
}

private void initKNNSettings() {
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.junit.Test;
import org.opensearch.knn.index.util.ScheduledExecutor;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class CacheMaintainerTests {
@Test
public void testCacheEviction() throws InterruptedException {
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();

ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000);

testCache.put("key1", "value1");
assertEquals(testCache.size(), 1);

Thread.sleep(1500);

executor.getTask().run();

assertEquals(testCache.size(), 0);

executor.close();
}
}
@@ -41,6 +41,7 @@ public void tearDown() throws Exception {
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
NativeMemoryCacheManager.getInstance().close();
super.tearDown();
}

@@ -378,6 +379,7 @@ public void testCacheCapacity() {

nativeMemoryCacheManager.setCacheCapacityReached(false);
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
nativeMemoryCacheManager.close();
}

public void testGetIndicesCacheStats() throws IOException, ExecutionException {
@@ -16,6 +16,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
assertNull("State should be null", retrievedState);
}

public void testCacheEvictionDueToSize() {
public void testCacheEvictionDueToSize() throws IOException {
String fieldName = "evictionField";
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
int arrayLength = 112;
@@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
cache.addQuantizationState(fieldName, state);
cache.addQuantizationState(fieldName, state2);
cache.clear();
cache.close();
assertNotNull(cache.getEvictedDueToSizeAt());
}
}