Skip to content

Commit

Permalink
Move TSC took-time policy to guard both heap and disk tier
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Jan 29, 2025
1 parent e6fc600 commit 7d8c7e9
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,12 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

private final Map<ICache<K, V>, TierInfo> caches;

// Policies guarding access to the cache overall.
private final List<Predicate<V>> policies;

// Policies guarding access to the disk tier.
private final List<Predicate<V>> diskPolicies;

private final TieredSpilloverCacheStatsHolder statsHolder;

private final long onHeapCacheMaxWeight;
Expand Down Expand Up @@ -220,7 +223,8 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
cacheListMap.put(onHeapCache, new TierInfo(true, TIER_DIMENSION_VALUE_ON_HEAP));
cacheListMap.put(diskCache, new TierInfo(isDiskCacheEnabled, TIER_DIMENSION_VALUE_DISK));
this.caches = Collections.synchronizedMap(cacheListMap);
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
this.policies = builder.policies;
this.diskPolicies = builder.diskPolicies; // Will never be null; builder initializes it to an empty list
this.onHeapCacheMaxWeight = onHeapCacheSizeInBytes;
this.diskCacheMaxWeight = diskCacheSizeInBytes;
}
Expand Down Expand Up @@ -257,19 +261,23 @@ public void put(ICacheKey<K> key, V value) {
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true).apply(key);
if (cacheValueTuple == null) {
// In case it is not present in any tier, put it inside onHeap cache by default.
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
if (evaluatePoliciesList(value, policies)) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
} else {
// Put it inside desired tier.
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> entry : this.caches.entrySet()) {
if (cacheValueTuple.v2().equals(entry.getValue().tierName)) {
entry.getKey().put(key, value);
if (evaluatePoliciesList(value, policies)) {
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> entry : this.caches.entrySet()) {
if (cacheValueTuple.v2().equals(entry.getValue().tierName)) {
entry.getKey().put(key, value);
}
}
updateStatsOnPut(cacheValueTuple.v2(), key, value);
}
updateStatsOnPut(cacheValueTuple.v2(), key, value);
}
}
}
Expand Down Expand Up @@ -297,13 +305,13 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = compute(key, loader, future);
Tuple<V, Boolean> computedValueTuple = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
if (computedValueTuple.v2()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk
// cache
// if present
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, computedValueTuple.v1());
statsHolder.incrementMisses(heapDimensionValues);
if (caches.get(diskCache).isEnabled()) {
statsHolder.incrementMisses(diskDimensionValues);
Expand All @@ -312,7 +320,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Another thread requesting this key already loaded the value. Register a hit for the heap cache
statsHolder.incrementHits(heapDimensionValues);
}
return value;
return computedValueTuple.v1();
} else {
// Handle stats for an initial hit from getValueFromTieredCache()
if (cacheValueTuple.v2().equals(TIER_DIMENSION_VALUE_ON_HEAP)) {
Expand All @@ -327,32 +335,45 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
private Tuple<V, Boolean> compute(
ICacheKey<K> key,
LoadAwareCacheLoader<ICacheKey<K>, V> loader,
CompletableFuture<Tuple<ICacheKey<K>, V>> future
) throws Exception {
// Handler to handle results post-processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
// a tuple of the value and a boolean for whether it entered the cache. Also before returning value, puts the value in cache
// if this is allowed by the cache policies.
boolean didPutIntoCache = false;
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Boolean> handler = (pair, ex) -> {
boolean lambdaDidPutIntoCache = false;
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers. For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
if (evaluatePoliciesList(pair.v2(), policies)) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
// We must load the value for the policy to check it, so we can't rely on loader.isLoaded()
// to determine if the new value entered the cache since the policy may have blocked it.
// Instead return this boolean as well.
lambdaDidPutIntoCache = true;
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers. For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
// Safe to remove from the map even if policy blocks value from entering the cache
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
return null;
return lambdaDidPutIntoCache;
};
V value = null;
if (future == null) {
future = completableFutureMap.get(key);
future.handle(handler);
CompletableFuture<Boolean> didPutIntoCacheFuture = future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
Expand All @@ -365,6 +386,8 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
// If the future is completed, didPutIntoCacheFuture should also be completed, so it's safe to run .get() on it
didPutIntoCache = didPutIntoCacheFuture.get();
}
} else {
try {
Expand All @@ -373,7 +396,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
throw new IllegalStateException(ex);
}
}
return value;
return new Tuple<>(value, didPutIntoCache);
}

@Override
Expand Down Expand Up @@ -442,7 +465,9 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
boolean exceptionOccurredOnDiskCachePut = false;
boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
boolean canCacheOnDisk = caches.get(diskCache).isEnabled()
&& wasEvicted
&& evaluatePoliciesList(notification.getValue(), diskPolicies);
if (canCacheOnDisk) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
Expand All @@ -465,8 +490,8 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal);
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
boolean evaluatePoliciesList(V value, List<Predicate<V>> policiesList) {
for (Predicate<V> policy : policiesList) {
if (!policy.test(value)) {
return false;
}
Expand Down Expand Up @@ -873,7 +898,8 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();
private final List<Predicate<V>> policies = new ArrayList<>();
private final List<Predicate<V>> diskPolicies = new ArrayList<>();

private int numberOfSegments;
private long onHeapCacheSizeInBytes;
Expand Down Expand Up @@ -945,7 +971,7 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* Set a cache policy to be used to limit access to this cache.
* @param policy the policy
* @return builder
*/
Expand All @@ -955,7 +981,7 @@ public Builder<K, V> addPolicy(Predicate<V> policy) {
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* Set multiple policies to be used to limit access to this cache.
* @param policies the policies
* @return builder
*/
Expand All @@ -964,6 +990,26 @@ public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
return this;
}

/**
 * Adds a single policy used to gate entry into this cache's disk tier.
 * Values rejected by any disk policy are not spilled over to the disk tier on heap eviction.
 * @param diskPolicy the policy to add; must not be null
 * @return this builder, for chaining
 * @throws NullPointerException if diskPolicy is null
 */
public Builder<K, V> addDiskPolicy(Predicate<V> diskPolicy) {
    // Fail fast on a null policy here, rather than throwing an NPE later during policy evaluation.
    this.diskPolicies.add(java.util.Objects.requireNonNull(diskPolicy, "diskPolicy must not be null"));
    return this;
}

/**
 * Adds several policies used to limit entry into this cache's disk tier.
 * @param diskPolicies the policies to add
 * @return this builder, for chaining
 */
public Builder<K, V> addDiskPolicies(List<Predicate<V>> diskPolicies) {
    // Append each supplied policy to the builder's disk-policy list.
    for (Predicate<V> diskPolicy : diskPolicies) {
        this.diskPolicies.add(diskPolicy);
    }
    return this;
}

/**
* Sets number of segments for tiered cache
* @param numberOfSegments number of segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class TieredSpilloverCacheSettings {
*/
public static final long MIN_DISK_CACHE_SIZE_IN_BYTES = 10485760L;

/**
 * Default minimum took-time for a query result to be admitted into the cache.
 * Used as the default value of the took-time threshold setting; per the setting's
 * description below, the threshold applies to entry into any tier of the cache.
 */
public static final TimeValue DEFAULT_TOOK_TIME_THRESHOLD = new TimeValue(10, TimeUnit.MILLISECONDS);

/**
* Setting which defines the onHeap cache store to be used in TieredSpilloverCache.
*
Expand Down Expand Up @@ -109,13 +111,15 @@ public class TieredSpilloverCacheSettings {
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
* Setting defining the minimum took time for a query to be allowed in the cache.
* For backwards compatibility, the setting key still has "disk" in it, but the threshold is applied to enter into
* any tier of the cache.
*/
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
DEFAULT_TOOK_TIME_THRESHOLD,
TimeValue.ZERO, // Minimum value for this setting
NodeScope,
Setting.Property.Dynamic
Expand Down Expand Up @@ -143,7 +147,7 @@ public class TieredSpilloverCacheSettings {
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
TIERED_SPILLOVER_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
diskCacheSettingMap.put(
cacheType,
Expand Down
Loading

0 comments on commit 7d8c7e9

Please sign in to comment.