Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Oct 3, 2024
1 parent bea8fd7 commit bd1fce1
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -609,7 +610,11 @@ public void testWithInvalidSegmentNumberSetting() throws Exception {
// that all items are
// cached onto disk.
assertThrows(
INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE,
String.format(
Locale.ROOT,
INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE,
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
),
IllegalArgumentException.class,
() -> internalCluster().startNode(
Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand All @@ -49,11 +50,11 @@
import java.util.function.ToLongBiFunction;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_NUMBER_LIST;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_COUNT_VALUES;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -72,16 +73,13 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

static final String NUMBER_OF_SEGMENTS_ZERO_EXCEPTION_MESSAGE = "Number of segments cannot be less than or equal " + "to zero";
static final String ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE = "Segment count cannot be less than one for tiered cache";

// In future we want to just read the stats from the individual tiers' statsHolder objects, but this isn't
// possible right now because of the way computeIfAbsent is implemented.
private final TieredSpilloverCacheStatsHolder statsHolder;
private ToLongBiFunction<ICacheKey<K>, V> weigher;
private final List<String> dimensionNames;

private final List<Predicate<V>> policies;

private final int numberOfSegments;

final TieredSpilloverCacheSegment<K, V>[] tieredSpilloverCacheSegments;
Expand All @@ -97,7 +95,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
Objects.requireNonNull(builder.cacheConfig, "cache config can't be null");
Objects.requireNonNull(builder.cacheConfig.getSettings(), "settings can't be null");
if (builder.numberOfSegments <= 0) {
throw new IllegalArgumentException(NUMBER_OF_SEGMENTS_ZERO_EXCEPTION_MESSAGE);
throw new IllegalArgumentException(ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE);
}
this.numberOfSegments = builder.numberOfSegments;
Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
Expand All @@ -108,7 +106,6 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
for (int i = 0; i < numberOfSegments; i++) {
tieredSpilloverCacheSegments[i] = new TieredSpilloverCacheSegment<K, V>(builder, statsHolder, i + 1, this.numberOfSegments);
}
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
builder.cacheConfig.getClusterSettings()
.addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache);
}
Expand Down Expand Up @@ -170,7 +167,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
.setNumberOfSegments(numberOfSegments)
.setSegmentCount(numberOfSegments)
.setSegmentNumber(segmentNumber)
.setStatsTrackingEnabled(false)
.build(),
Expand All @@ -187,7 +184,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.setKeySerializer(builder.cacheConfig.getKeySerializer())
.setValueSerializer(builder.cacheConfig.getValueSerializer())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setNumberOfSegments(numberOfSegments)
.setSegmentCount(numberOfSegments)
.setSegmentNumber(segmentNumber)
.setStatsTrackingEnabled(false)
.setStoragePath(builder.cacheConfig.getStoragePath() + "/" + segmentNumber)
Expand Down Expand Up @@ -223,7 +220,7 @@ void enableDisableDiskCache(Boolean isDiskCacheEnabled) {

@Override
public V get(ICacheKey<K> key) {
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true, false).apply(key);
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true).apply(key);
if (cacheValueTuple == null) {
return null;
}
Expand All @@ -233,7 +230,7 @@ public V get(ICacheKey<K> key) {
@Override
public void put(ICacheKey<K> key, V value) {
// First check in case the key is already present in either of tiers.
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true, false).apply(key);
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true).apply(key);
if (cacheValueTuple == null) {
// In case it is not present in any tier, put it inside onHeap cache by default.
try (ReleasableLock ignore = writeLock.acquire()) {
Expand Down Expand Up @@ -262,7 +259,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
Tuple<V, String> cacheValueTuple;
CompletableFuture<Tuple<ICacheKey<K>, V>> future = null;
try (ReleasableLock ignore = readLock.acquire()) {
cacheValueTuple = getValueFromTieredCache(false, false).apply(key);
cacheValueTuple = getValueFromTieredCache(false).apply(key);
if (cacheValueTuple == null) {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
Expand Down Expand Up @@ -460,11 +457,11 @@ boolean evaluatePolicies(V value) {
* @param captureStats Whether to record hits/misses for this call of the function
* @return A tuple of the value and the name of the tier it was found in.
*/
private Function<ICacheKey<K>, Tuple<V, String>> getValueFromTieredCache(boolean captureStats, boolean forceCheck) {
private Function<ICacheKey<K>, Tuple<V, String>> getValueFromTieredCache(boolean captureStats) {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
if (cacheEntry.getValue().isEnabled() || forceCheck) {
if (cacheEntry.getValue().isEnabled()) {
V value = cacheEntry.getKey().get(key);
// Get the tier value corresponding to this cache
String tierValue = cacheEntry.getValue().tierName;
Expand Down Expand Up @@ -802,8 +799,10 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,

int numberOfSegments = TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(cacheType.getSettingPrefix()).get(settings);

if (!VALID_SEGMENT_NUMBER_LIST.contains(numberOfSegments)) {
throw new IllegalArgumentException(INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE);
if (!VALID_SEGMENT_COUNT_VALUES.contains(numberOfSegments)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE, TIERED_SPILLOVER_CACHE_NAME)
);
}

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_NUMBER_LIST;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_COUNT_VALUES;
import static org.opensearch.common.settings.Setting.Property.NodeScope;

/**
* Settings related to TieredSpilloverCache.
*/
public class TieredSpilloverCacheSettings {

/**
* Exception message for invalid segment number.
*/
public static final String INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE = "Tiered cache segment number should be "
+ "power of two up-to 256";
/**
* Setting which defines the onHeap cache store to be used in TieredSpilloverCache.
*
Expand Down Expand Up @@ -62,8 +59,14 @@ public class TieredSpilloverCacheSettings {
public static final Setting.AffixSetting<Integer> TIERED_SPILLOVER_SEGMENTS = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".segments",
(key) -> Setting.intSetting(key, 16, 1, k -> {
if (!VALID_SEGMENT_NUMBER_LIST.contains(k)) {
throw new IllegalArgumentException(INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE);
if (!VALID_SEGMENT_COUNT_VALUES.contains(k)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE,
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
);
}
}, NodeScope)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,12 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.setStatsTrackingEnabled(config.getStatsTrackingEnabled());
if (config.getSegmentNumber() > 0 && config.getNumberOfSegments() > 0) {
int perSegmentSize = maxSize / config.getNumberOfSegments();
if (config.getSegmentCount() > 0) {
int perSegmentSize = maxSize / config.getSegmentCount();
if (perSegmentSize <= 0) {
throw new IllegalArgumentException("Per segment size for mock disk cache should be " + "greater than 0");
}
builder.setMaxSize(perSegmentSize);
// In case this is the last segment, assign the remainder of bytes accordingly
if (config.getSegmentNumber() == config.getNumberOfSegments()) {
if (maxSize % config.getNumberOfSegments() != 0) {
builder.setMaxSize(perSegmentSize + maxSize % config.getNumberOfSegments());
}
}

} else {
builder.setMaxSize(maxSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCache.NUMBER_OF_SEGMENTS_ZERO_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -292,7 +292,7 @@ public void testComputeIfAbsentWithSegmentedCache() throws Exception {
// 20_000_000 ns = 20 ms to compute
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.setNumberOfSegments(numberOfSegments)
.setSegmentCount(numberOfSegments)
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
Expand Down Expand Up @@ -1898,7 +1898,7 @@ public void testTieredCacheWithZeroNumberOfSegments() {
int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
assertThrows(
NUMBER_OF_SEGMENTS_ZERO_EXCEPTION_MESSAGE,
ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE,
IllegalArgumentException.class,
() -> initializeTieredSpilloverCache(
keyValueSize,
Expand Down Expand Up @@ -2183,7 +2183,7 @@ private CacheConfig<String, String> getCacheConfig(
)
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.setNumberOfSegments(numberOfSegments)
.setSegmentCount(numberOfSegments)
.build();
}

Expand Down Expand Up @@ -2306,13 +2306,6 @@ private Map<Integer, Integer> getSegmentOnHeapCacheSize(int numberOfSegments, in
int perSegmentOnHeapCacheSizeBytes = onHeapCacheSizeInBytes / numberOfSegments;
int perSegmentOnHeapCacheEntries = perSegmentOnHeapCacheSizeBytes / keyValueSize;
expectedSegmentOnHeapCacheSize.put(i, perSegmentOnHeapCacheEntries);
if (i == (numberOfSegments - 1)) {
if (onHeapCacheSizeInBytes % numberOfSegments != 0) {
// 400
int lastSegmentOnHeapCacheSizeBytes = perSegmentOnHeapCacheSizeBytes + onHeapCacheSizeInBytes % numberOfSegments;
expectedSegmentOnHeapCacheSize.put(i, lastSegmentOnHeapCacheSizeBytes / keyValueSize);
}
}
}
return expectedSegmentOnHeapCacheSize;
}
Expand All @@ -2322,12 +2315,6 @@ private Map<Integer, Integer> getSegmentMockDiskCacheSize(int numberOfSegments,
for (int i = 0; i < numberOfSegments; i++) {
int perSegmentDiskCacheEntries = diskCacheSize / numberOfSegments;
expectedSegmentDiskCacheSize.put(i, perSegmentDiskCacheEntries);
if (i == (numberOfSegments - 1)) {
if (diskCacheSize % numberOfSegments != 0) {
int lastSegmentDiskCacheEntries = perSegmentDiskCacheEntries + diskCacheSize % numberOfSegments;
expectedSegmentDiskCacheSize.put(i, lastSegmentDiskCacheEntries);
}
}
}
return expectedSegmentDiskCacheSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,18 +736,12 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setSettings(settings);
// If this is suppose to be a segmented cache, then accordingly set max size
long maxSizeInBytes = (Long) settingList.get(DISK_MAX_SIZE_IN_BYTES_KEY).get(settings);
if (config.getSegmentNumber() > 0 && config.getNumberOfSegments() > 0) {
long perSegmentSizeInBytes = maxSizeInBytes / config.getNumberOfSegments();
if (config.getSegmentCount() > 0) {
long perSegmentSizeInBytes = maxSizeInBytes / config.getSegmentCount();
if (perSegmentSizeInBytes <= 0) {
throw new IllegalArgumentException("Per segment size for ehcache disk cache should be greater than 0");
}
builder.setMaximumWeightInBytes(perSegmentSizeInBytes);
// In case this is the last segment, assign the remainder of bytes accordingly
if (config.getSegmentNumber() == config.getNumberOfSegments()) {
if (config.getMaxSizeInBytes() % config.getNumberOfSegments() != 0) {
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes() % config.getNumberOfSegments());
}
}
} else {
builder.setMaximumWeightInBytes(maxSizeInBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@

import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
import java.util.Objects;
import java.util.function.ToLongBiFunction;

import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_NUMBER_LIST;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_COUNT_VALUES;

/**
* The cache builder.
Expand All @@ -59,8 +61,8 @@ public static <K, V> CacheBuilder<K, V> builder() {
private CacheBuilder() {}

public CacheBuilder<K, V> setNumberOfSegments(int numberOfSegments) {
if (!VALID_SEGMENT_NUMBER_LIST.contains(numberOfSegments)) {
throw new IllegalArgumentException("Number of segments for cache should be a power of two up-to 256");
if (!VALID_SEGMENT_COUNT_VALUES.contains(numberOfSegments)) {
throw new IllegalArgumentException(String.format(Locale.ROOT, INVALID_SEGMENT_NUMBER_EXCEPTION_MESSAGE, "Cache"));
}
this.numberOfSegments = numberOfSegments;
return this;
Expand Down
Loading

0 comments on commit bd1fce1

Please sign in to comment.