Skip to content

Commit

Permalink
Merge pull request #2869 from atlanhq/cancel-es-req
Browse files Browse the repository at this point in the history
feat: cancel ES request from metastore side if indexsearch get cancelled from the client
  • Loading branch information
sumandas0 authored Feb 27, 2024
2 parents ca7a4e7 + 549c5be commit 57e56f6
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.graphdb.janus;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParams;
Expand All @@ -26,6 +27,7 @@
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
Expand All @@ -36,6 +38,7 @@
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -48,6 +51,8 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import static org.apache.atlas.AtlasErrorCode.INDEX_NOT_FOUND;
Expand Down Expand Up @@ -123,20 +128,19 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar
DirectIndexQueryResult result = null;

try {

String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("runQueryWithLowLevelClient.response : {}", responseString);
if(searchParams.isCallAsync()) {
return performAsyncDirectIndexQuery(searchParams);
} else{
String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("runQueryWithLowLevelClient.response : {}", responseString);
}
return getResultFromResponse(responseString);
}

result = getResultFromResponse(responseString);

} catch (IOException e) {
LOG.error("Failed to execute direct query on ES {}", e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage());
}

return result;
}

private Map<String, Object> runQueryWithLowLevelClient(String query) throws AtlasBaseException {
Expand Down Expand Up @@ -166,6 +170,212 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws Atla
}
}

private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
DirectIndexQueryResult result = null;
try {
if(StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null) {
// If the search context id and greater sequence no is present, then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}
AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();

if(response.isRunning()) {
/*
* If the response is still running, then we need to wait for the response
* We need to check if the search context ID is present and update the cache
* We also need to check if the search ID exists and delete if necessary, if the sequence number is greater than the cache sequence number
*
*/
String esSearchId = response.getId();
String searchContextId = searchParams.getSearchContextId();
Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo();
if (StringUtils.isNotEmpty(searchContextId) && searchContextSequenceNo != null) {
SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId);
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response == null) {
// Return null, if the response is null wil help returning @204 HTTP_NO_CONTENT to the user
return null;
}
result = getResultFromResponse(response.getFullResponse(), true);
} else {
result = getResultFromResponse(response.getFullResponse(), true);
}
}catch (Exception e) {
LOG.error("Failed to execute direct query on ES {}", e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage());
}
return result;
}

/*
* Process the request with the same search context ID and sequence number
* @param searchParams
* @return void
* Function to process the request with the same search context ID
* If the sequence number is greater than the cache sequence number,
* then we need to cancel the request and update the cache
* We also need to check if the search ID exists and delete if necessary
*/
private void processRequestWithSameSearchContextId(SearchParams searchParams) {
// Extract search context ID and sequence number
String currentSearchContextId = searchParams.getSearchContextId();
Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo();
// Get the search ID from the cache if sequence number is greater than the current sequence number
String previousESSearchId = SearchContextCache.getESAsyncSearchIdFromContextCache(currentSearchContextId, currentSequenceNumber);

if (StringUtils.isNotEmpty(previousESSearchId)) {
LOG.debug("Deleting the previous async search response with ID {}", previousESSearchId);
// If the search ID exists, then we need to delete the search context
deleteAsyncSearchResponse(previousESSearchId);
SearchContextCache.remove(currentSearchContextId);
}
}

/*
* Get the async search response
* @param searchParams
* @param esSearchId
* @return Future<AsyncQueryResult>
* Function to get the async search response after we submit the async search request
*/
private Future<AsyncQueryResult> getAsyncSearchResponse(SearchParams searchParams, String esSearchId) {
CompletableFuture<AsyncQueryResult> future = new CompletableFuture<>();
String endPoint = "_async_search/" + esSearchId;
Request request = new Request("GET", endPoint);
long waitTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong();
if (searchParams.getRequestTimeoutInSecs()!= null) {
waitTime = searchParams.getRequestTimeoutInSecs();
}
request.addParameter("wait_for_completion_timeout", waitTime + "s");
ResponseListener responseListener = new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
String respString = EntityUtils.toString(response.getEntity());
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(respString, Map.class);
Boolean isInComplete = AtlasType.fromJson(AtlasType.toJson(responseMap.get("is_partial")), Boolean.class);
String id = AtlasType.fromJson(AtlasType.toJson(responseMap.get("id")), String.class);

if (isInComplete != null && isInComplete) {
/*
* After the wait time, if the response is still incomplete, then we need to delete the search context
* and complete the future with null
* So that ES don't process the request later
*/
deleteAsyncSearchResponse(id);
future.complete(null);
}
AsyncQueryResult result = new AsyncQueryResult(respString, false);
future.complete(result);
} catch (IOException e) {
future.completeExceptionally(e);
}
}

