From ed0759635556eb79b7742d1a21292feca94aea04 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Sun, 2 Mar 2025 01:51:43 +0530 Subject: [PATCH 1/8] Added new Error code for service unavailable --- .../janus/AtlasElasticsearchQuery.java | 24 +++++++++++++++++++ .../java/org/apache/atlas/AtlasErrorCode.java | 1 + 2 files changed, 25 insertions(+) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 4303fd1a43..09e65ae49e 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -143,6 +143,12 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar } } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + + if (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); + } + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } } @@ -170,6 +176,12 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + + if (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); + } + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } } @@ -183,6 +195,12 @@ private Map runUpdateByQueryWithLowLevelClient(String que } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + + if (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); + } + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } } @@ -234,6 +252,12 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP } }catch (Exception e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + + if (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); + } + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } finally { if (contextIdExists) { diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 20bbf456bc..19261fa25f 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -279,6 +279,7 @@ public enum AtlasErrorCode { KEYCLOAK_INIT_FAILED(500, "ATLAS-500-00-022", "Failed to initialize keycloak client: {0}"), MAINTENANCE_MODE_ENABLED(503, "ATLAS-503-00-001", "Atlas is in maintenance mode for this specific operation. Please try again later."), + SERVICE_UNAVAILABLE(503, "ATLAS-503-00-002", "Service is unavailable or a network error occurred: {0}"), BATCH_SIZE_TOO_LARGE(406, "ATLAS-406-00-001", "Batch size is too large, please use a smaller batch size"), From ea274b8517b4883cdedf17eb3f09257698755bc7 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Sun, 2 Mar 2025 02:25:28 +0530 Subject: [PATCH 2/8] Socket Timeout Exception --- .../janus/AtlasElasticsearchQuery.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 09e65ae49e..a13cc05d05 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -54,6 +54,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -144,8 +146,11 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + if (e instanceof SocketTimeoutException || + e instanceof UnknownHostException || + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); } @@ -177,8 +182,11 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + if (e instanceof SocketTimeoutException || + e instanceof UnknownHostException || + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); } @@ -196,8 +204,11 @@ private Map runUpdateByQueryWithLowLevelClient(String que } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + if (e instanceof SocketTimeoutException || + e instanceof UnknownHostException || + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); } @@ -253,8 +264,11 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP }catch (Exception e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || e.getMessage().contains("Network error"))) { + if (e instanceof SocketTimeoutException || + e instanceof UnknownHostException || + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); } From b28cb2f64a60ad013dc0f3e1adc731c53354166c Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Mon, 3 Mar 2025 11:50:37 +0530 Subject: [PATCH 3/8] Removed repeated code and throw with a detailed message --- .../janus/AtlasElasticsearchQuery.java | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index a13cc05d05..b3e37d8bc7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -146,13 +146,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e instanceof SocketTimeoutException || - e instanceof UnknownHostException || - (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error")))) { - throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); - } + handleNetworkErrors(e); throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } @@ -182,13 +176,7 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e instanceof SocketTimeoutException || - e instanceof UnknownHostException || - (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error")))) { - throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); - } + handleNetworkErrors(e); throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } @@ -204,13 +192,7 @@ private Map runUpdateByQueryWithLowLevelClient(String que } catch (IOException e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e instanceof SocketTimeoutException || - e instanceof UnknownHostException || - (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error")))) { - throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); - } + handleNetworkErrors(e); throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); } @@ -264,12 +246,8 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP }catch (Exception e) { LOG.error("Failed to execute direct query on ES {}", e.getMessage()); - if (e instanceof SocketTimeoutException || - e instanceof UnknownHostException || - (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error")))) { - throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, e.getMessage()); + if (e instanceof IOException) { + handleNetworkErrors((IOException) e); } throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); @@ -287,6 +265,20 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP return result; } + /* + * Checks if the exception is a network-related issue and throws a SERVICE_UNAVAILABLE error. + */ + private void handleNetworkErrors(IOException e) throws AtlasBaseException { + if (e instanceof SocketTimeoutException || + e instanceof UnknownHostException || + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error")))) { + throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, + "Service is unavailable or a network error occurred: Elasticsearch - " + e.getMessage()); + } + } + /* * Process the request with the same search context ID and sequence number * @param searchParams From ef00f80b709cbb3b46f63066aa4232a6b371a513 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Mon, 3 Mar 2025 13:19:20 +0530 Subject: [PATCH 4/8] Added extra network error --- .../repository/graphdb/janus/AtlasElasticsearchQuery.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index b3e37d8bc7..ea9cec7778 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -273,7 +273,8 @@ private void handleNetworkErrors(IOException e) throws AtlasBaseException { e instanceof UnknownHostException || (e.getMessage() != null && (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error")))) { + e.getMessage().contains("Network error") || + e.getMessage().contains("Connection refused")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, "Service is unavailable or a network error occurred: Elasticsearch - " + e.getMessage()); } From 4cbca2a215110043904b9f97b9f1d85dcbbc6d07 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Tue, 4 Mar 2025 00:18:18 +0530 Subject: [PATCH 5/8] Gather network error messages --- .../graphdb/janus/AtlasElasticsearchQuery.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index ea9cec7778..a1d8c689b5 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -268,13 +268,14 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP /* * Checks if the exception is a network-related issue and throws a SERVICE_UNAVAILABLE error. */ - private void handleNetworkErrors(IOException e) throws AtlasBaseException { - if (e instanceof SocketTimeoutException || - e instanceof UnknownHostException || - (e.getMessage() != null && - (e.getMessage().contains("Connection reset by peer") || - e.getMessage().contains("Network error") || - e.getMessage().contains("Connection refused")))) { + private void handleNetworkErrors(Exception e) throws AtlasBaseException { + List NETWORK_ERROR_MESSAGES = Arrays.asList( + "Connection reset by peer", + "Network error", + "Connection refused" + ); + if (e instanceof SocketTimeoutException || e instanceof UnknownHostException || + (e.getMessage() != null && NETWORK_ERROR_MESSAGES.stream().anyMatch(e.getMessage()::contains))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, "Service is unavailable or a network error occurred: Elasticsearch - " + e.getMessage()); } From b21b5a44b3843f2ac3fd87850e26a6bdca725ae3 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Tue, 4 Mar 2025 00:39:13 +0530 Subject: [PATCH 6/8] Added static variable --- .../graphdb/janus/AtlasElasticsearchQuery.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index a1d8c689b5..3c2cfc2850 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -265,15 +265,15 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP return result; } - /* - * Checks if the exception is a network-related issue and throws a SERVICE_UNAVAILABLE error. - */ - private void handleNetworkErrors(Exception e) throws AtlasBaseException { - List NETWORK_ERROR_MESSAGES = Arrays.asList( + private static final List NETWORK_ERROR_MESSAGES = Arrays.asList( "Connection reset by peer", "Network error", "Connection refused" ); + /* + * Checks if the exception is a network-related issue and throws a SERVICE_UNAVAILABLE error. + */ + private void handleNetworkErrors(Exception e) throws AtlasBaseException { if (e instanceof SocketTimeoutException || e instanceof UnknownHostException || (e.getMessage() != null && NETWORK_ERROR_MESSAGES.stream().anyMatch(e.getMessage()::contains))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, From 9ac3d6ca8658a4a20e93131a124d8a59a42bab34 Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Tue, 4 Mar 2025 00:54:16 +0530 Subject: [PATCH 7/8] Use Set instead of List --- .../graphdb/janus/AtlasElasticsearchQuery.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 3c2cfc2850..9a6239364a 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -66,7 +66,12 @@ public class AtlasElasticsearchQuery implements AtlasIndexQuery { private static final Logger LOG = LoggerFactory.getLogger(AtlasElasticsearchQuery.class); - + private static final Set NETWORK_ERROR_MESSAGES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "Connection reset by peer", + "Network error", + "Connection refused" + ))); private AtlasJanusGraph graph; private RestHighLevelClient esClient; private RestClient lowLevelRestClient; @@ -265,11 +270,6 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP return result; } - private static final List NETWORK_ERROR_MESSAGES = Arrays.asList( - "Connection reset by peer", - "Network error", - "Connection refused" - ); /* * Checks if the exception is a network-related issue and throws a SERVICE_UNAVAILABLE error. */ From 57618f712b900c06c4330379808d8cfd544a614f Mon Sep 17 00:00:00 2001 From: Pavan Manish Date: Tue, 4 Mar 2025 11:58:55 +0530 Subject: [PATCH 8/8] Original conditional statement --- .../graphdb/janus/AtlasElasticsearchQuery.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index 9a6239364a..fd84ce821a 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -66,12 +66,7 @@ public class AtlasElasticsearchQuery implements AtlasIndexQuery { private static final Logger LOG = LoggerFactory.getLogger(AtlasElasticsearchQuery.class); - private static final Set NETWORK_ERROR_MESSAGES = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - "Connection reset by peer", - "Network error", - "Connection refused" - ))); + private AtlasJanusGraph graph; private RestHighLevelClient esClient; private RestClient lowLevelRestClient; @@ -275,7 +270,10 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP */ private void handleNetworkErrors(Exception e) throws AtlasBaseException { if (e instanceof SocketTimeoutException || e instanceof UnknownHostException || - (e.getMessage() != null && NETWORK_ERROR_MESSAGES.stream().anyMatch(e.getMessage()::contains))) { + (e.getMessage() != null && + (e.getMessage().contains("Connection reset by peer") || + e.getMessage().contains("Network error") || + e.getMessage().contains("Connection refused")))) { throw new AtlasBaseException(AtlasErrorCode.SERVICE_UNAVAILABLE, "Service is unavailable or a network error occurred: Elasticsearch - " + e.getMessage()); }