Skip to content

Commit

Permalink
Merge pull request #3798 from atlanhq/plt529test
Browse files Browse the repository at this point in the history
DG-1942 Fix task fetching inconsistency causing excessive lock churn and memory heap buildup in pods
  • Loading branch information
sumandas0 authored Dec 2, 2024
2 parents 2e353ad + e93359b commit 1672d81
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws Atla
}
}

/**
 * Runs an Elasticsearch {@code _update_by_query} request via the low-level REST client
 * and parses the JSON response body into a map.
 *
 * @param query the update-by-query request body (JSON DSL)
 * @return the parsed ES response (raw {@link LinkedHashMap} values, matching the
 *         existing {@code runQueryWithLowLevelClient} convention)
 * @throws AtlasBaseException if the HTTP call fails with an I/O error
 */
private Map<String, LinkedHashMap> runUpdateByQueryWithLowLevelClient(String query) throws AtlasBaseException {
    try {
        String responseString = performDirectUpdateByQuery(query);

        return AtlasType.fromJson(responseString, Map.class);
    } catch (IOException e) {
        // Pass the exception itself so the full stack trace is logged,
        // not just the (possibly null) message.
        LOG.error("Failed to execute direct update-by-query on ES", e);
        throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage());
    }
}

private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery");
DirectIndexQueryResult result = null;
Expand Down Expand Up @@ -444,6 +457,30 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla
return EntityUtils.toString(response.getEntity());
}

/**
 * POSTs the given JSON body to {@code <index>/_update_by_query} using the
 * low-level REST client and returns the raw response body as a string.
 *
 * @param query the update-by-query request body (JSON DSL)
 * @return the raw ES response body
 * @throws AtlasBaseException with {@code INDEX_NOT_FOUND} if ES answers 404 for the index,
 *         or a wrapped error (including the request body) for any other response failure
 * @throws IOException on transport-level failures from the REST client
 */
private String performDirectUpdateByQuery(String query) throws AtlasBaseException, IOException {
    HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
    String endPoint = index + "/_update_by_query";

    Request request = new Request("POST", endPoint);
    request.setEntity(entity);

    Response response;
    try {
        response = lowLevelRestClient.performRequest(request);
    } catch (ResponseException rex) {
        if (rex.getResponse().getStatusLine().getStatusCode() == 404) {
            // Parameterized SLF4J logging instead of eager String.format.
            LOG.warn("ES index with name {} not found", index);
            throw new AtlasBaseException(INDEX_NOT_FOUND, index);
        } else {
            // Include the request body in the error to make the failing query diagnosable;
            // the original cause is preserved.
            throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex);
        }
    }

    return EntityUtils.toString(response.getEntity());
}

private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException {
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return getResultFromResponse(responseMap.get("response"));
Expand Down Expand Up @@ -495,6 +532,10 @@ public Map<String, Object> directIndexQuery(String query) throws AtlasBaseExcept
return runQueryWithLowLevelClient(query);
}

/**
 * Executes an Elasticsearch {@code _update_by_query} request against this query's index.
 * Thin public entry point delegating to {@code runUpdateByQueryWithLowLevelClient}.
 *
 * @param query the update-by-query request body (JSON DSL)
 * @return the parsed ES response as a map of raw {@link LinkedHashMap} values
 * @throws AtlasBaseException if the underlying request fails
 */
public Map<String, LinkedHashMap> directUpdateByQuery(String query) throws AtlasBaseException {
    return runUpdateByQueryWithLowLevelClient(query);
}

@Override
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
SearchRequest searchRequest = getSearchRequest(index, sourceBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,13 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
TasksFetcher fetcher = new TasksFetcher(registry);
try {
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}

TasksFetcher fetcher = new TasksFetcher(registry);

Thread tasksFetcherThread = new Thread(fetcher);
tasksFetcherThread.start();
tasksFetcherThread.join();
LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK);

List<AtlasTask> tasks = fetcher.getTasks();
if (CollectionUtils.isNotEmpty(tasks)) {
Expand All @@ -116,6 +112,7 @@ public void run() {
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
} finally {
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
fetcher.clearTasks();
}
}
}
Expand Down Expand Up @@ -145,15 +142,14 @@ private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) {
}
}