@Override
public void onFailure(Exception exception) {
if (exception instanceof ResponseException){
int statusCode = ((ResponseException) exception).getResponse().getStatusLine().getStatusCode();
if (statusCode == 400 || statusCode == 404) {
/*
* If the response is not found or deleted, then we need to complete the future with null
* Else we need to complete the future with the exception
* Sending null, would return 204 to the user
*/
LOG.debug("Async search response not found or deleted", exception);
future.complete(null);
} else {
future.completeExceptionally(exception);
}
} else {
future.completeExceptionally(exception);
}
}
};

lowLevelRestClient.performRequestAsync(request, responseListener);

return future;
}

private void deleteAsyncSearchResponse(String searchContextId) {
String endPoint = "_async_search/" + searchContextId;
Request request = new Request("DELETE", endPoint);
ResponseListener responseListener = new ResponseListener() {
@Override
public void onSuccess(Response response) {
LOG.debug("Deleted async search response");
}
@Override
public void onFailure(Exception exception) {
if (exception instanceof ResponseException && ((ResponseException) exception).getResponse().getStatusLine().getStatusCode() == 404) {
LOG.debug("Async search response not found");
} else {
LOG.error("Failed to delete async search response {}", exception.getMessage());
}
}
};
lowLevelRestClient.performRequestAsync(request, responseListener);
}

private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, boolean source) {
CompletableFuture<AsyncQueryResult> future = new CompletableFuture<>();
HttpEntity entity = new NStringEntity(searchParams.getQuery(), ContentType.APPLICATION_JSON);
String endPoint;
String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}

if (source) {
endPoint = index + "/_async_search";
} else {
endPoint = index + "/_async_search?_source=false";
}

Request request = new Request("POST", endPoint);
request.setEntity(entity);

request.addParameter("wait_for_completion_timeout", "100ms");
request.addParameter("keep_alive", KeepAliveTime);

ResponseListener responseListener = new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
String respString = EntityUtils.toString(response.getEntity());
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(respString, Map.class);
boolean isRunning = AtlasType.fromJson(AtlasType.toJson(responseMap.get("is_running")), Boolean.class);
String id = AtlasType.fromJson(AtlasType.toJson(responseMap.get("id")), String.class);
AsyncQueryResult result = new AsyncQueryResult(respString, isRunning);
/*
* If the response is running, then we need to complete the future with the ID to retrieve this later
* Else we will complete the future with the response, if it completes within default timeout of 100ms
*/
if (isRunning && StringUtils.isNotEmpty(id)) {
// response is still running, complete the future with the ID
// use the ID to retrieve the response later
result.setId(id);
future.complete(result);
} else {
future.complete(result);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(Exception exception) {
future.completeExceptionally(exception);
}
};

lowLevelRestClient.performRequestAsync(request, responseListener);

return future;
}

private String performDirectIndexQuery(String query, boolean source) throws AtlasBaseException, IOException {
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
String endPoint;
Expand Down Expand Up @@ -194,12 +404,17 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla
return EntityUtils.toString(response.getEntity());
}

private DirectIndexQueryResult getResultFromResponse(String responseString) {
DirectIndexQueryResult result = new DirectIndexQueryResult();

private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException {
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return getResultFromResponse(responseMap.get("response"));
}

private DirectIndexQueryResult getResultFromResponse(Map<String, LinkedHashMap> responseMap) throws IOException {
DirectIndexQueryResult result = new DirectIndexQueryResult();
Map<String, LinkedHashMap> hits_0 = AtlasType.fromJson(AtlasType.toJson(responseMap.get("hits")), Map.class);
if (hits_0 == null) {
return result;
}
this.vertexTotals = (Integer) hits_0.get("total").get("value");

List<LinkedHashMap> hits_1 = AtlasType.fromJson(AtlasType.toJson(hits_0.get("hits")), List.class);
Expand All @@ -214,8 +429,19 @@ private DirectIndexQueryResult getResultFromResponse(String responseString) {
}

return result;

}


private DirectIndexQueryResult getResultFromResponse(String responseString) throws IOException {

Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);

return getResultFromResponse(responseMap);
}



@Override
public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> vertices(SearchParams searchParams) throws AtlasBaseException {
return runQueryWithLowLevelClient(searchParams);
Expand Down Expand Up @@ -351,4 +577,54 @@ public Map<String, List<String>> getHighLights() {
return new HashMap<>();
}
}

public class AsyncQueryResult {
private boolean isRunning;
private String id;
private String fullResponse;

private boolean success;
// Constructor for a running process
public AsyncQueryResult(String id) {
this.isRunning = true;
this.id = id;
this.fullResponse = null;
}

// Constructor for a completed process
public AsyncQueryResult(String fullResponse, boolean isRunning) {
this.isRunning = isRunning;
this.id = null;
this.fullResponse = fullResponse;
}

public void setRunning(boolean running) {
this.isRunning = running;
}

// Getters
public boolean isRunning() {
return isRunning;
}

void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}

public String getFullResponse() {
return fullResponse;
}

public void setSuccess(boolean success) {
this.success = success;
}

public boolean isSuccess() {
return success;
}
}

}
Loading

0 comments on commit 57e56f6

Please sign in to comment.