Skip to content

Commit

Permalink
Merge pull request #2864 from atlanhq/cancel-es-req
Browse files Browse the repository at this point in the history
ES request cancellation: some name refactoring
  • Loading branch information
sumandas0 authored Feb 22, 2024
2 parents 38f7523 + 3c90305 commit 7047176
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -130,7 +129,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar
DirectIndexQueryResult result = null;

try {
if(searchParams.isAsync()) {
if(searchParams.isCallAsync()) {
return performAsyncDirectIndexQuery(searchParams);
} else{
String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
Expand Down Expand Up @@ -175,22 +174,23 @@ public Map<String, Object> runQueryWithLowLevelClient(String query) throws Atlas
private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
DirectIndexQueryResult result = null;
try {
if(StringUtils.isNotEmpty(searchParams.getAsyncSearchContextId())) {
if(StringUtils.isNotEmpty(searchParams.getSearchContextId())) {
// If the search context id is present, then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}

AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();

if(response.isRunning()) {
String esSearchId = response.getId();
if (StringUtils.isNotEmpty(searchParams.getAsyncSearchContextId())) {
Integer serialNumber = Integer.parseInt(searchParams.getAsyncSearchContextId().split("-")[1]);
SearchContextCache.putSequence(searchParams.getAsyncSearchContextId().split("-")[0], serialNumber.toString());
SearchContextCache.put(searchParams.getAsyncSearchContextId(), esSearchId);
if (StringUtils.isNotEmpty(searchParams.getSearchContextId())) {
if (searchParams.getSearchContextSequenceNo() != null) {
SearchContextCache.putSequence(searchParams.getSearchContextId(), searchParams.getSearchContextSequenceNo());
}
SearchContextCache.put(searchParams.getSearchContextId(), esSearchId);
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response != null) {
if (!response.isSuccess() && response.getSearchCancelledException() != null) {
if (response.getSearchCancelledException() != null) {
throw response.getSearchCancelledException();
}
result = getResultFromResponse(response.getFullResponse(), true);
Expand All @@ -205,27 +205,42 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
return result;
}

private void processRequestWithSameSearchContextId(SearchParams searchParams) throws AtlasBaseException, IOException {
// search context id is of form id - serial number now check if current id is greater than the one in cache if so extract esSearchId and delete it
Integer currentSerialNumber = Integer.parseInt(searchParams.getAsyncSearchContextId().split("-")[1]);
String currentId = searchParams.getAsyncSearchContextId().split("-")[0];
Integer existingSerialNumber = Integer.parseInt(SearchContextCache.getSequence(currentId));
if (currentSerialNumber > existingSerialNumber) {
String esSearchId = SearchContextCache.get(currentId);
if(StringUtils.isNotEmpty(esSearchId)) {
deleteAsyncSearchResponse(esSearchId);
SearchContextCache.remove(currentId);
}
private void processRequestWithSameSearchContextId(SearchParams searchParams) {
// Extract search context ID and sequence number
String currentSearchContextId = searchParams.getSearchContextId();
Integer currentSequenceNumber = searchParams.getSearchContextSequenceNo();

// Check if cache entry exists for the given ID
boolean cacheEntryExists = SearchContextCache.get(currentSearchContextId) != null;

// Handle cases where sequence number is available and greater
if (currentSequenceNumber != null && currentSequenceNumber > SearchContextCache.getSequence(currentSearchContextId)) {
// Sequence number is greater, update cache
handleCacheUpdate(currentSearchContextId);
} else if (currentSequenceNumber == null || !cacheEntryExists) {
// Sequence number missing or no cache entry, potentially invalid or outdated
// Handle case where sequence number is unavailable or invalid
handleCacheUpdate(currentSearchContextId);
}
}
private void handleCacheUpdate(String currentSearchContextId) {
// Retrieve existing search ID from cache
String esSearchId = SearchContextCache.get(currentSearchContextId);

private Future<AsyncQueryResult> getAsyncSearchResponse(SearchParams searchParams, String esSearchId) throws AtlasBaseException, IOException {
// Check if search ID exists and delete if necessary
if (StringUtils.isNotEmpty(esSearchId)) {
deleteAsyncSearchResponse(esSearchId);
SearchContextCache.remove(currentSearchContextId);
}
}

private Future<AsyncQueryResult> getAsyncSearchResponse(SearchParams searchParams, String esSearchId) {
CompletableFuture<AsyncQueryResult> future = new CompletableFuture<>();
String endPoint = "_async_search/" + esSearchId;
Request request = new Request("GET", endPoint);
long waitTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong();
if (searchParams.getAsyncRequestTimeoutInSecs()!= null) {
waitTime = searchParams.getAsyncRequestTimeoutInSecs();
if (searchParams.getRequestTimeoutInSecs()!= null) {
waitTime = searchParams.getRequestTimeoutInSecs();
}
//Reduce wait time by 10% to avoid timeout and round off to seconds
waitTime = (long) (waitTime * 0.9);
Expand All @@ -240,10 +255,8 @@ public void onSuccess(Response response) {
Boolean is_running = AtlasType.fromJson(AtlasType.toJson(responseMap.get("is_running")), Boolean.class);
AsyncQueryResult result = new AsyncQueryResult(respString, false);
if (completionStatus != null && completionStatus == 200) {
result.setSuccess(true);
future.complete(result);
} else if (is_running!=null && is_running) {
result.setSuccess(false);
} else if (is_running !=null && is_running) {
future.complete(result);
} else if(completionStatus != null) {
future.completeExceptionally(new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED));
Expand All @@ -260,7 +273,6 @@ public void onFailure(Exception exception) {
SearchCancelledException searchCancelledException = new SearchCancelledException("Search cancelled as the request took too long to complete or " +
"request with same context came through");
AsyncQueryResult result = new AsyncQueryResult(null);
result.setSuccess(false);
result.setSearchCancelledException(searchCancelledException);
if (statusCode == 404) {
LOG.debug("Async search response not found");
Expand All @@ -282,7 +294,7 @@ public void onFailure(Exception exception) {
return future;
}

private void deleteAsyncSearchResponse(String searchContextId) throws AtlasBaseException, IOException {
private void deleteAsyncSearchResponse(String searchContextId) {
String endPoint = "_async_search/" + searchContextId;
Request request = new Request("DELETE", endPoint);
ResponseListener responseListener = new ResponseListener() {
Expand All @@ -307,8 +319,8 @@ private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, bo
HttpEntity entity = new NStringEntity(searchParams.getQuery(), ContentType.APPLICATION_JSON);
String endPoint;
String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getAsyncRequestTimeoutInSecs()!= null) {
KeepAliveTime = searchParams.getAsyncRequestTimeoutInSecs() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}

if (source) {
Expand Down Expand Up @@ -558,9 +570,6 @@ public class AsyncQueryResult {
private boolean isRunning;
private String id;
private String fullResponse;

private boolean success;

private SearchCancelledException searchCancelledException;

// Constructor for a running process
Expand Down Expand Up @@ -597,14 +606,6 @@ public String getFullResponse() {
return fullResponse;
}

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public SearchCancelledException getSearchCancelledException() {
return searchCancelledException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public static void put(String key, String value) {
searchContextCache.put(key, value);
}

public static void putSequence(String key, String value) {
public static void putSequence(String key, Integer value) {
searchContextSequenceCache.put(key, value);
}

public static String getSequence(String key) {
return (String) searchContextSequenceCache.getIfPresent(key);
public static Integer getSequence(String key) {
Integer sequence = (Integer) searchContextSequenceCache.getIfPresent(key);
return sequence != null ? sequence : 0;
}

public static String get(String key){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SearchParams {
boolean useAccessControlv2;

RequestMetadata requestMetadata = new RequestMetadata();

Async async = new Async();
boolean showHighlights;

public String getQuery() {
Expand Down Expand Up @@ -123,38 +125,43 @@ public void setRequestMetadata(RequestMetadata requestMetadata) {
this.requestMetadata = requestMetadata;
}

public String getSearchInput() {
return this.requestMetadata.getSearchInput();
public Async getAsync() {
return async;
}

public boolean isShowHighlights() {
return showHighlights;
public void setAsync(Async async) {
this.async = async;
}

public boolean isAsync() {
return this.requestMetadata.async;
public boolean isCallAsync() {
return async.getIsCallAsync();
}

public Long getAsyncRequestTimeoutInSecs() {
return this.requestMetadata.asyncRequestTimeoutInSecs;
public String getSearchContextId() {
return async.getSearchContextId();
}

public String getAsyncSearchContextId() {
return this.requestMetadata.asyncSearchContextId;
public Integer getSearchContextSequenceNo() {
return async.getSearchContextSequenceNo();
}

@JsonIgnoreProperties(ignoreUnknown=true)
static class RequestMetadata {
private String searchInput;
private Set<String> utmTags;
private boolean saveSearchLog;
public Long getRequestTimeoutInSecs() {
return async.getRequestTimeoutInSecs();
}

private boolean async;
public String getSearchInput() {
return this.requestMetadata.getSearchInput();
}

private Long asyncRequestTimeoutInSecs;
public boolean isShowHighlights() {
return showHighlights;
}

private String asyncSearchContextId;

static class RequestMetadata {
private String searchInput;
private Set<String> utmTags;
private boolean saveSearchLog;

public String getSearchInput() {
return searchInput;
Expand All @@ -179,17 +186,32 @@ public void setUtmTags(Set<String> utmTags) {
public void setSaveSearchLog(boolean saveSearchLog) {
this.saveSearchLog = saveSearchLog;
}
}

@JsonIgnoreProperties(ignoreUnknown=true)
static class Async {
private boolean isCallAsync;

private String searchContextId;

private Integer searchContextSequenceNo;

private Long requestTimeoutInSecs;

public boolean getIsCallAsync() {
return isCallAsync;
}

public boolean isAsync() {
return async;
public String getSearchContextId() {
return searchContextId;
}

public Long getAsyncRequestTimeoutInSecs() {
return asyncRequestTimeoutInSecs;
public Integer getSearchContextSequenceNo() {
return searchContextSequenceNo;
}

public String getAsyncSearchContextId() {
return asyncSearchContextId;
public Long getRequestTimeoutInSecs() {
return requestTimeoutInSecs;
}
}

Expand Down

0 comments on commit 7047176

Please sign in to comment.