From 29adef55734966e79cb30ed99cc88c566c0cc658 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Wed, 1 May 2024 16:05:04 +0530 Subject: [PATCH 01/16] feat: run Redis ops async and reduce wait time for search cache --- .../service/redis/AbstractRedisService.java | 28 +++++++++++++++++-- .../atlas/service/redis/RedisServiceImpl.java | 2 ++ .../service/redis/RedisServiceLocalImpl.java | 1 + .../janus/AtlasElasticsearchQuery.java | 8 +++--- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 5bd2785ba0..3a4a4101ca 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -32,6 +32,8 @@ public abstract class AbstractRedisService implements RedisService { private static final String ATLAS_METASTORE_SERVICE = "atlas-metastore-service"; RedissonClient redisClient; + + RedissonClient searchContextCacheRedisClient; Map keyLockMap; Configuration atlasConfig; long waitTimeInMS; @@ -74,20 +76,20 @@ public void releaseDistributedLock(String key) { @Override public String getValue(String key) { // If value doesn't exist, return null else return the value - return (String) redisClient.getBucket(convertToNamespace(key)).get(); + return (String) searchContextCacheRedisClient.getBucket(convertToNamespace(key)).get(); } @Override public String putValue(String key, String value) { // Put the value in the redis cache with TTL - redisClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS); + searchContextCacheRedisClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS); return value; } @Override public void removeValue(String key) { // Remove the value from the redis cache - redisClient.getBucket(convertToNamespace(key)).delete(); + searchContextCacheRedisClient.getBucket(convertToNamespace(key)).delete(); } private String getHostAddress() throws UnknownHostException { @@ -136,6 +138,26 @@ protected Config getProdConfig() throws AtlasException { return config; } + protected Config getSearchContextCacheConfig() throws AtlasException { + Config config = new Config(); + config.useSentinelServers() + .setClientName(ATLAS_METASTORE_SERVICE) + .setReadMode(ReadMode.MASTER_SLAVE) + .setCheckSentinelsList(false) + .setKeepAlive(true) + .setMasterConnectionMinimumIdleSize(10) + .setMasterConnectionPoolSize(20) + .setSlaveConnectionMinimumIdleSize(10) + .setSlaveConnectionPoolSize(20) + .setMasterName(atlasConfig.getString(ATLAS_REDIS_MASTER_NAME)) + .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) + .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) + .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)) + .setTimeout(100) //Setting UP timeout to 100ms + .setRetryAttempts(0); + return config; + } + private String[] formatUrls(String[] urls) throws IllegalArgumentException { if (ArrayUtils.isEmpty(urls)) { getLogger().error("Invalid redis cluster urls"); diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java index 42dec6fa78..c553cacfdb 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java @@ -3,6 +3,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.redisson.Redisson; +import org.redisson.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -18,6 +19,7 @@ public class RedisServiceImpl extends AbstractRedisService{ @PostConstruct public void init() throws AtlasException { redisClient = Redisson.create(getProdConfig()); + searchContextCacheRedisClient = Redisson.create(getSearchContextCacheConfig()); LOG.info("Sentinel redis client created successfully."); } diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java index 1ddad0f3e8..61db05ee30 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java @@ -18,6 +18,7 @@ public class RedisServiceLocalImpl extends AbstractRedisService { @PostConstruct public void init() throws AtlasException { redisClient = Redisson.create(getLocalConfig()); + searchContextCacheRedisClient = Redisson.create(getLocalConfig()); LOG.info("Local redis client created successfully."); } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 51e1837975..02686c39a7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -180,13 +180,13 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP if(contextIdExists) { // If the search context id and greater sequence no is present, then we need to delete the previous search context async try { - processRequestWithSameSearchContextId(searchParams); + CompletableFuture.runAsync(() -> processRequestWithSameSearchContextId(searchParams)); } catch (Exception e) { LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); } } AsyncQueryResult response = submitAsyncSearch(searchParams, false).get(); - + //Sleep for 5 seconds to allow ES to process the request if(response.isRunning()) { /* * If the response is still running, then we need to wait for the response @@ -199,7 +199,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo(); if (contextIdExists) { try { - SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId); + CompletableFuture.runAsync(() -> SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId)); } catch (Exception e) { LOG.error("Failed to update the search context cache {}", e.getMessage()); } @@ -220,7 +220,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP if (contextIdExists) { // If the search context id is present, then we need to remove the search context from the cache try { - SearchContextCache.remove(searchParams.getSearchContextId()); + CompletableFuture.runAsync(() -> SearchContextCache.remove(searchParams.getSearchContextId())); } catch (Exception e) { LOG.error("Failed to remove the search context from the cache {}", e.getMessage()); } From 2d8cbd97c9dddee38c7ec99abd7ecae40e3f5138 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Wed, 1 May 2024 16:18:31 +0530 Subject: [PATCH 02/16] fix: just use new config --- .../org/apache/atlas/service/redis/AbstractRedisService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 3a4a4101ca..bbb8f80242 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -121,7 +121,7 @@ Config getLocalConfig() throws AtlasException { } protected Config getProdConfig() throws AtlasException { - Config config = initAtlasConfig(); + Config config = new Config(); config.useSentinelServers() .setClientName(ATLAS_METASTORE_SERVICE) .setReadMode(ReadMode.MASTER_SLAVE) @@ -135,6 +135,7 @@ protected Config getProdConfig() throws AtlasException { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)); + return config; } From 1cb04d5f3ef05514f75f4abcf0c1be55924150b0 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Wed, 1 May 2024 16:30:32 +0530 Subject: [PATCH 03/16] fix: update config --- .../apache/atlas/service/redis/AbstractRedisService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index bbb8f80242..967c8fb5ec 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -120,8 +120,8 @@ Config getLocalConfig() throws AtlasException { return config; } - protected Config getProdConfig() throws AtlasException { - Config config = new Config(); + Config getProdConfig() throws AtlasException { + Config config = initAtlasConfig(); config.useSentinelServers() .setClientName(ATLAS_METASTORE_SERVICE) .setReadMode(ReadMode.MASTER_SLAVE) @@ -135,11 +135,10 @@ protected Config getProdConfig() throws AtlasException { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)); - return config; } - protected Config getSearchContextCacheConfig() throws AtlasException { + Config getSearchContextCacheConfig() throws AtlasException { Config config = new Config(); config.useSentinelServers() .setClientName(ATLAS_METASTORE_SERVICE) From d38091a62a40f00d5820e539266987d9930c0cfb Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Wed, 1 May 2024 16:52:37 +0530 Subject: [PATCH 04/16] feat: update conetxt naming in redis client --- .../org/apache/atlas/service/redis/AbstractRedisService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 967c8fb5ec..59703f5f52 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -32,7 +32,6 @@ public abstract class AbstractRedisService implements RedisService { private static final String ATLAS_METASTORE_SERVICE = "atlas-metastore-service"; RedissonClient redisClient; - RedissonClient searchContextCacheRedisClient; Map keyLockMap; Configuration atlasConfig; @@ -138,10 +137,10 @@ Config getProdConfig() throws AtlasException { return config; } - Config getSearchContextCacheConfig() throws AtlasException { + Config getSearchContextCacheConfig() { Config config = new Config(); config.useSentinelServers() - .setClientName(ATLAS_METASTORE_SERVICE) + .setClientName(ATLAS_METASTORE_SERVICE+"-searchContextCache") .setReadMode(ReadMode.MASTER_SLAVE) .setCheckSentinelsList(false) .setKeepAlive(true) From df3864595ab9958245570d3458cbb3c6998f8b66 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 18:32:19 +0530 Subject: [PATCH 05/16] fix: set redis timeout as 10 sec --- .../org/apache/atlas/service/redis/AbstractRedisService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 59703f5f52..8238d32fbb 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -152,7 +152,7 @@ Config getSearchContextCacheConfig() { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)) - .setTimeout(100) //Setting UP timeout to 100ms + .setTimeout(10) //Setting UP timeout to 10ms .setRetryAttempts(0); return config; } From 0f68575311822f621143771dddc68cd5de342866 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 18:34:16 +0530 Subject: [PATCH 06/16] Revert "Revert "feat: Search cancellation from Metastore with global context cache using Redis"" --- .../service/redis/AbstractRedisService.java | 26 +++++- .../service/redis/NoRedisServiceImpl.java | 15 ++++ .../atlas/service/redis/RedisService.java | 6 ++ .../service/redis/RedisServiceLocalImpl.java | 15 ++++ graphdb/janus/pom.xml | 4 + .../janus/AtlasElasticsearchQuery.java | 56 +++++++++---- .../graphdb/janus/SearchContextCache.java | 81 +++++++++++++------ 7 files changed, 161 insertions(+), 42 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 0ae20a60cc..5bd2785ba0 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -71,6 +71,25 @@ public void releaseDistributedLock(String key) { } } + @Override + public String getValue(String key) { + // If value doesn't exist, return null else return the value + return (String) redisClient.getBucket(convertToNamespace(key)).get(); + } + + @Override + public String putValue(String key, String value) { + // Put the value in the redis cache with TTL + redisClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS); + return value; + } + + @Override + public void removeValue(String key) { + // Remove the value from the redis cache + redisClient.getBucket(convertToNamespace(key)).delete(); + } + private String getHostAddress() throws UnknownHostException { return InetAddress.getLocalHost().getHostAddress(); } @@ -85,6 +104,11 @@ private Config initAtlasConfig() throws AtlasException { return redisConfig; } + private String convertToNamespace(String key){ + // Append key with namespace :atlas + return "atlas:"+key; + } + Config getLocalConfig() throws AtlasException { Config config = initAtlasConfig(); config.useSingleServer() @@ -94,7 +118,7 @@ Config getLocalConfig() throws AtlasException { return config; } - Config getProdConfig() throws AtlasException { + protected Config getProdConfig() throws AtlasException { Config config = initAtlasConfig(); config.useSentinelServers() .setClientName(ATLAS_METASTORE_SERVICE) diff --git a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java index 96a8fadc99..5bc089d955 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java @@ -29,6 +29,21 @@ public void releaseDistributedLock(String key) { //do nothing } + @Override + public String getValue(String key) { + return null; + } + + @Override + public String putValue(String key, String value) { + return null; + } + + @Override + public void removeValue(String key) { + + } + @Override public Logger getLogger() { return LOG; diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java index 1475f93e83..6cb2f04ada 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java @@ -8,6 +8,12 @@ public interface RedisService { void releaseDistributedLock(String key); + String getValue(String key); + + String putValue(String key, String value); + + void removeValue(String key); + Logger getLogger(); } diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java index 2eb774920e..1ddad0f3e8 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java @@ -21,6 +21,21 @@ public void init() throws AtlasException { LOG.info("Local redis client created successfully."); } + @Override + public String getValue(String key) { + return null; + } + + @Override + public String putValue(String key, String value) { + return null; + } + + @Override + public void removeValue(String key) { + + } + @Override public Logger getLogger() { return LOG; diff --git a/graphdb/janus/pom.xml b/graphdb/janus/pom.xml index 5daef76b4e..75c9079eee 100644 --- a/graphdb/janus/pom.xml +++ b/graphdb/janus/pom.xml @@ -282,6 +282,10 @@ mockito-all test + + org.apache.atlas + atlas-server-api + diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 9d2e2489e8..51e1837975 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -19,12 +19,14 @@ import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.SearchParams; import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.DirectIndexQueryResult; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.StringUtils; @@ -171,11 +173,17 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery"); DirectIndexQueryResult result = null; + boolean contextIdExists = StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null; try { - if(StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null) { + if(contextIdExists) { // If the search context id and greater sequence no is present, then we need to delete the previous search context async - processRequestWithSameSearchContextId(searchParams); + try { + processRequestWithSameSearchContextId(searchParams); + } catch (Exception e) { + LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); + } } AsyncQueryResult response = submitAsyncSearch(searchParams, false).get(); @@ -189,8 +197,12 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP String esSearchId = response.getId(); String searchContextId = searchParams.getSearchContextId(); Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo(); - if (StringUtils.isNotEmpty(searchContextId) && searchContextSequenceNo != null) { - SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId); + if (contextIdExists) { + try { + SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId); + } catch (Exception e) { + LOG.error("Failed to update the search context cache {}", e.getMessage()); + } } response = getAsyncSearchResponse(searchParams, esSearchId).get(); if (response == null) { @@ -204,6 +216,16 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP }catch (Exception e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); + } finally { + if (contextIdExists) { + // If the search context id is present, then we need to remove the search context from the cache + try { + SearchContextCache.remove(searchParams.getSearchContextId()); + } catch (Exception e) { + LOG.error("Failed to remove the search context from the cache {}", e.getMessage()); + } + } + RequestContext.get().endMetricRecord(metric); } return result; } @@ -218,17 +240,21 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP * We also need to check if the search ID exists and delete if necessary */ private void processRequestWithSameSearchContextId(SearchParams searchParams) { - // Extract search context ID and sequence number - String currentSearchContextId = searchParams.getSearchContextId(); - Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo(); - // Get the search ID from the cache if sequence number is greater than the current sequence number - String previousESSearchId = SearchContextCache.getESAsyncSearchIdFromContextCache(currentSearchContextId, currentSequenceNumber); - - if (StringUtils.isNotEmpty(previousESSearchId)) { - LOG.debug("Deleting the previous async search response with ID {}", previousESSearchId); - // If the search ID exists, then we need to delete the search context - deleteAsyncSearchResponse(previousESSearchId); - SearchContextCache.remove(currentSearchContextId); + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processRequestWithSameSearchContextId"); + try { + // Extract search context ID and sequence number + String currentSearchContextId = searchParams.getSearchContextId(); + Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo(); + // Get the search ID from the cache if sequence number is greater than the current sequence number + String previousESSearchId = SearchContextCache.getESAsyncSearchIdFromContextCache(currentSearchContextId, currentSequenceNumber); + + if (StringUtils.isNotEmpty(previousESSearchId)) { + LOG.debug("Deleting the previous async search response with ID {}", previousESSearchId); + // If the search ID exists, then we need to delete the search context + deleteAsyncSearchResponse(previousESSearchId); + } + } finally { + RequestContext.get().endMetricRecord(metric); } } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index 1780e5d1f9..fc627ef51f 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -2,48 +2,77 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import org.apache.atlas.RequestContext; +import org.apache.atlas.service.redis.AbstractRedisService; +import org.apache.atlas.service.redis.RedisService; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.slf4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +@Component public class SearchContextCache { - private static final Cache> searchContextCache = CacheBuilder.newBuilder() - .maximumSize(200) - .expireAfterWrite(30, TimeUnit.SECONDS) - .build(); + private static RedisService redisService = null; + + public static final String INVALID_SEQUENCE = "invalid_sequence"; + + + public SearchContextCache(@Qualifier("redisServiceImpl") RedisService redisService) { + SearchContextCache.redisService = redisService; + } + public static void put(String key, Integer sequence, String esAsyncId) { - HashMap entry = new HashMap<>(); - entry.put(sequence, esAsyncId); - searchContextCache.put(key, entry); + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("putInCache"); + try { + // Build the string in format `sequence/esAsyncId` and store it in redis + String val = sequence + "/" + esAsyncId; + redisService.putValue(key, val); + } finally { + RequestContext.get().endMetricRecord(metric); + } } - public static HashMap get(String key){ - return searchContextCache.getIfPresent(key); + public static String get(String key){ + return redisService.getValue(key); } public static String getESAsyncSearchIdFromContextCache(String key, Integer sequence){ - //Get the context cache for the given key - HashMap contextCache = get(key); - if(contextCache == null || sequence == null){ - return null; - } - //Find the highest sequence number - int maxStoredSequence = 0; - for (Integer seq : contextCache.keySet()) { - if (seq > maxStoredSequence) { - maxStoredSequence = seq; + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getESAsyncSearchIdFromContextCache"); + try { + //Get the context cache for the given key + String contextCache = get(key); + if(contextCache == null || sequence == null){ + return null; + } + // Split the context cache to get the sequence and ESAsyncId + String[] contextCacheSplit = contextCache.split("/"); + if(contextCacheSplit.length != 2){ + return null; } + int seq = Integer.parseInt(contextCacheSplit[0]); + if(sequence > seq){ + return contextCacheSplit[1]; + } else if (sequence < seq) { + return INVALID_SEQUENCE; + } + return null; + } finally { + RequestContext.get().endMetricRecord(metric); } - //If the given sequence is greater than the max stored sequence, return the ESAsyncId else return null - return sequence > maxStoredSequence ? contextCache.getOrDefault(maxStoredSequence, null) : null; - } - public static void remove(String key) { - searchContextCache.invalidate(key); } + public static void remove(String key) { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("removeFromCache"); + try { + redisService.removeValue(key); + } finally { + RequestContext.get().endMetricRecord(metric); + } - public static void clear() { - searchContextCache.cleanUp(); } } + From 93c08346bc1fbb1083fc679a90ccb026f79bc0bd Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 21:04:34 +0530 Subject: [PATCH 07/16] feat: add failure metrics for redis --- .../service/metrics/MetricsRegistry.java | 2 +- .../metrics/MetricsRegistryServiceImpl.java | 67 ++++++++++--------- .../apache/atlas/utils/AtlasMetricType.java | 9 +++ .../apache/atlas/utils/AtlasPerfMetrics.java | 14 ++++ .../janus/AtlasElasticsearchQuery.java | 20 ++++-- .../java/org/apache/atlas/RequestContext.java | 2 +- 6 files changed, 77 insertions(+), 37 deletions(-) create mode 100644 common/src/main/java/org/apache/atlas/utils/AtlasMetricType.java diff --git a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java index 9fdf5b903e..ef8a49951c 100644 --- a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java +++ b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java @@ -10,7 +10,7 @@ public interface MetricsRegistry { void collect(String requestId, String requestUri, AtlasPerfMetrics metrics); - void collectIndexsearch(String requestId, String requestUri, List applicationMetrics); + void collectIApplicationMetrics(String requestId, String requestUri, List applicationMetrics); void scrape(PrintWriter writer) throws IOException; diff --git a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java index 1ae6d2980c..e732ad13f0 100644 --- a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java @@ -1,12 +1,10 @@ package org.apache.atlas.service.metrics; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Tags; -import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.*; import io.micrometer.prometheus.PrometheusMeterRegistry; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.utils.AtlasMetricType; import org.apache.atlas.utils.AtlasPerfMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,35 +62,42 @@ public void collect(String requestId, String requestUri, AtlasPerfMetrics metric } } //Use this if you want to publish Histograms - public void collectIndexsearch(String requestId, String requestUri, List applicationMetrics){ + public void collectIApplicationMetrics(String requestId, String requestUri, List applicationMetrics){ try { for(AtlasPerfMetrics.Metric metric : applicationMetrics){ - Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY) - .serviceLevelObjectives( - Duration.ofMillis(500), - Duration.ofMillis(750), - Duration.ofMillis(1000), - Duration.ofMillis(1200), - Duration.ofMillis(1500), - Duration.ofSeconds(2), - Duration.ofSeconds(3), - Duration.ofSeconds(4), - Duration.ofSeconds(5), - Duration.ofSeconds(7), - Duration.ofSeconds(10), - Duration.ofSeconds(15), - Duration.ofSeconds(20), - Duration.ofSeconds(25), - Duration.ofSeconds(30), - Duration.ofSeconds(40), - Duration.ofSeconds(60), - Duration.ofSeconds(90), - Duration.ofSeconds(120), - Duration.ofSeconds(180) - ) - .publishPercentiles(PERCENTILES) - .tags(convertToMicrometerTags(metric.getTags())) - .register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS); + if (metric.getMetricType() == AtlasMetricType.COUNTER) { + Counter.builder(metric.getName()) + .tags(convertToMicrometerTags(metric.getTags())) + .register(getMeterRegistry()) + .increment(metric.getInvocations()); + } else { + Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY) + .serviceLevelObjectives( + Duration.ofMillis(500), + Duration.ofMillis(750), + Duration.ofMillis(1000), + Duration.ofMillis(1200), + Duration.ofMillis(1500), + Duration.ofSeconds(2), + Duration.ofSeconds(3), + Duration.ofSeconds(4), + Duration.ofSeconds(5), + Duration.ofSeconds(7), + Duration.ofSeconds(10), + Duration.ofSeconds(15), + Duration.ofSeconds(20), + Duration.ofSeconds(25), + Duration.ofSeconds(30), + Duration.ofSeconds(40), + Duration.ofSeconds(60), + Duration.ofSeconds(90), + Duration.ofSeconds(120), + Duration.ofSeconds(180) + ) + .publishPercentiles(PERCENTILES) + .tags(convertToMicrometerTags(metric.getTags())) + .register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS); + } } } catch (Exception e) { LOG.error("Failed to collect metrics", e); diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasMetricType.java b/common/src/main/java/org/apache/atlas/utils/AtlasMetricType.java new file mode 100644 index 0000000000..6752b7fbd4 --- /dev/null +++ b/common/src/main/java/org/apache/atlas/utils/AtlasMetricType.java @@ -0,0 +1,9 @@ +package org.apache.atlas.utils; + +public enum AtlasMetricType { + COUNTER, + GAUGE, + HISTOGRAM, + METER, + TIMER +} diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java index beebcb6ab1..dd8a101ad5 100644 --- a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java +++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java @@ -104,6 +104,8 @@ long getElapsedTime() { public static class Metric { private final String name; + + private AtlasMetricType metricType; private long invocations = 0; private long totalTimeMSecs = 0; HashMap tags = new HashMap<>(); @@ -112,6 +114,14 @@ public Metric(String name) { this.name = name; } + public void setMetricType(AtlasMetricType metricType) { + this.metricType = metricType; + } + + public AtlasMetricType getMetricType() { + return metricType; + } + public String getName() { return name; } @@ -135,5 +145,9 @@ public HashMap getTags() { return tags; } + public void incrementInvocations() { + invocations++; + } + } } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 02686c39a7..8f4719ee41 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -26,6 +26,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.DirectIndexQueryResult; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.utils.AtlasMetricType; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.NotImplementedException; @@ -48,6 +49,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.janusgraph.util.encoding.LongEncoding; +import org.redisson.client.RedisException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,7 +182,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP if(contextIdExists) { // If the search context id and greater sequence no is present, then we need to delete the previous search context async try { - CompletableFuture.runAsync(() -> processRequestWithSameSearchContextId(searchParams)); + processRequestWithSameSearchContextId(searchParams); } catch (Exception e) { LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); } @@ -240,8 +242,9 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP * We also need to check if the search ID exists and delete if necessary */ private void processRequestWithSameSearchContextId(SearchParams searchParams) { - AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processRequestWithSameSearchContextId"); try { + AtlasPerfMetrics.Metric metric = new AtlasPerfMetrics.Metric("process_async_request_count"); + metric.setMetricType(AtlasMetricType.COUNTER); // Extract search context ID and sequence number String currentSearchContextId = searchParams.getSearchContextId(); Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo(); @@ -253,8 +256,17 @@ private void processRequestWithSameSearchContextId(SearchParams searchParams) { // If the search ID exists, then we need to delete the search context deleteAsyncSearchResponse(previousESSearchId); } - } finally { - RequestContext.get().endMetricRecord(metric); + metric.incrementInvocations(); + RequestContext.get().addApplicationMetrics(metric); + } catch (RedisException e) { + AtlasPerfMetrics.Metric metric = new AtlasPerfMetrics.Metric("async_request_redis_failure_counter"); + metric.setMetricType(AtlasMetricType.COUNTER); + metric.incrementInvocations(); + LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); + RequestContext.get().addApplicationMetrics(metric); + } + catch (Exception e) { + LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); } } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index d27be16759..9fe4097f5f 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -177,7 +177,7 @@ public void clearCache() { } if (CollectionUtils.isNotEmpty(applicationMetrics)) { if (Objects.nonNull(this.metricsRegistry)){ - this.metricsRegistry.collectIndexsearch(traceId, this.requestUri, applicationMetrics); + this.metricsRegistry.collectIApplicationMetrics(traceId, this.requestUri, applicationMetrics); } applicationMetrics.clear(); } From fce4132f05fd994d1b784c4453f5bdd281da3dce Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 21:13:14 +0530 Subject: [PATCH 08/16] feat: redis client rename --- .../apache/atlas/service/redis/AbstractRedisService.java | 8 ++++---- .../org/apache/atlas/service/redis/RedisServiceImpl.java | 3 +-- .../apache/atlas/service/redis/RedisServiceLocalImpl.java | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 8238d32fbb..f7114f4f6a 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -32,7 +32,7 @@ public abstract class AbstractRedisService implements RedisService { private static final String ATLAS_METASTORE_SERVICE = "atlas-metastore-service"; RedissonClient redisClient; - RedissonClient searchContextCacheRedisClient; + RedissonClient redisCacheClient; Map keyLockMap; Configuration atlasConfig; long waitTimeInMS; @@ -75,20 +75,20 @@ public void releaseDistributedLock(String key) { @Override public String getValue(String key) { // If value doesn't exist, return null else return the value - return (String) searchContextCacheRedisClient.getBucket(convertToNamespace(key)).get(); + return (String) redisCacheClient.getBucket(convertToNamespace(key)).get(); } @Override public String putValue(String key, String value) { // Put the value in the redis cache with TTL - searchContextCacheRedisClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS); + redisCacheClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS); return value; } @Override public void removeValue(String key) { // Remove the value from the redis cache - searchContextCacheRedisClient.getBucket(convertToNamespace(key)).delete(); + redisCacheClient.getBucket(convertToNamespace(key)).delete(); } private String getHostAddress() throws UnknownHostException { diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java index c553cacfdb..1dcc6f56ba 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java @@ -3,7 +3,6 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.redisson.Redisson; -import org.redisson.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -19,7 +18,7 @@ public class RedisServiceImpl extends AbstractRedisService{ @PostConstruct public void init() throws AtlasException { redisClient = Redisson.create(getProdConfig()); - searchContextCacheRedisClient = Redisson.create(getSearchContextCacheConfig()); + redisCacheClient = Redisson.create(getSearchContextCacheConfig()); LOG.info("Sentinel redis client created successfully."); } diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java index 61db05ee30..c625f105f1 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java @@ -18,7 +18,7 @@ public class RedisServiceLocalImpl extends AbstractRedisService { @PostConstruct public void init() throws AtlasException { redisClient = Redisson.create(getLocalConfig()); - searchContextCacheRedisClient = Redisson.create(getLocalConfig()); + redisCacheClient = Redisson.create(getLocalConfig()); LOG.info("Local redis client created successfully."); } From 5232e56278aca0ce19512caf79a78488b3c2bc87 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 21:31:56 +0530 Subject: [PATCH 09/16] nit: remove red. excp handling and rename cfg --- .../apache/atlas/service/redis/AbstractRedisService.java | 4 ++-- .../org/apache/atlas/service/redis/RedisServiceImpl.java | 2 +- .../graphdb/janus/AtlasElasticsearchQuery.java | 9 +++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index f7114f4f6a..08d8b21c66 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -137,10 +137,10 @@ Config getProdConfig() throws AtlasException { return config; } - Config getSearchContextCacheConfig() { + Config getCacheImplConfig() { Config config = new Config(); config.useSentinelServers() - .setClientName(ATLAS_METASTORE_SERVICE+"-searchContextCache") + .setClientName(ATLAS_METASTORE_SERVICE+"-redisCache") .setReadMode(ReadMode.MASTER_SLAVE) .setCheckSentinelsList(false) .setKeepAlive(true) diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java index 1dcc6f56ba..48f199473e 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java @@ -18,7 +18,7 @@ public class RedisServiceImpl extends AbstractRedisService{ @PostConstruct public void init() throws AtlasException { redisClient = Redisson.create(getProdConfig()); - redisCacheClient = Redisson.create(getSearchContextCacheConfig()); + redisCacheClient = Redisson.create(getCacheImplConfig()); LOG.info("Sentinel redis client created successfully."); } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 8f4719ee41..5419da45c6 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -181,11 +181,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP try { if(contextIdExists) { // If the search context id and greater sequence no is present, then we need to delete the previous search context async - try { processRequestWithSameSearchContextId(searchParams); - } catch (Exception e) { - LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); - } } AsyncQueryResult response = submitAsyncSearch(searchParams, false).get(); //Sleep for 5 seconds to allow ES to process the request @@ -242,6 +238,8 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP * We also need to check if the search ID exists and delete if necessary */ private void processRequestWithSameSearchContextId(SearchParams searchParams) { + AtlasPerfMetrics.MetricRecorder funcMetric = RequestContext.get().startMetricRecord("processRequestWithSameSearchContextId"); + try { AtlasPerfMetrics.Metric metric = new AtlasPerfMetrics.Metric("process_async_request_count"); metric.setMetricType(AtlasMetricType.COUNTER); @@ -268,6 +266,9 @@ private void processRequestWithSameSearchContextId(SearchParams searchParams) { catch (Exception e) { LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); } + finally { + RequestContext.get().endMetricRecord(funcMetric); + } } /* From da3191cb62bbcb59d036739c1daab5c65c5d9121 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Thu, 2 May 2024 23:45:44 +0530 Subject: [PATCH 10/16] fix: remove redundant search metric --- .../graphdb/janus/AtlasElasticsearchQuery.java | 13 ++++--------- pom.xml | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 5419da45c6..a291ac40a0 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -239,10 +239,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP */ private void processRequestWithSameSearchContextId(SearchParams searchParams) { AtlasPerfMetrics.MetricRecorder funcMetric = RequestContext.get().startMetricRecord("processRequestWithSameSearchContextId"); - try { - AtlasPerfMetrics.Metric metric = new AtlasPerfMetrics.Metric("process_async_request_count"); - metric.setMetricType(AtlasMetricType.COUNTER); // Extract search context ID and sequence number String currentSearchContextId = searchParams.getSearchContextId(); Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo(); @@ -254,14 +251,12 @@ private void processRequestWithSameSearchContextId(SearchParams searchParams) { // If the search ID exists, then we need to delete the search context deleteAsyncSearchResponse(previousESSearchId); } - metric.incrementInvocations(); - RequestContext.get().addApplicationMetrics(metric); } catch (RedisException e) { - AtlasPerfMetrics.Metric metric = new AtlasPerfMetrics.Metric("async_request_redis_failure_counter"); - metric.setMetricType(AtlasMetricType.COUNTER); - metric.incrementInvocations(); + AtlasPerfMetrics.Metric failureCounter = new AtlasPerfMetrics.Metric("async_request_redis_failure_counter"); + failureCounter.setMetricType(AtlasMetricType.COUNTER); + failureCounter.incrementInvocations(); LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); - RequestContext.get().addApplicationMetrics(metric); + RequestContext.get().addApplicationMetrics(failureCounter); } catch (Exception e) { LOG.error("Failed to process the request with the same search context ID {}", e.getMessage()); diff --git a/pom.xml b/pom.xml index 1cc9aa70dc..3abc39041b 100644 --- a/pom.xml +++ b/pom.xml @@ -717,7 +717,7 @@ 4.4.13 2.12.4 2.12.4 - 0.6.03 + 0.6.04 0.5.3 1 3.1.0 From 686a9e23a652963d28641e63b41cdc8581355749 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 00:01:07 +0530 Subject: [PATCH 11/16] feat: add local search context cache as backup --- .../graphdb/janus/SearchContextCache.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index fc627ef51f..0d6e172cf6 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -3,21 +3,22 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.atlas.RequestContext; -import org.apache.atlas.service.redis.AbstractRedisService; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.utils.AtlasPerfMetrics; -import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; @Component public class SearchContextCache { private static RedisService redisService = null; + private static final Cache searchContextLocalCache = CacheBuilder.newBuilder() + .maximumSize(200) + .expireAfterWrite(30, TimeUnit.SECONDS) + .build(); + public static final String INVALID_SEQUENCE = "invalid_sequence"; @@ -32,12 +33,23 @@ public static void put(String key, Integer sequence, String esAsyncId) { // Build the string in format `sequence/esAsyncId` and store it in redis String val = sequence + "/" + esAsyncId; redisService.putValue(key, val); + searchContextLocalCache.put(key, val); } finally { RequestContext.get().endMetricRecord(metric); } } - public static String get(String key){ - return redisService.getValue(key); + public static String get(String key) { + String ret = null; + try { + ret = searchContextLocalCache.getIfPresent(key); + if (ret == null) { + ret = redisService.getValue(key); + } + return ret; + } catch (Exception e) { + return null; + } + } public static String getESAsyncSearchIdFromContextCache(String key, Integer sequence){ @@ -68,6 +80,7 @@ public static String getESAsyncSearchIdFromContextCache(String key, Integer sequ public static void remove(String key) { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("removeFromCache"); try { + searchContextLocalCache.invalidate(key); redisService.removeValue(key); } finally { RequestContext.get().endMetricRecord(metric); From 5bf86cf3a0f95325b0a75d015ebcccbb17d1e1dc Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 00:02:18 +0530 Subject: [PATCH 12/16] Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3abc39041b..1cc9aa70dc 100644 --- a/pom.xml +++ b/pom.xml @@ -717,7 +717,7 @@ 4.4.13 2.12.4 2.12.4 - 0.6.04 + 0.6.03 0.5.3 1 3.1.0 From c87cef6f7c229c4ef5375b617abffa1bcdb66d8d Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 02:16:10 +0530 Subject: [PATCH 13/16] fix: update timeout to 100ms --- .../org/apache/atlas/service/redis/AbstractRedisService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 08d8b21c66..40634e6dc1 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -152,7 +152,7 @@ Config getCacheImplConfig() { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)) - .setTimeout(10) //Setting UP timeout to 10ms + .setTimeout(100) //Setting UP timeout to 10ms .setRetryAttempts(0); return config; } From 3a46932a415b6c8b7f3bcca9f3688c78c2f7c97f Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 03:22:35 +0530 Subject: [PATCH 14/16] feat: update the timeout to 50 --- .../org/apache/atlas/service/redis/AbstractRedisService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 40634e6dc1..b259c32c16 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -152,7 +152,7 @@ Config getCacheImplConfig() { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)) - .setTimeout(100) //Setting UP timeout to 10ms + .setTimeout(50) //Setting UP timeout to 10ms .setRetryAttempts(0); return config; } From eacf1c701d9a787636fe377505112ddd4357408e Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 03:44:07 +0530 Subject: [PATCH 15/16] fix: remove local cache --- .../graphdb/janus/SearchContextCache.java | 20 +------------------ 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index 0d6e172cf6..0b80ce0334 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -1,24 +1,13 @@ package org.apache.atlas.repository.graphdb.janus; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.apache.atlas.RequestContext; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.utils.AtlasPerfMetrics; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; - -import java.util.concurrent.TimeUnit; - @Component public class SearchContextCache { private static RedisService redisService = null; - private static final Cache searchContextLocalCache = CacheBuilder.newBuilder() - .maximumSize(200) - .expireAfterWrite(30, TimeUnit.SECONDS) - .build(); - public static final String INVALID_SEQUENCE = "invalid_sequence"; @@ -33,19 +22,13 @@ public static void put(String key, Integer sequence, String esAsyncId) { // Build the string in format `sequence/esAsyncId` and store it in redis String val = sequence + "/" + esAsyncId; redisService.putValue(key, val); - searchContextLocalCache.put(key, val); } finally { RequestContext.get().endMetricRecord(metric); } } public static String get(String key) { - String ret = null; try { - ret = searchContextLocalCache.getIfPresent(key); - if (ret == null) { - ret = redisService.getValue(key); - } - return ret; + return redisService.getValue(key); } catch (Exception e) { return null; } @@ -80,7 +63,6 @@ public static String getESAsyncSearchIdFromContextCache(String key, Integer sequ public static void remove(String key) { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("removeFromCache"); try { - searchContextLocalCache.invalidate(key); redisService.removeValue(key); } finally { RequestContext.get().endMetricRecord(metric); From e129a391c5d09802814ee990178f25f8e2cdfee3 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 3 May 2024 11:37:43 +0530 Subject: [PATCH 16/16] fix: review fix --- .../apache/atlas/service/metrics/MetricsRegistry.java | 2 +- .../service/metrics/MetricsRegistryServiceImpl.java | 2 +- .../atlas/service/redis/AbstractRedisService.java | 2 +- .../graphdb/janus/AtlasElasticsearchQuery.java | 10 +++------- .../repository/graphdb/janus/SearchContextCache.java | 4 ++++ .../src/main/java/org/apache/atlas/RequestContext.java | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java index ef8a49951c..8974155487 100644 --- a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java +++ b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistry.java @@ -10,7 +10,7 @@ public interface MetricsRegistry { void collect(String requestId, String requestUri, AtlasPerfMetrics metrics); - void collectIApplicationMetrics(String requestId, String requestUri, List applicationMetrics); + void collectApplicationMetrics(String requestId, String requestUri, List applicationMetrics); void scrape(PrintWriter writer) throws IOException; diff --git a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java index e732ad13f0..19171325e2 100644 --- a/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/metrics/MetricsRegistryServiceImpl.java @@ -62,7 +62,7 @@ public void collect(String requestId, String requestUri, AtlasPerfMetrics metric } } //Use this if you want to publish Histograms - public void collectIApplicationMetrics(String requestId, String requestUri, List applicationMetrics){ + public void collectApplicationMetrics(String requestId, String requestUri, List applicationMetrics){ try { for(AtlasPerfMetrics.Metric metric : applicationMetrics){ if (metric.getMetricType() == AtlasMetricType.COUNTER) { diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index b259c32c16..efe90aa887 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -152,7 +152,7 @@ Config getCacheImplConfig() { .addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS))) .setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME)) .setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD)) - .setTimeout(50) //Setting UP timeout to 10ms + .setTimeout(50) //Setting UP timeout to 50ms .setRetryAttempts(0); return config; } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index a291ac40a0..c303029146 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -180,11 +180,11 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP boolean contextIdExists = StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null; try { if(contextIdExists) { - // If the search context id and greater sequence no is present, then we need to delete the previous search context async + // If the search context id and greater sequence no is present, + // then we need to delete the previous search context async processRequestWithSameSearchContextId(searchParams); } AsyncQueryResult response = submitAsyncSearch(searchParams, false).get(); - //Sleep for 5 seconds to allow ES to process the request if(response.isRunning()) { /* * If the response is still running, then we need to wait for the response @@ -196,11 +196,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP String searchContextId = searchParams.getSearchContextId(); Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo(); if (contextIdExists) { - try { - CompletableFuture.runAsync(() -> SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId)); - } catch (Exception e) { - LOG.error("Failed to update the search context cache {}", e.getMessage()); - } + CompletableFuture.runAsync(() -> SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId)); } response = getAsyncSearchResponse(searchParams, esSearchId).get(); if (response == null) { diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java index 0b80ce0334..06937f4646 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/SearchContextCache.java @@ -2,10 +2,13 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.utils.AtlasPerfMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @Component public class SearchContextCache { + private static final Logger LOG = LoggerFactory.getLogger(SearchContextCache.class); private static RedisService redisService = null; public static final String INVALID_SEQUENCE = "invalid_sequence"; @@ -30,6 +33,7 @@ public static String get(String key) { try { return redisService.getValue(key); } catch (Exception e) { + LOG.error("Error while fetching value from Redis", e); return null; } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 9fe4097f5f..565832b7bd 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -177,7 +177,7 @@ public void clearCache() { } if (CollectionUtils.isNotEmpty(applicationMetrics)) { if (Objects.nonNull(this.metricsRegistry)){ - this.metricsRegistry.collectIApplicationMetrics(traceId, this.requestUri, applicationMetrics); + this.metricsRegistry.collectApplicationMetrics(traceId, this.requestUri, applicationMetrics); } applicationMetrics.clear(); }