static class TasksFetcher implements Runnable {
static class TasksFetcher {
private TaskRegistry registry;
private List<AtlasTask> tasks = new ArrayList<>();

public TasksFetcher(TaskRegistry registry) {
this.registry = registry;
}

@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
Expand All @@ -163,8 +159,13 @@ public void run() {
}

/**
 * Fetches pending tasks synchronously on the caller's thread and returns them.
 * NOTE: returns the fetcher's internal mutable list — callers sharing this
 * instance should clear it via {@code clearTasks()} once processed so tasks
 * do not accumulate across watcher iterations.
 */
public List<AtlasTask> getTasks() {
    run();
    return tasks;
}

/** Empties the internal task list so fetched tasks are not retained between iterations. */
public void clearTasks() {
    tasks.clear();
}
}

@PreDestroy
Expand Down
61 changes: 58 additions & 3 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.atlas.tasks;

import com.datastax.oss.driver.shaded.fasterxml.jackson.core.JsonProcessingException;
import com.datastax.oss.driver.shaded.fasterxml.jackson.databind.ObjectMapper;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
Expand All @@ -31,10 +33,12 @@
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.janusgraph.util.encoding.LongEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -49,6 +53,7 @@
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -61,6 +66,7 @@ public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";

private AtlasGraph graph;
private TaskService taskService;
Expand Down Expand Up @@ -427,9 +433,11 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){
LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString()));
ret.add(atlasTask);
}
else {
LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid()));
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
repairMismatchedTask(atlasTask, docId);
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
Expand All @@ -452,6 +460,53 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
return ret;
}

/**
 * Repairs an ES document whose task status disagrees with the authoritative store
 * by issuing an {@code _update_by_query} that overwrites the status-related fields
 * with the values from {@code atlasTask}.
 *
 * Best-effort: failures are logged and swallowed so one bad document does not
 * abort the surrounding re-queue scan.
 *
 * @param atlasTask the authoritative task state to push into ES
 * @param docId     the ES document id (JanusGraph long-encoded vertex id)
 */
private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
    try {
        // Collect the fields ES is out of sync on. Guard the timestamp getters:
        // a task caught mid-transition may not have an end/updated time yet, and an
        // NPE here would escape the catch clauses below and abort the whole scan.
        Map<String, Object> fieldsToUpdate = new HashMap<>();
        if (atlasTask.getEndTime() != null) {
            fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
        }
        fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
        fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
        if (atlasTask.getUpdatedTime() != null) {
            fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime());
        }

        // Serialize the field map to JSON so it can be embedded as script params.
        ObjectMapper objectMapper = new ObjectMapper();
        String fieldsToUpdateJson = objectMapper.writeValueAsString(fieldsToUpdate);

        // Update-by-query DSL: a painless script copies every param field onto the
        // document matched by _id. docId comes from LongEncoding, so it is safe to inline.
        String queryDsl = "{"
                + "\"script\": {"
                + "    \"source\": \"for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }\","
                + "    \"params\": {"
                + "        \"fields\": " + fieldsToUpdateJson
                + "    }"
                + "},"
                + "\"query\": {"
                + "    \"term\": {"
                + "        \"_id\": \"" + docId + "\""
                + "    }"
                + "}"
                + "}";

        // Execute directly against the vertex index via the low-level ES client.
        AtlasElasticsearchQuery indexQuery = (AtlasElasticsearchQuery) graph.elasticsearchQuery(JANUSGRAPH_VERTEX_INDEX);
        Map<String, LinkedHashMap> result = indexQuery.directUpdateByQuery(queryDsl);

        if (result != null) {
            LOG.info("Elasticsearch UpdateByQuery Result: {}", result);
        } else {
            LOG.info("No documents updated in Elasticsearch for guid: {}", atlasTask.getGuid());
        }
    } catch (JsonProcessingException e) {
        LOG.error("Error converting fieldsToUpdate to JSON for task with guid: {} and docId: {}. Error: {}", atlasTask.getGuid(), docId, e.getMessage(), e);
    } catch (AtlasBaseException e) {
        LOG.error("Error executing Elasticsearch query for task with guid: {} and docId: {}. Error: {}", atlasTask.getGuid(), docId, e.getMessage(), e);
    }
}

/** Commits the current transaction on the underlying graph. */
public void commit() {
    this.graph.commit();
}
Expand Down

0 comments on commit 1672d81

Please sign in to comment.