From 6ec86edb29f2317f2149485cac7d2b3d20e0d9a6 Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Wed, 20 Nov 2024 11:53:04 +0530 Subject: [PATCH 1/8] DG-1924 | Update task vertex w impacted-vertices counts --- .github/workflows/maven.yml | 1 + .../apache/atlas/repository/Constants.java | 4 +++ .../atlas/model/patches/AtlasPatch.java | 22 +++++++++++++-- .../apache/atlas/model/tasks/AtlasTask.java | 27 +++++++++++++++++++ .../graph/GraphBackedSearchIndexer.java | 2 ++ .../patches/AtlasPatchRegistry.java | 1 + .../store/graph/v2/EntityGraphMapper.java | 22 ++++++++++++--- .../graph/v2/tasks/ClassificationTask.java | 10 ++++++- .../store/graph/v2/tasks/MeaningsTask.java | 7 +++++ .../org/apache/atlas/tasks/AbstractTask.java | 7 +++++ .../apache/atlas/tasks/AtlasTaskService.java | 4 +++ .../org/apache/atlas/tasks/TaskExecutor.java | 2 +- .../org/apache/atlas/tasks/TaskFactory.java | 3 ++- .../org/apache/atlas/tasks/TaskRegistry.java | 21 ++++++++++++--- .../java/org/apache/atlas/RequestContext.java | 19 +++++++++++++ 15 files changed, 140 insertions(+), 12 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..5d92f13889 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - lineageondemand + - taskdg1924 jobs: build: diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 4fc62e0e6e..4427731e9b 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -241,6 +241,8 @@ public final class Constants { public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp"); + public static final String MODIFICATION_TASK_ASSET_COUNT_TO_PROPAGATE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationAssetsCountToPropagate"); + public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete"); /** @@ -357,6 +359,8 @@ public final class Constants { public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId"); public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid"); public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName"); + public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate"); + public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated"); public static final String ACTIVE_STATE_VALUE = "ACTIVE"; public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent"; public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id"; diff --git a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java index ae0255f9b7..a30511324d 100644 --- a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java +++ b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java @@ -49,13 +49,15 @@ public class AtlasPatch implements Serializable { private long createdTime; private long updatedTime; private PatchStatus status; + private long assetsCountToPropagate; + private long assetsCountPropagated; public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED } public AtlasPatch() { } public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status, - String updatedBy, String createdBy, long createdTime, long updatedTime) { + String updatedBy, String createdBy, long createdTime, long updatedTime, long assetsCountToPropagate, long assetsCountPropagated) { this.id = id; this.description = patchName; this.type = type; @@ -65,6 +67,8 @@ public AtlasPatch(String id, String patchName, String type, String action, Patch this.createdBy = createdBy; this.createdTime = createdTime; this.updatedTime = updatedTime; + this.assetsCountToPropagate = assetsCountToPropagate; + this.assetsCountPropagated = assetsCountPropagated; } public String getId() { @@ -139,6 +143,18 @@ public void setUpdatedTime(long updatedTime) { this.updatedTime = updatedTime; } + public void setAssetsCountToPropagate(Long assetsCount) { + this.assetsCountToPropagate = assetsCount; + } + + public Long getAssetsCountToPropagate() { + return assetsCountToPropagate; + } + + public Long getAssetsCountPropagated(){ + return assetsCountPropagated; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -157,7 +173,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status); + return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status, assetsCountToPropagate, assetsCountPropagated); } @Override @@ -173,6 +189,8 @@ public String toString() { sb.append(", createdTime=").append(createdTime); sb.append(", updatedTime=").append(updatedTime); sb.append(", status=").append(status); + sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate); + sb.append(", assetsCountPropagated=").append(assetsCountPropagated); sb.append('}'); return sb.toString(); diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java index e4b87620d9..126dce34ff 100644 --- a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java +++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java @@ -95,6 +95,8 @@ public static Status from(String s) { private String classificationId; private String entityGuid; private String classificationTypeName; + private Long assetsCountToPropagate; + private Long assetsCountPropagated; public AtlasTask() { } @@ -111,8 +113,11 @@ public AtlasTask(String type, String createdBy, Map parameters, this.attemptCount = 0; this.classificationId = classificationId; this.entityGuid = entityGuid; + this.assetsCountToPropagate = 0L; + this.assetsCountPropagated = 0L; } + public String getGuid() { return guid; } @@ -239,6 +244,26 @@ public String getEntityGuid() { return entityGuid; } + public void setAssetsCountToPropagate(Long assetsCount) { + this.assetsCountToPropagate = assetsCount; + } + + public Long getAssetsCountToPropagate() { + return assetsCountToPropagate; + } + + public void setAssetsCountPropagated(Long assetsCountPropagated) { + this.assetsCountPropagated = assetsCountPropagated; + } + + public Long getAssetsCountPropagated(){ + return assetsCountPropagated; + } + + public void incrementAssetCountPropagated() { + this.assetsCountPropagated++; + } + @JsonIgnore public void start() { this.setStatus(Status.IN_PROGRESS); @@ -270,6 +295,8 @@ public String toString() { ", attemptCount=" + attemptCount + ", errorMessage='" + errorMessage + '\'' + ", status=" + status + + ", assetsCountToPropagate=" + assetsCountToPropagate + + ", assetsCountPropagated=" + assetsCountPropagated + '}'; } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index a8bacbbb5f..7b2c3da7ac 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true); createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false); createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false); + createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false); + createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false); createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false); createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false); diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java index d9ae5800e4..e724a00ed0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java @@ -210,6 +210,7 @@ private static AtlasPatch toAtlasPatch(AtlasVertex vertex) { ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class)); ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class)); ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)); + ret.setAssetsCountToPropagate(getEncodedProperty(vertex, TASK_ASSET_COUNT_TO_PROPAGATE, Long.class)); ret.setStatus(getPatchStatus(vertex)); return ret; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 76f0be44cb..96d31bc7eb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -78,6 +78,7 @@ import javax.inject.Inject; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -95,6 +96,7 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS; +import static org.apache.atlas.model.tasks.AtlasTask.Status.PENDING; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; @@ -3417,6 +3419,7 @@ public void addClassifications(final EntityMutationContext context, String guid, public List propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid, Boolean previousRestrictPropagationThroughLineage,Boolean previousRestrictPropagationThroughHierarchy) throws AtlasBaseException { try { + if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) { LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId); @@ -3459,7 +3462,7 @@ public List propagateClassification(String entityGuid, String classifica List edgeLabelsToCheck = CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode); Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false; List impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude); - + if (CollectionUtils.isEmpty(impactedVertices)) { LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId); @@ -3475,6 +3478,16 @@ public List propagateClassification(String entityGuid, String classifica } public List processClassificationPropagationAddition(List verticesToPropagate, AtlasVertex classificationVertex) throws AtlasBaseException{ + + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size()); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + AtlasPerfMetrics.MetricRecorder classificationPropagationMetricRecorder = RequestContext.get().startMetricRecord("processClassificationPropagationAddition"); List propagatedEntitiesGuids = new ArrayList<>(); int impactedVerticesSize = verticesToPropagate.size(); @@ -3505,9 +3518,12 @@ public List processClassificationPropagationAddition(List v propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids); - offset += CHUNK_SIZE; - transactionInterceptHelper.intercept(); + int finishedTaskCount = toIndex - offset; + + offset += CHUNK_SIZE; + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); } while (offset < impactedVerticesSize); } catch (AtlasBaseException exception) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java index deac12e565..2420891011 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; @@ -36,8 +37,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.atlas.model.tasks.AtlasTask.Status.*; +import static org.apache.atlas.repository.Constants.TASK_ASSET_COUNT_TO_PROPAGATE; +import static org.apache.atlas.repository.Constants.TASK_GUID; import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE; public abstract class ClassificationTask extends AbstractTask { @@ -100,7 +104,7 @@ public AtlasTask.Status perform() throws AtlasBaseException { try { setStatus(IN_PROGRESS); - + setAssetsCountToPropagate(1234L); run(params); setStatus(COMPLETE); @@ -180,5 +184,9 @@ protected void setStatus(AtlasTask.Status status) { graph.commit(); } + protected void setAssetsCountToPropagate(Long assetsCount) { + super.setAssetsCountToPropagate(assetsCount); + graph.commit(); + } protected abstract void run(Map parameters) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java index 42b2bf76d7..c2bcb96ec4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.atlas.model.tasks.AtlasTask.Status.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE; @@ -60,6 +61,7 @@ public AtlasTask.Status perform() throws Exception { RequestContext.get().setUser(userName, null); try { setStatus(IN_PROGRESS); + setAssetsCountToPropagate(1234L); run(params); @@ -112,5 +114,10 @@ protected void setStatus(AtlasTask.Status status) { } + protected void setAssetsCountToPropagate(Long assetsCount) { + super.setAssetsCountToPropagate(assetsCount); + + } + protected abstract void run(Map parameters) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java index 6a64e91a73..4e006a57f3 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java +++ b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java @@ -67,5 +67,12 @@ protected AtlasTask getTaskDef() { return this.task; } + protected void setAssetsCountToPropagate(Long assetsCount) { + task.setAssetsCountToPropagate(assetsCount);; + } + + public Long getAssetsCountToPropagate() { + return this.task.getAssetsCountToPropagate(); + } public abstract Status perform() throws Exception; } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java b/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java index 1df177223b..cac026b5cf 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java +++ b/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java @@ -162,6 +162,8 @@ public List createAtlasTasks(List tasks) throws AtlasBaseE task.setCreatedTime(new Date()); task.setStatusPending(); task.setAttemptCount(0); + task.setAssetsCountToPropagate(task.getAssetsCountToPropagate()); + task.setAssetsCountPropagated(0L); task.setGuid(UUID.randomUUID().toString()); task.setCreatedBy(RequestContext.getCurrentUser()); @@ -268,6 +270,8 @@ public AtlasVertex createTaskVertex(AtlasTask task) { setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters())); setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount()); + setEncodedProperty(ret, Constants.TASK_ASSET_COUNT_TO_PROPAGATE, task.getAssetsCountToPropagate()); + setEncodedProperty(ret, Constants.TASK_ASSET_COUNT_PROPAGATED, task.getAssetsCountPropagated()); setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage()); LOG.info("Creating task vertex: {}: {}, {}: {}, {}: {} ", diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java index f0f4ca08db..60e3c29561 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java @@ -186,7 +186,7 @@ private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exceptio AbstractTask runnableTask = factory.create(task); - registry.inProgress(taskVertex, task); + registry.inProgress(taskVertex, task, runnableTask); runnableTask.run(); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java index 47ce1fcb77..f7513eedf9 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.atlas.tasks; +import org.apache.atlas.AtlasException; import org.apache.atlas.model.tasks.AtlasTask; import java.util.List; @@ -26,7 +27,7 @@ public interface TaskFactory { * @param atlasTask * @return */ - AbstractTask create(AtlasTask atlasTask); + AbstractTask create(AtlasTask atlasTask) throws AtlasException; List getSupportedTypes(); } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8c08a26121..793bd9a6eb 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.atlas.repository.Constants.TASK_GUID; -import static org.apache.atlas.repository.Constants.TASK_STATUS; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; @Component @@ -239,15 +238,21 @@ public void deleteComplete(AtlasVertex taskVertex, AtlasTask task) { deleteVertex(taskVertex); } - public void inProgress(AtlasVertex taskVertex, AtlasTask task) { + public void inProgress(AtlasVertex taskVertex, AtlasTask task, AbstractTask runnableTask) { RequestContext.get().setCurrentTask(task); task.setStartTime(new Date()); - + task.setAssetsCountToPropagate(runnableTask.getAssetsCountToPropagate()); setEncodedProperty(taskVertex, Constants.TASK_START_TIME, task.getStartTime()); setEncodedProperty(taskVertex, Constants.TASK_STATUS, AtlasTask.Status.IN_PROGRESS); setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis()); + setEncodedProperty(taskVertex, Constants.TASK_ASSET_COUNT_TO_PROPAGATE, task.getAssetsCountToPropagate()); graph.commit(); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @GraphTransaction @@ -558,7 +563,15 @@ public static AtlasTask toAtlasTask(AtlasVertex v) { ret.setErrorMessage(errorMessage); } + Long assetsCountToPropagate = v.getProperty(Constants.TASK_ASSET_COUNT_TO_PROPAGATE, Long.class); + if (assetsCountToPropagate != null){ + ret.setAssetsCountToPropagate(assetsCountToPropagate); + } + Long assetsCountPropagated = v.getProperty(Constants.TASK_ASSET_COUNT_PROPAGATED, Long.class); + if (assetsCountPropagated != null){ + ret.setAssetsCountPropagated(assetsCountPropagated); + } return ret; } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 814a4cb450..c43bc39c83 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -84,6 +84,8 @@ public class RequestContext { private boolean isPurgeRequested = false; private int maxAttempts = 1; private int attemptCount = 1; + private Long assetsCountToPropagate = 0L; + private Long assetsCountPropagated = 0L; private boolean isImportInProgress = false; private boolean isInNotificationProcessing = false; private boolean isInTypePatching = false; @@ -299,6 +301,23 @@ public void setAttemptCount(int attemptCount) { this.attemptCount = attemptCount; } + public Long getAssetsCountToPropagate() { + return assetsCountToPropagate; + } + + public Long getAssetsCountPropagated() { + return assetsCountPropagated; + } + + + public void setAssetsCountToPropagate(Long assetsCount) { + this.assetsCountToPropagate = assetsCount ; + } + + public void setAssetsCountPropagated(Long assetsCountPropagated) { + this.assetsCountPropagated = assetsCountPropagated; + } + public boolean isImportInProgress() { return isImportInProgress; } From 27798366a76b2467f1009d0ffec2f39ae074a8fa Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Wed, 11 Dec 2024 14:51:27 +0530 Subject: [PATCH 2/8] Refactor Task Asset Count Handling in Apache Atlas This commit streamlines the handling of asset propagation counts across task-related classes by removing outdated configurations and simplifications in the code: - **Constants.java**: Removed the 'MODIFICATION_TASK_ASSET_COUNT_TO_PROPAGATE_PROPERTY_KEY', eliminating the configuration property for asset count thresholds. - **AtlasTask.java**: Deleted the 'incrementAssetCountPropagated' method to reduce manual asset count manipulations. - **ClassificationTask.java & MeaningsTask.java**: Removed methods setting asset counts to propagate, simplifying task executions. - **AtlasTaskService.java**: Updated task initialization to set asset counts to zero, aligning with the new simplified approach. - **TaskRegistry.java**: Removed an unnecessary sleep operation from the task update process. --- .../main/java/org/apache/atlas/repository/Constants.java | 2 -- .../main/java/org/apache/atlas/model/tasks/AtlasTask.java | 4 ---- .../repository/store/graph/v2/tasks/ClassificationTask.java | 6 ------ .../atlas/repository/store/graph/v2/tasks/MeaningsTask.java | 6 ------ .../main/java/org/apache/atlas/tasks/AtlasTaskService.java | 2 +- .../src/main/java/org/apache/atlas/tasks/TaskRegistry.java | 5 ----- 6 files changed, 1 insertion(+), 24 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 4427731e9b..11e08bf78b 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -241,8 +241,6 @@ public final class Constants { public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp"); - public static final String MODIFICATION_TASK_ASSET_COUNT_TO_PROPAGATE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationAssetsCountToPropagate"); - public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete"); /** diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java index 126dce34ff..0a1b633b47 100644 --- a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java +++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java @@ -260,10 +260,6 @@ public Long getAssetsCountPropagated(){ return assetsCountPropagated; } - public void incrementAssetCountPropagated() { - this.assetsCountPropagated++; - } - @JsonIgnore public void start() { this.setStatus(Status.IN_PROGRESS); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java index 2420891011..f9811a418e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java @@ -104,9 +104,7 @@ public AtlasTask.Status perform() throws AtlasBaseException { try { setStatus(IN_PROGRESS); - setAssetsCountToPropagate(1234L); run(params); - setStatus(COMPLETE); } catch (AtlasBaseException e) { LOG.error("Task: {}: Error performing task!", getTaskGuid(), e); @@ -184,9 +182,5 @@ protected void setStatus(AtlasTask.Status status) { graph.commit(); } - protected void setAssetsCountToPropagate(Long assetsCount) { - super.setAssetsCountToPropagate(assetsCount); - graph.commit(); - } protected abstract void run(Map parameters) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java index c2bcb96ec4..df5aaac9d7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java @@ -61,7 +61,6 @@ public AtlasTask.Status perform() throws Exception { RequestContext.get().setUser(userName, null); try { setStatus(IN_PROGRESS); - setAssetsCountToPropagate(1234L); run(params); @@ -114,10 +113,5 @@ protected void setStatus(AtlasTask.Status status) { } - protected void setAssetsCountToPropagate(Long assetsCount) { - super.setAssetsCountToPropagate(assetsCount); - - } - protected abstract void run(Map parameters) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java b/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java index cac026b5cf..82529cfc3d 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java +++ b/repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java @@ -162,7 +162,7 @@ public List createAtlasTasks(List tasks) throws AtlasBaseE task.setCreatedTime(new Date()); task.setStatusPending(); task.setAttemptCount(0); - task.setAssetsCountToPropagate(task.getAssetsCountToPropagate()); + task.setAssetsCountToPropagate(0L); task.setAssetsCountPropagated(0L); task.setGuid(UUID.randomUUID().toString()); task.setCreatedBy(RequestContext.getCurrentUser()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 793bd9a6eb..f41ab5cb16 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -248,11 +248,6 @@ public void inProgress(AtlasVertex taskVertex, AtlasTask task, AbstractTask runn setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis()); setEncodedProperty(taskVertex, Constants.TASK_ASSET_COUNT_TO_PROPAGATE, task.getAssetsCountToPropagate()); graph.commit(); - try { - TimeUnit.SECONDS.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } @GraphTransaction From 800ec121f5729edfa0e8cb15d00da68c33abeaa5 Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Wed, 11 Dec 2024 17:08:56 +0530 Subject: [PATCH 3/8] removed redundant imports, codes, constants --- .../graph/v2/tasks/ClassificationTask.java | 9 ++++--- .../store/graph/v2/tasks/MeaningsTask.java | 1 - .../org/apache/atlas/tasks/TaskRegistry.java | 24 ++++++++++--------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java index f9811a418e..daefe788be 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java @@ -23,7 +23,6 @@ import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; @@ -37,11 +36,11 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; -import static org.apache.atlas.model.tasks.AtlasTask.Status.*; -import static org.apache.atlas.repository.Constants.TASK_ASSET_COUNT_TO_PROPAGATE; -import static org.apache.atlas.repository.Constants.TASK_GUID; +import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED; +import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE; +import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS; + import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE; public abstract class ClassificationTask extends AbstractTask { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java index df5aaac9d7..42b2bf76d7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java @@ -16,7 +16,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.apache.atlas.model.tasks.AtlasTask.Status.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE; diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index f41ab5cb16..8d9accba91 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -52,7 +52,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.atlas.repository.Constants.*; + +import static org.apache.atlas.repository.Constants.TASK_GUID; +import static org.apache.atlas.repository.Constants.TASK_STATUS; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; @Component @@ -90,7 +92,7 @@ public List getPendingTasks() { try { AtlasGraphQuery query = graph.query() .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME) - .has(Constants.TASK_STATUS, AtlasTask.Status.PENDING) + .has(TASK_STATUS, AtlasTask.Status.PENDING) .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC); Iterator results = query.vertices().iterator(); @@ -115,7 +117,7 @@ public List getInProgressTasks() { try { AtlasGraphQuery query = graph.query() .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME) - .has(Constants.TASK_STATUS, AtlasTask.Status.IN_PROGRESS) + .has(TASK_STATUS, AtlasTask.Status.IN_PROGRESS) .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC); Iterator results = query.vertices().iterator(); @@ -170,7 +172,7 @@ public void updateStatus(AtlasVertex taskVertex, AtlasTask task) { } setEncodedProperty(taskVertex, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount()); - setEncodedProperty(taskVertex, Constants.TASK_STATUS, task.getStatus().toString()); + setEncodedProperty(taskVertex, TASK_STATUS, task.getStatus().toString()); setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis()); setEncodedProperty(taskVertex, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage()); } @@ -244,7 +246,7 @@ public void inProgress(AtlasVertex taskVertex, AtlasTask task, AbstractTask runn task.setStartTime(new Date()); task.setAssetsCountToPropagate(runnableTask.getAssetsCountToPropagate()); setEncodedProperty(taskVertex, Constants.TASK_START_TIME, task.getStartTime()); - setEncodedProperty(taskVertex, Constants.TASK_STATUS, AtlasTask.Status.IN_PROGRESS); + setEncodedProperty(taskVertex, TASK_STATUS, AtlasTask.Status.IN_PROGRESS); setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis()); setEncodedProperty(taskVertex, Constants.TASK_ASSET_COUNT_TO_PROPAGATE, task.getAssetsCountToPropagate()); graph.commit(); @@ -282,7 +284,7 @@ public AtlasTask getById(String guid) { @GraphTransaction public AtlasVertex getVertex(String taskGuid) { - AtlasGraphQuery query = graph.query().has(Constants.TASK_GUID, taskGuid); + AtlasGraphQuery query = graph.query().has(TASK_GUID, taskGuid); Iterator results = query.vertices().iterator(); @@ -319,7 +321,7 @@ public List getAll(List statusList, int offset, int limit) { List orConditions = new LinkedList<>(); for (String status : statusList) { - orConditions.add(query.createChildQuery().has(Constants.TASK_STATUS, AtlasTask.Status.from(status))); + orConditions.add(query.createChildQuery().has(TASK_STATUS, AtlasTask.Status.from(status))); } query.or(orConditions); @@ -355,8 +357,8 @@ public List getTasksForReQueueGraphQuery() { .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME); List orConditions = new LinkedList<>(); - orConditions.add(query.createChildQuery().has(Constants.TASK_STATUS, AtlasTask.Status.IN_PROGRESS)); - orConditions.add(query.createChildQuery().has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)); + orConditions.add(query.createChildQuery().has(TASK_STATUS, AtlasTask.Status.IN_PROGRESS)); + orConditions.add(query.createChildQuery().has(TASK_STATUS, AtlasTask.Status.PENDING)); query.or(orConditions); query.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC); @@ -483,7 +485,7 @@ private void deleteVertex(AtlasVertex taskVertex) { public static AtlasTask toAtlasTask(AtlasVertex v) { AtlasTask ret = new AtlasTask(); - String guid = v.getProperty(Constants.TASK_GUID, String.class); + String guid = v.getProperty(TASK_GUID, String.class); if (guid != null) { ret.setGuid(guid); } @@ -493,7 +495,7 @@ public static AtlasTask toAtlasTask(AtlasVertex v) { ret.setType(type); } - String status = v.getProperty(Constants.TASK_STATUS, String.class); + String status = v.getProperty(TASK_STATUS, String.class); if (status != null) { ret.setStatus(status); } From 9b6fb716d5736263e90cde1b5a8dd6050ab680b2 Mon Sep 17 00:00:00 2001 From: abhijeet-atlan Date: Wed, 11 Dec 2024 18:12:45 +0530 Subject: [PATCH 4/8] Update maven.yml --- .github/workflows/maven.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 7d63cdff0b..da9e4b9d2b 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,10 +25,7 @@ on: - beta - development - master - - lineageondemand - taskdg1924 - - dg1908 - - ns/fix/delta-refresh jobs: build: @@ -69,7 +66,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924' || $branch_name == 'dg1908' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924']] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard From 7a71c1415a2df089bef72e440c16380cb4a50eca Mon Sep 17 00:00:00 2001 From: abhijeet-atlan Date: Wed, 11 Dec 2024 18:14:47 +0530 Subject: [PATCH 5/8] Update maven.yml --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index da9e4b9d2b..d4346667cf 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -66,7 +66,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924']] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard From c095f36bf64adb92c63b05e269939decc8b2a4fb Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Wed, 11 Dec 2024 15:54:02 +0530 Subject: [PATCH 6/8] DG-1924 | Update task vertex w impacted-vertices counts for remaining types of propagation --- .github/workflows/maven.yml | 3 +- .../store/graph/v1/DeleteHandlerV1.java | 27 ++++++++ .../store/graph/v2/EntityGraphMapper.java | 69 +++++++++++++++++-- 3 files changed, 93 insertions(+), 6 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 5d92f13889..340551bd57 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -27,6 +27,7 @@ on: - master - lineageondemand - taskdg1924 + - taskdg1924deleteprop jobs: build: @@ -65,7 +66,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924deleteprop' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 68c6dacd9c..8aa88f1d23 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -1215,16 +1215,42 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship } } + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) addPropagationsMap.size() + removePropagationsMap.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + + int propagatedCount = 0; for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) { List entitiesToAddPropagation = addPropagationsMap.get(classificationVertex); addTagPropagation(classificationVertex, entitiesToAddPropagation); + propagatedCount++; + if (propagatedCount == 100){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount - 1); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + propagatedCount = 0; + } } for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { List entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex); removeTagPropagation(classificationVertex, entitiesToRemovePropagation); + propagatedCount++; + if (propagatedCount == 100){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + propagatedCount = 0; + } + } + if (propagatedCount != 0){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); } } else { // update blocked propagated classifications only if there is no change is tag propagation (don't update both) @@ -1232,6 +1258,7 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship } } + public void handleBlockedClassifications(AtlasEdge edge, Set blockedClassifications) throws AtlasBaseException { if (blockedClassifications != null) { List propagatableClassifications = getPropagatableClassifications(edge); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 96d31bc7eb..b49d70e65c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import org.apache.atlas.*; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; @@ -3139,9 +3140,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc long classificationEdgeCount = 0; long classificationEdgeInMemoryCount = 0; Iterator tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE); + List tagVerticesProcessed = new ArrayList<>(0); List currentAssetVerticesBatch = new ArrayList<>(0); - + int total_count = 0; while (tagVertices != null && tagVertices.hasNext()) { if (cleanedUpCount >= CLEANUP_MAX){ return; @@ -3160,6 +3162,8 @@ public void cleanUpClassificationPropagation(String classificationName, int batc } int currentAssetsBatchSize = currentAssetVerticesBatch.size(); + total_count += currentAssetsBatchSize; + if (currentAssetsBatchSize > 0) { LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize); int offset = 0; @@ -3191,17 +3195,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batc classificationEdgeInMemoryCount = 0; } } + try { AtlasEntity entity = repairClassificationMappings(vertex); entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications); } catch (IllegalStateException | AtlasBaseException e) { e.printStackTrace(); } + } transactionInterceptHelper.intercept(); - offset += CHUNK_SIZE; + + } finally { LOG.info("For offset {} , classificationEdge were : {}", offset, classificationEdgeCount); classificationEdgeCount = 0; @@ -3217,6 +3224,17 @@ public void cleanUpClassificationPropagation(String classificationName, int batc e.printStackTrace(); } } + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) total_count); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + total_count); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); cleanedUpCount += currentAssetsBatchSize; @@ -3481,7 +3499,7 @@ public List processClassificationPropagationAddition(List v // update the 'assetsCountToPropagate' on in memory java object. AtlasTask currentTask = RequestContext.get().getCurrentTask(); - currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size()); + currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size() - 1); //update the 'assetsCountToPropagate' in the current task vertex. AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); @@ -3519,7 +3537,7 @@ public List processClassificationPropagationAddition(List v propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids); transactionInterceptHelper.intercept(); - int finishedTaskCount = toIndex - offset; + int finishedTaskCount = toIndex - offset - 1; offset += CHUNK_SIZE; currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); @@ -4067,6 +4085,16 @@ public void updateClassificationTextPropagation(String classificationVertexId) t AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); LOG.info("Fetched classification : {} ", classification.toString()); List impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex); + + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) impactedVertices.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + LOG.info("impactedVertices : {}", impactedVertices.size()); int batchSize = 100; for (int i = 0; i < impactedVertices.size(); i += batchSize) { @@ -4081,6 +4109,10 @@ public void updateClassificationTextPropagation(String classificationVertexId) t entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification)); } } + + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + batch.size() - 1); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + transactionInterceptHelper.intercept(); LOG.info("Updated classificationText from {} for {}", i, batchSize); } @@ -4271,6 +4303,15 @@ public void classificationRefreshPropagation(String classificationId) throws Atl .filter(vertex -> vertex != null) .collect(Collectors.toList()); + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) verticesToRemove.size() + verticesToAddClassification.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + //Remove classifications from unreachable vertices processPropagatedClassificationDeletionFromVertices(verticesToRemove, currentClassificationVertex, classification); @@ -4332,6 +4373,9 @@ private void processPropagatedClassificationDeletionFromVertices(List updatedEntities = updateClassificationText(classification, updatedVertices); entityChangeNotifier.onClassificationsDeletedFromEntities(updatedEntities, Collections.singletonList(classification)); + int finishedTaskCount = toIndex - offset; offset += CHUNK_SIZE; - + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); } while (offset < propagatedVerticesSize); @@ -4367,6 +4413,15 @@ List processClassificationEdgeDeletionInChunk(AtlasClassification classi int toIndex; int offset = 0; + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) propagatedEdgesSize); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + do { toIndex = ((offset + CHUNK_SIZE > propagatedEdgesSize) ? propagatedEdgesSize : (offset + CHUNK_SIZE)); @@ -4382,8 +4437,12 @@ List processClassificationEdgeDeletionInChunk(AtlasClassification classi deletedPropagationsGuid.addAll(propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList())); } + int finishedTaskCount = toIndex - offset; + offset += CHUNK_SIZE; + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); } while (offset < propagatedEdgesSize); From aaf5cfbc53a56a921daf3ac52b028086b2785b43 Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Mon, 30 Dec 2024 14:38:42 +0530 Subject: [PATCH 7/8] changed 'total count' to camel case instead of snake case for better concistency --- .../repository/store/graph/v2/EntityGraphMapper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index b49d70e65c..34e6759a40 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -3143,7 +3143,7 @@ public void cleanUpClassificationPropagation(String classificationName, int batc List tagVerticesProcessed = new ArrayList<>(0); List currentAssetVerticesBatch = new ArrayList<>(0); - int total_count = 0; + int totalCount = 0; while (tagVertices != null && tagVertices.hasNext()) { if (cleanedUpCount >= CLEANUP_MAX){ return; @@ -3162,7 +3162,7 @@ public void cleanUpClassificationPropagation(String classificationName, int batc } int currentAssetsBatchSize = currentAssetVerticesBatch.size(); - total_count += currentAssetsBatchSize; + totalCount += currentAssetsBatchSize; if (currentAssetsBatchSize > 0) { LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize); @@ -3226,14 +3226,14 @@ public void cleanUpClassificationPropagation(String classificationName, int batc } // update the 'assetsCountToPropagate' on in memory java object. AtlasTask currentTask = RequestContext.get().getCurrentTask(); - currentTask.setAssetsCountToPropagate((long) total_count); + currentTask.setAssetsCountToPropagate((long) totalCount); //update the 'assetsCountToPropagate' in the current task vertex. AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); graph.commit(); - currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + total_count); + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + totalCount); currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); From 458d8e5da3693a58a3585700fc639096e0394241 Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Thu, 2 Jan 2025 10:34:10 +0530 Subject: [PATCH 8/8] minor fixes based on review comments --- intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java | 1 - .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 1 - .../src/main/java/org/apache/atlas/tasks/TaskFactory.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java index 0a1b633b47..0d7ebf4e4d 100644 --- a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java +++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java @@ -117,7 +117,6 @@ public AtlasTask(String type, String createdBy, Map parameters, this.assetsCountPropagated = 0L; } - public String getGuid() { return guid; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 34e6759a40..4c32250d63 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -79,7 +79,6 @@ import javax.inject.Inject; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java index f7513eedf9..7c7f5d7ba8 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java @@ -27,7 +27,7 @@ public interface TaskFactory { * @param atlasTask * @return */ - AbstractTask create(AtlasTask atlasTask) throws AtlasException; + AbstractTask create(AtlasTask atlasTask); List getSupportedTypes(); } \ No newline at end of file