Skip to content

Commit

Permalink
Merge branch 'master' into beta-helper
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilbonte21 committed Mar 5, 2025
2 parents b80b13c + dbfc853 commit 566e07d
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public class AtlasElasticsearchDatabase {
private static volatile RestHighLevelClient searchClient;
private static volatile RestClient lowLevelClient;

private static volatile RestClient esProductClusterClient;
private static volatile RestClient esNonProductClusterClient;
private static volatile RestClient esUiClusterClient;
private static volatile RestClient esNonUiClusterClient;
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.hostname";

public static List<HttpHost> getHttpHosts() throws AtlasException {
Expand Down Expand Up @@ -105,62 +105,62 @@ public static RestClient getLowLevelClient() {
return lowLevelClient;
}

public static RestClient getProductClusterClient() {
public static RestClient getUiClusterClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return null;
}

if (esProductClusterClient == null) {
if (esUiClusterClient == null) {
synchronized (AtlasElasticsearchDatabase.class) {
if (esProductClusterClient == null) {
if (esUiClusterClient == null) {
try {
HttpHost productHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_PRODUCT_SEARCH_CLUSTER_URL.getString());
HttpHost UiHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_UI_SEARCH_CLUSTER_URL.getString());

RestClientBuilder builder = RestClient.builder(productHost);
RestClientBuilder builder = RestClient.builder(UiHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> 3600000)));
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(AtlasConfiguration.INDEX_CLIENT_CONNECTION_TIMEOUT.getInt())
.setSocketTimeout(AtlasConfiguration.INDEX_CLIENT_SOCKET_TIMEOUT.getInt()));

esProductClusterClient = builder.build();
esUiClusterClient = builder.build();
} catch (Exception e) {
LOG.error("Failed to initialize product cluster client", e);
LOG.error("Failed to initialize UI cluster client", e);
return null;
}
}
}
}
return esProductClusterClient;
return esUiClusterClient;
}

public static RestClient getNonProductSearchClusterClient() {
public static RestClient getNonUiClusterClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return null;
}

if (esNonProductClusterClient == null) {
if (esNonUiClusterClient == null) {
synchronized (AtlasElasticsearchDatabase.class) {
if (esNonProductClusterClient == null) {
if (esNonUiClusterClient == null) {
try {
HttpHost nonProductHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_NON_PRODUCT_SEARCH_CLUSTER_URL.getString());
HttpHost nonUiHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_NON_UI_SEARCH_CLUSTER_URL.getString());

RestClientBuilder builder = RestClient.builder(nonProductHost);
RestClientBuilder builder = RestClient.builder(nonUiHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> 3600000)));
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(AtlasConfiguration.INDEX_CLIENT_CONNECTION_TIMEOUT.getInt())
.setSocketTimeout(AtlasConfiguration.INDEX_CLIENT_SOCKET_TIMEOUT.getInt()));

esNonProductClusterClient = builder.build();
esNonUiClusterClient = builder.build();
} catch (Exception e) {
LOG.error("Failed to initialize Non-product cluster client", e);
LOG.error("Failed to initialize Non-Ui cluster client", e);
return null;
}
}
}
}
return esNonProductClusterClient;
return esNonUiClusterClient;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public class AtlasElasticsearchQuery implements AtlasIndexQuery<AtlasJanusVertex
private RestHighLevelClient esClient;
private RestClient lowLevelRestClient;

private RestClient esProductClusterClient;
private RestClient esNonProductClusterClient;
private RestClient esUiClusterClient;
private RestClient esNonUiClusterClient;
private String index;
private SearchSourceBuilder sourceBuilder;
private SearchResponse searchResponse;
Expand All @@ -87,11 +87,11 @@ public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestHighLevelClient esClie
this.sourceBuilder = sourceBuilder;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams, RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
this(graph, index);
this.lowLevelRestClient = restClient;
this.esProductClusterClient = esProductClusterClient;
this.esNonProductClusterClient = esNonProductClusterClient;
this.esUiClusterClient = esUiClusterClient;
this.esNonUiClusterClient = esNonUiClusterClient;
this.searchParams = searchParams;
}

Expand All @@ -101,8 +101,8 @@ private AtlasElasticsearchQuery(AtlasJanusGraph graph, String index) {
searchResponse = null;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient,RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
this(graph, restClient, index, null, esProductClusterClient, esNonProductClusterClient);
public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
this(graph, restClient, index, null, esUiClusterClient, esNonUiClusterClient);
}

