Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: store the cache using hashmaps in same place #2872

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws Atla
private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
DirectIndexQueryResult result = null;
try {
if(StringUtils.isNotEmpty(searchParams.getSearchContextId())) {
// If the search context id is present, then we need to delete the previous search context async
if(StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null) {
// If the search context id and greater sequence no is present, then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}
AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();
Expand All @@ -187,12 +187,10 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
*
*/
String esSearchId = response.getId();
if (StringUtils.isNotEmpty(searchParams.getSearchContextId())) {
if (searchParams.getSearchContextSequenceNo() != null) {
// If user pases the sequence number, then we need to update the cache to use it later to delete the search
SearchContextCache.putSequence(searchParams.getSearchContextId(), searchParams.getSearchContextSequenceNo());
}
SearchContextCache.put(searchParams.getSearchContextId(), esSearchId);
String searchContextId = searchParams.getSearchContextId();
Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo();
if (StringUtils.isNotEmpty(searchContextId) && searchContextSequenceNo != null) {
SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId);
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response == null) {
Expand All @@ -211,8 +209,9 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
}

/*
* Process the request with the same search context ID
* Process the request with the same search context ID and sequence number
* @param searchParams
* @return void
* Function to process the request with the same search context ID
* If the sequence number is greater than the cache sequence number,
* then we need to cancel the request and update the cache
Expand All @@ -222,26 +221,13 @@ private void processRequestWithSameSearchContextId(SearchParams searchParams) {
// Extract search context ID and sequence number
String currentSearchContextId = searchParams.getSearchContextId();
Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo();
Integer cacheSequenceNumber = SearchContextCache.getSequence(currentSearchContextId);

// Check if cache entry exists for the given ID
boolean cacheEntryExists = SearchContextCache.get(currentSearchContextId) != null;

// Handle cases where sequence number is available and greater
if (currentSequenceNumber != null && cacheSequenceNumber!= null && currentSequenceNumber > cacheSequenceNumber) {
// Sequence number is greater, cancel the request and update cache
handleCacheUpdate(currentSearchContextId);
} else if (currentSequenceNumber == null || cacheEntryExists) {
handleCacheUpdate(currentSearchContextId);
}
}
private void handleCacheUpdate(String currentSearchContextId) {
// Retrieve existing search ID from cache
String esSearchId = SearchContextCache.get(currentSearchContextId);
// Get the search ID from the cache if sequence number is greater than the current sequence number
String previousESSearchId = SearchContextCache.getESAsyncSearchIdFromContextCache(currentSearchContextId, currentSequenceNumber);

// Check if search ID exists and delete if necessary
if (StringUtils.isNotEmpty(esSearchId)) {
deleteAsyncSearchResponse(esSearchId);
if (StringUtils.isNotEmpty(previousESSearchId)) {
LOG.debug("Deleting the previous async search response with ID {}", previousESSearchId);
// If the search ID exists, then we need to delete the search context
deleteAsyncSearchResponse(previousESSearchId);
SearchContextCache.remove(currentSearchContextId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,41 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import javax.annotation.ParametersAreNonnullByDefault;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class SearchContextCache {
private static final Cache<Object, Object> searchContextCache = CacheBuilder.newBuilder()
private static final Cache<String, HashMap<Integer, String>> searchContextCache = CacheBuilder.newBuilder()
.maximumSize(200)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

private static final Cache<Object, Object> searchContextSequenceCache = CacheBuilder.newBuilder()
.maximumSize(200)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

public static void put(String key, String value) {
searchContextCache.put(key, value);
public static void put(String key, Integer sequence, String esAsyncId) {
HashMap<Integer, String> entry = new HashMap<>();
entry.put(sequence, esAsyncId);
searchContextCache.put(key, entry);
}

public static void putSequence(String key, Integer value) {
searchContextSequenceCache.put(key, value);
}

public static Integer getSequence(String key) {
return (Integer) searchContextSequenceCache.getIfPresent(key);
public static HashMap<Integer, String> get(String key){
return searchContextCache.getIfPresent(key);
}

public static String get(String key){
return (String) searchContextCache.getIfPresent(key);
public static String getESAsyncSearchIdFromContextCache(String key, Integer sequence){
//Get the context cache for the given key
HashMap<Integer, String> contextCache = get(key);
if(contextCache == null || sequence == null){
return null;
}
//Find the highest sequence number
int maxStoredSequence = 0;
for (Integer seq : contextCache.keySet()) {
if (seq > maxStoredSequence) {
maxStoredSequence = seq;
}
}
//If the given sequence is greater than the max stored sequence, return the ESAsyncId else return null
return sequence > maxStoredSequence ? contextCache.getOrDefault(maxStoredSequence, null) : null;
}

public static void remove(String key) {
Expand Down
Loading