Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TSC took-time policy to guard both heap and disk tier #17190

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Limit reader writer separation to remote store enabled clusters ([#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
- Optimize innerhits query performance ([#16937](https://github.com/opensearch-project/OpenSearch/pull/16937)
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233))
- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190)

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}

public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
public void testWithDynamicDiskTookTimePolicyWithMultiSegments() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
// just a bit higher so that each segment can atleast hold 1 entry.
Expand All @@ -139,12 +139,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
)
.get()
);
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// Set a very high value for took time disk policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
Expand Down Expand Up @@ -182,12 +183,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
Expand All @@ -206,7 +208,7 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
}

public void testWithDynamicTookTimePolicy() throws Exception {
public void testWithDynamicHeapTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
Expand All @@ -224,12 +226,65 @@ public void testWithDynamicTookTimePolicy() throws Exception {
)
.get()
);
// Step 1 : Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// Set a high threshold for the overall cache took time policy so nothing will enter the cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_HEAP_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = randomIntBetween(6, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
// Check the old stats API has properly subtracted off the byte size for key, without incrementing evictions.
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
assertEquals(0, requestCacheStats.getEvictions());
}

public void testWithDynamicDiskTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Step 1 : Set a very high value for disk took time policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
Expand Down Expand Up @@ -282,12 +337,13 @@ public void testWithDynamicTookTimePolicy() throws Exception {
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Step 3: Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// Step 3: Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
Expand Down Expand Up @@ -352,11 +408,12 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
Expand Down Expand Up @@ -437,11 +494,12 @@ public void testWithExplicitCacheClear() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
Expand Down Expand Up @@ -512,11 +570,12 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Test aggregating by indices
*/
public void testIndicesLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

Expand Down Expand Up @@ -115,16 +106,7 @@ public void testIndicesLevelAggregation() throws Exception {
* Test aggregating by indices and tier
*/
public void testIndicesAndTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

Expand Down Expand Up @@ -195,16 +177,7 @@ public void testIndicesAndTierLevelAggregation() throws Exception {
* Test aggregating by tier only
*/
public void testTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);
// Get values for tiers alone and check they add correctly across indices
Expand Down Expand Up @@ -236,16 +209,7 @@ public void testTierLevelAggregation() throws Exception {
}

public void testInvalidLevelsAreIgnored() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

Expand Down Expand Up @@ -287,16 +251,7 @@ public void testInvalidLevelsAreIgnored() throws Exception {
* Check the new stats API returns the same values as the old stats API.
*/
public void testStatsMatchOldApi() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
String index = "index";
Client client = client();
startIndex(client, index);
Expand Down Expand Up @@ -353,8 +308,14 @@ public void testStatsWithMultipleSegments() throws Exception {
Settings.builder()
.put(defaultSettings(heap_cache_size_per_segment * numberOfSegments + "B", numberOfSegments))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
TieredSpilloverCacheSettings.TOOK_TIME_HEAP_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.build()
);
Expand Down Expand Up @@ -426,7 +387,13 @@ public void testClosingShard() throws Exception {
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_HEAP_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
Expand Down Expand Up @@ -631,4 +598,23 @@ private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client,
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE);
}

private void startNodesDefaultSettings() {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_HEAP_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.build()
);
}
}
Loading
Loading