public AtlasElasticsearchQuery(String index, RestClient restClient) {
Expand All @@ -119,24 +119,23 @@ private SearchRequest getSearchRequest(String index, SearchSourceBuilder sourceB
/**
* Returns the appropriate Elasticsearch RestClient based on client origin and configuration settings if isolation is enabled.
*
* @return RestClient configured for either product or SDK cluster, falling back to low-level client
* @return RestClient configured for either UI or Non-UI cluster, falling back to low-level client
*/
private RestClient getESClient() {
RestClient client = lowLevelRestClient;
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return client;
return lowLevelRestClient;
}

try {
String clientOrigin = RequestContext.get().getClientOrigin();
if (clientOrigin == null) {
return client;
return lowLevelRestClient;
}
if (CLIENT_ORIGIN_PRODUCT.equals(clientOrigin)) {
return Optional.ofNullable(esProductClusterClient)
return Optional.ofNullable(esUiClusterClient)
.orElse(lowLevelRestClient);
} else {
return Optional.ofNullable(esNonProductClusterClient)
return Optional.ofNullable(esNonUiClusterClient)
.orElse(lowLevelRestClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getProductClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonProductSearchClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.getGraphInstance;
import static org.apache.atlas.type.Constants.STATE_PROPERTY_KEY;

Expand All @@ -117,8 +117,8 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
private final RestHighLevelClient elasticsearchClient;
private final RestClient restClient;

private final RestClient esProductClusterClient;
private final RestClient esNonProductClusterClient;
private final RestClient esUiClusterClient;
private final RestClient esNonUiClusterClient;

private final ThreadLocal<GremlinGroovyScriptEngine> scriptEngine = ThreadLocal.withInitial(() -> {
DefaultImportCustomizer.Builder builder = DefaultImportCustomizer.build()
Expand All @@ -129,10 +129,10 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
});

public AtlasJanusGraph() {
this(getGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getNonProductSearchClusterClient());
this(getGraphInstance(), getClient(), getLowLevelClient(), getUiClusterClient(), getNonUiClusterClient());
}

public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient,RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
//determine multi-properties once at startup
JanusGraphManagement mgmt = null;

Expand All @@ -155,8 +155,8 @@ public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsear
janusGraph = (StandardJanusGraph) graphInstance;
this.restClient = restClient;
this.elasticsearchClient = elasticsearchClient;
this.esProductClusterClient = esProductClusterClient;
this.esNonProductClusterClient = esNonProductClusterClient;
this.esUiClusterClient = esUiClusterClient;
this.esNonUiClusterClient = esNonUiClusterClient;
}

@Override
Expand Down Expand Up @@ -328,7 +328,7 @@ public AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> elasticsearchQuery(Stri
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams, esProductClusterClient, esNonProductClusterClient);
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams, esUiClusterClient, esNonUiClusterClient);
}

@Override
Expand Down Expand Up @@ -384,7 +384,7 @@ public AtlasIndexQuery elasticsearchQuery(String indexName) throws AtlasBaseExce
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, indexName, restClient, esProductClusterClient, esNonProductClusterClient);
return new AtlasElasticsearchQuery(this, indexName, restClient, esUiClusterClient, esNonUiClusterClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@

import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getProductClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonProductSearchClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonUiClusterClient;


import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
Expand Down Expand Up @@ -309,7 +309,7 @@ public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraph() {

@Override
public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraphBulkLoading() {
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getNonProductSearchClusterClient());
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient(), getUiClusterClient(), getNonUiClusterClient());
}

private static void startEmbeddedSolr() throws AtlasException {
Expand Down
4 changes: 2 additions & 2 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public enum AtlasConfiguration {
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION("atlas.indexsearch.request.isolation.enable", false),
ATLAS_ELASTICSEARCH_PRODUCT_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.product.cluster.url","atlas-elasticsearch2-product-search-headless.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_NON_PRODUCT_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.nonproduct.cluster.url","atlas-elasticsearch2-non-product-search-headless.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_UI_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.ui.cluster.url","atlas-elasticsearch2-ui-search.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_NON_UI_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.nonui.cluster.url","atlas-elasticsearch2-non-ui-search.atlas.svc.cluster.local:9200"),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),

/**
Expand Down

0 comments on commit 566e07d

Please sign in to comment.