-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add thread to periodically perform pending cache maintenance #2308
base: main
Are you sure you want to change the base?
Changes from 3 commits
85b1782
826e80a
1c4b6bf
14ed10f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index.util; | ||
|
||
import lombok.Getter; | ||
|
||
import java.io.Closeable; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Executes a task periodically | ||
*/ | ||
public class ScheduledExecutor implements Closeable { | ||
final ScheduledExecutorService executor; | ||
@Getter | ||
private final Runnable task; | ||
|
||
/** | ||
* @param task task to be completed | ||
* @param scheduleMillis time in milliseconds to wait before executing the task again | ||
*/ | ||
public ScheduledExecutor(Runnable task, long scheduleMillis) { | ||
this.task = task; | ||
this.executor = Executors.newSingleThreadScheduledExecutor(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be creating a new thread for each instance, I wanted to understand the thought process behind this. I understand that this is used for cache so the threads won't grow exponentially, but since its in util package and the name is generic - any misuse of this can create unbounded number of threads with each new instance. Can we be a bit more defensive here? The suggestion here is to refactor and make sure there are fixed number of threads for cache cleanup across caches |
||
executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
executor.shutdown(); | ||
owenhalpert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { | ||
executor.shutdownNow(); | ||
} | ||
} catch (InterruptedException e) { | ||
executor.shutdownNow(); | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index; | ||
|
||
import com.google.common.cache.Cache; | ||
import com.google.common.cache.CacheBuilder; | ||
import org.junit.Test; | ||
import org.opensearch.knn.index.util.ScheduledExecutor; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
|
||
public class CacheMaintainerTests { | ||
@Test | ||
public void testCacheEviction() throws InterruptedException { | ||
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(); | ||
|
||
ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000); | ||
|
||
testCache.put("key1", "value1"); | ||
assertEquals(testCache.size(), 1); | ||
|
||
Thread.sleep(1500); | ||
|
||
executor.getTask().run(); | ||
|
||
assertEquals(testCache.size(), 0); | ||
|
||
executor.close(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why swallow exception here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any exceptions from Guava cache operations would otherwise halt our scheduled executor. If an exception occurs here, it would be from Guava's internals rather than our logic, so per Dooyong's suggestion above we can log it and continue scheduling cleanup tasks.
This way we can ensure the cache maintenance keeps running even if individual cleanup attempts fail, and we can monitor Guava errors in the logs.