Skip to content

Commit

Permalink
Merge branch 'cache_key_serialization' into feature/tiered-caching
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Nov 6, 2023
2 parents 27cc265 + 40cc1e2 commit d8421ac
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# CHANGELOG
All notable changes to this project are documented in this file.

Expand Down Expand Up @@ -99,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Framework changes ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception {
}
}

public void testCacheWithInvalidation() throws Exception {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));

assertCacheState(client, "index", 0, 1);
// Index but don't refresh
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand All @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.core.index.shard.ShardId;

import java.io.IOException;
import java.util.UUID;

/**
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
Expand All @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
private final ShardId shardId;
private final FilterDirectoryReader.SubReaderWrapper wrapper;

private DelegatingCacheHelper delegatingCacheHelper;

private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
throws IOException {
super(in, wrapper);
this.wrapper = wrapper;
this.shardId = shardId;
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
}

/**
Expand All @@ -68,7 +72,61 @@ public ShardId shardId() {
@Override
public CacheHelper getReaderCacheHelper() {
// safe to delegate since this reader does not alter the index
return in.getReaderCacheHelper();
return this.delegatingCacheHelper;
}

public DelegatingCacheHelper getDelegatingCacheHelper() {
return this.delegatingCacheHelper;
}

/**
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
* @opensearch.internal
*/
public class DelegatingCacheHelper implements CacheHelper {
CacheHelper cacheHelper;
DelegatingCacheKey serializableCacheKey;

DelegatingCacheHelper(CacheHelper cacheHelper) {
this.cacheHelper = cacheHelper;
this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey());
}

@Override
public CacheKey getKey() {
return this.cacheHelper.getKey();
}

public DelegatingCacheKey getDelegatingCacheKey() {
return this.serializableCacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
this.cacheHelper.addClosedListener(listener);
}
}

/**
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
* object itself for serialization purposes.
*/
public class DelegatingCacheKey {
CacheKey cacheKey;
private final String uniqueId;

DelegatingCacheKey(CacheKey cacheKey) {
this.cacheKey = cacheKey;
this.uniqueId = UUID.randomUUID().toString();
}

public CacheKey getCacheKey() {
return this.cacheKey;
}

public String getId() {
return uniqueId;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.io.Closeable;
Expand Down Expand Up @@ -112,7 +115,9 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
private final TimeValue expire;
private final TieredCacheService<Key, BytesReference> tieredCacheService;

IndicesRequestCache(Settings settings) {
private final IndicesService indicesService;

IndicesRequestCache(Settings settings, IndicesService indicesService) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
Expand All @@ -126,6 +131,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
tieredCacheService = new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>().setOnHeapCachingTier(
openSearchOnHeapCache
).setTieredCacheEventListener(this).build();
this.indicesService = indicesService;
}

@Override
Expand Down Expand Up @@ -165,12 +171,18 @@ BytesReference getOrCompute(
BytesReference cacheKey
) throws Exception {
assert reader.getReaderCacheHelper() != null;
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper;

OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
assert readerCacheKeyId != null;
final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey());
CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId);
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
Expand All @@ -189,7 +201,12 @@ BytesReference getOrCompute(
*/
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
assert reader.getReaderCacheHelper() != null;
tieredCacheService.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey));
String readerCacheKeyId = null;
if (reader instanceof OpenSearchDirectoryReader) {
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
}
tieredCacheService.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId));
}

/**
Expand Down Expand Up @@ -223,7 +240,7 @@ public BytesReference load(Key key) throws Exception {
/**
* Basic interface to make this cache testable.
*/
interface CacheEntity extends Accountable {
interface CacheEntity extends Accountable, Writeable {

/**
* Called after the value was loaded.
Expand Down Expand Up @@ -256,24 +273,31 @@ interface CacheEntity extends Accountable {
* Called when this entity instance is removed
*/
void onRemoval(RemovalNotification<Key, BytesReference> notification);

}

/**
* Unique key for the cache
*
* @opensearch.internal
*/
public static class Key implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);
class Key implements Accountable, Writeable {
private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

public final CacheEntity entity; // use as identity equality
public final IndexReader.CacheKey readerCacheKey;
public final String readerCacheKeyId;
public final BytesReference value;

Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) {
Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) {
this.entity = entity;
this.readerCacheKey = Objects.requireNonNull(readerCacheKey);
this.value = value;
this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId);
}

Key(StreamInput in) throws IOException {
this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1));
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
}

@Override
Expand All @@ -292,7 +316,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false;
if (Objects.equals(readerCacheKeyId, key.readerCacheKeyId) == false) return false;
if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false;
if (!value.equals(key.value)) return false;
return true;
Expand All @@ -301,19 +325,26 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + readerCacheKey.hashCode();
result = 31 * result + readerCacheKeyId.hashCode();
result = 31 * result + value.hashCode();
return result;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(entity);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
}
}

private class CleanupKey implements IndexReader.ClosedListener {
final CacheEntity entity;
final IndexReader.CacheKey readerCacheKey;
final String readerCacheKeyId;

private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) {
private CleanupKey(CacheEntity entity, String readerCacheKeyId) {
this.entity = entity;
this.readerCacheKey = readerCacheKey;
this.readerCacheKeyId = readerCacheKeyId;
}

@Override
Expand All @@ -331,15 +362,15 @@ public boolean equals(Object o) {
return false;
}
CleanupKey that = (CleanupKey) o;
if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false;
if (Objects.equals(readerCacheKeyId, that.readerCacheKeyId) == false) return false;
if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false;
return true;
}

@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Objects.hashCode(readerCacheKey);
result = 31 * result + Objects.hashCode(readerCacheKeyId);
return result;
}
}
Expand All @@ -355,7 +386,7 @@ synchronized void cleanCache() {
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) {
if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) {
// null indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
Expand All @@ -368,7 +399,7 @@ synchronized void cleanCache() {
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
} else {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) {
iterator.remove();
}
}
Expand Down
21 changes: 17 additions & 4 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public IndicesService(
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.analysisRegistry = analysisRegistry;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indicesRequestCache = new IndicesRequestCache(settings);
this.indicesRequestCache = new IndicesRequestCache(settings, this);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -1762,14 +1762,21 @@ private BytesReference cacheShardLevelResult(
*
* @opensearch.internal
*/
static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class);
public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class);
private final IndexShard indexShard;

protected IndexShardCacheEntity(IndexShard indexShard) {
public IndexShardCacheEntity(IndexShard indexShard) {
this.indexShard = indexShard;
}

public IndexShardCacheEntity(StreamInput in) throws IOException {
Index index = in.readOptionalWriteable(Index::new);
int shardId = in.readVInt();
IndexService indexService = indices.get(index.getUUID());
this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null);
}

@Override
protected ShardRequestCache stats() {
return indexShard.requestCache();
Expand All @@ -1791,6 +1798,12 @@ public long ramBytesUsed() {
// across many entities
return BASE_RAM_BYTES_USED;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(indexShard.shardId().getIndex());
out.writeVInt(indexShard.shardId().id());
}
}

@FunctionalInterface
Expand Down
Loading

0 comments on commit d8421ac

Please sign in to comment.