diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 878c980c45..065be47cfc 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -132,7 +132,7 @@ public static EntityAuditActionV2 fromString(String strValue) { private String eventKey; private AtlasEntity entity; private EntityAuditType type; - private List> detail; + private Map detail; private AtlasEntityHeader entityDetail; private Map headers; @@ -227,11 +227,11 @@ public void setType(EntityAuditType type) { this.type = type; } - public List> getDetail() { + public Map getDetail() { return detail; } - public void setDetail(List> detail) { + public void setDetail(Map detail) { this.detail = detail; } diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java index 76172b554c..cbab135606 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java @@ -226,16 +226,7 @@ private EntityAuditSearchResult getResultFromResponse(String responseString) thr EntityAuditEventV2 event = new EntityAuditEventV2(); event.setEntityId(entityGuid); event.setAction(EntityAuditEventV2.EntityAuditActionV2.fromString((String) source.get(ACTION))); - if (source.get(DETAIL) != null) { - List> classificationMap; - if (source.get(DETAIL) instanceof java.util.ArrayList) { - classificationMap = (List>) source.get(DETAIL); - } else { - classificationMap = new ArrayList<>(); - classificationMap.add((Map) source.get(DETAIL)); - } - event.setDetail(classificationMap); - } + event.setDetail((Map) source.get(DETAIL)); event.setUser((String) source.get(USER)); event.setCreated((long) source.get(CREATED)); if (source.get(TIMESTAMP) != null) { diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index d319225eeb..362fbf497a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -204,12 +204,17 @@ public void onClassificationsAdded(AtlasEntity entity, List MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit"); FixedBufferList classificationsAdded = getAuditEventsList(); - Map> entityClassifications = new HashMap<>(); - Map> propagatedClassifications = new HashMap<>(); - - getClassificationsFromEntity(classifications, entity, entityClassifications, propagatedClassifications); - emitAddClassificationEvent(classificationsAdded, entityClassifications, propagatedClassifications); + for (AtlasClassification classification : classifications) { + if (entity.getGuid().equals(classification.getEntityGuid())) { + createEvent(classificationsAdded.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)); + } else { + createEvent(classificationsAdded.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)); + } + } + for (EntityAuditRepository auditRepository: auditRepositories) { + auditRepository.putEventsV2(classificationsAdded.toList()); + } RequestContext.get().endMetricRecord(metric); } } @@ -222,11 +227,20 @@ public void onClassificationsAdded(List entities, List events = getAuditEventsList(); - Map> entityClassifications = new HashMap<>(); - Map> propagatedClassifications = new HashMap<>(); + for (AtlasClassification classification : classifications) { + for (AtlasEntity entity : entities) { + if (entity.getGuid().equals(classification.getEntityGuid())) { + createEvent(events.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)); + } else { + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)); + } + } + } + + for (EntityAuditRepository auditRepository: auditRepositories) { + auditRepository.putEventsV2(events.toList()); + } - getClassificationsFromEntities(classifications, entities,entityClassifications, propagatedClassifications ); - emitAddClassificationEvent(events, entityClassifications, propagatedClassifications); RequestContext.get().endMetricRecord(metric); } @@ -239,45 +253,21 @@ public void onClassificationsUpdated(AtlasEntity entity, List events = getAuditEventsList(); String guid = entity.getGuid(); - - Map> entityClassifications = new HashMap<>(); - Map> propagatedClassifications = new HashMap<>(); - - getClassificationsFromEntity(classifications, entity, entityClassifications, propagatedClassifications); - - List addedClassification = new ArrayList<>(0); - List deletedClassification = new ArrayList<>(0); - List updatedClassification = new ArrayList<>(0); - - if (CollectionUtils.isNotEmpty(propagatedClassifications.get(entity))) { - propagatedClassifications.get(entity).forEach(classification -> { + for (AtlasClassification classification : classifications) { + if (guid.equals(classification.getEntityGuid())) { + createEvent(events.next(), entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification)); + } else { if (isPropagatedClassificationAdded(guid, classification)) { - addedClassification.add(classification); + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)); } else if (isPropagatedClassificationDeleted(guid, classification)) { - deletedClassification.add(classification); + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName())); } else { - updatedClassification.add(classification); + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification)); } - }); - } - - if (CollectionUtils.isNotEmpty(addedClassification)) { - createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classifications: " + AtlasType.toJson(addedClassification)); - } - - if (CollectionUtils.isNotEmpty(deletedClassification)) { - createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classifications: " + getDeleteClassificationsMap(deletedClassification)); - } - - if (CollectionUtils.isNotEmpty(updatedClassification)) { - createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classifications: " + AtlasType.toJson(updatedClassification)); - } - - if (entityClassifications.get(entity) != null) { - createEvent(events.next(), entity, CLASSIFICATION_UPDATE, "Updated classifications: " + AtlasType.toJson(entityClassifications.get(entity))); + } } - for (EntityAuditRepository auditRepository : auditRepositories) { + for (EntityAuditRepository auditRepository: auditRepositories) { auditRepository.putEventsV2(events.toList()); } @@ -311,11 +301,18 @@ public void onClassificationsDeleted(AtlasEntity entity, List events = getAuditEventsList(); - Map>> entityClassifications = new HashMap<>(); - Map>> propagatedClassifications = new HashMap<>(); - getClassificationTextFromEntity(classifications, entity, entityClassifications, propagatedClassifications); - emitDeleteClassificationEvent(events, entityClassifications, propagatedClassifications); + for (AtlasClassification classification : classifications) { + if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) { + createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + getDeleteClassificationString(classification.getTypeName())); + } else { + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName())); + } + } + + for (EntityAuditRepository auditRepository: auditRepositories) { + auditRepository.putEventsV2(events.toList()); + } RequestContext.get().endMetricRecord(metric); } @@ -328,11 +325,19 @@ public void onClassificationsDeleted(List entities, List events = getAuditEventsList(); - Map>> entityClassifications = new HashMap<>(); - Map>> propagatedClassifications = new HashMap<>(); + for (AtlasClassification classification : classifications) { + for (AtlasEntity entity : entities) { + if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) { + createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + getDeleteClassificationString(classification.getTypeName())); + } else { + createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName())); + } + } + } - getClassificationsTextFromEntities(classifications, entities, entityClassifications, propagatedClassifications); - emitDeleteClassificationEvent(events, entityClassifications, propagatedClassifications); + for (EntityAuditRepository auditRepository : auditRepositories) { + auditRepository.putEventsV2(events.toList()); + } RequestContext.get().endMetricRecord(metric); } @@ -771,22 +776,22 @@ public static String getV2AuditPrefix(EntityAuditActionV2 action) { ret = "Purged: "; break; case CLASSIFICATION_ADD: - ret = "Added classifications: "; + ret = "Added classification: "; break; case CLASSIFICATION_DELETE: - ret = "Deleted classifications: "; + ret = "Deleted classification: "; break; case CLASSIFICATION_UPDATE: - ret = "Updated classifications: "; + ret = "Updated classification: "; break; case PROPAGATED_CLASSIFICATION_ADD: - ret = "Added propagated classifications: "; + ret = "Added propagated classification: "; break; case PROPAGATED_CLASSIFICATION_DELETE: - ret = "Deleted propagated classifications: "; + ret = "Deleted propagated classification: "; break; case PROPAGATED_CLASSIFICATION_UPDATE: - ret = "Updated propagated classifications: "; + ret = "Updated propagated classification: "; break; case ENTITY_IMPORT_CREATE: ret = "Created by import: "; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java index 103e76c006..292ce1de09 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java @@ -193,24 +193,42 @@ public void setClassifications(Map map) throws AtlasB AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("commitChanges.notify"); Map> deleted = RequestContext.get().getDeletedClassificationAndVertices(); + Set allVertices = new HashSet<>(); if (MapUtils.isNotEmpty(deleted)) { - Map> entityClassification = getEntityClassificationsMapping(deleted); - - for (Map.Entry> atlasEntityListEntry : entityClassification.entrySet()) { - entityChangeNotifier.onClassificationDeletedFromEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue()); + for (AtlasClassification deletedClassification : deleted.keySet()) { + Collection vertices = deleted.get(deletedClassification); + List propagatedEntities = new ArrayList<>(); + + for (Object obj : vertices) { + AtlasVertex vertex = (AtlasVertex) obj; + AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), IGNORE_REL); + + allVertices.add(vertex); + propagatedEntities.add(entity); + } + entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(deletedClassification)); } } Map> added = RequestContext.get().getAddedClassificationAndVertices(); if (MapUtils.isNotEmpty(added)) { - Map> entityClassification = getEntityClassificationsMapping(added); - - for (Map.Entry> atlasEntityListEntry : entityClassification.entrySet()) { - entityChangeNotifier.onClassificationAddedToEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue()); + for (AtlasClassification addedClassification : added.keySet()) { + Collection vertices = added.get(addedClassification); + List propagatedEntities = new ArrayList<>(); + + for (Object obj : vertices) { + AtlasVertex vertex = (AtlasVertex) obj; + AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), IGNORE_REL); + + allVertices.add(vertex); + propagatedEntities.add(entity); + } + entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(addedClassification), false); } } + entityGraphMapper.updateClassificationText(null, allVertices); transactionInterceptHelper.intercept(); RequestContext.get().endMetricRecord(recorder); RequestContext.get().setDelayTagNotifications(false); @@ -291,12 +309,13 @@ private void deleteClassifications(String entityGuid, String typeName, List(vertices)); } } else { - /** ToDO : @aarshi: remove this after testing : - * this is dead code it will never be executed as isDelayTagNotifications() always true - **/ - Map> entityClassification = new HashMap<>(); for (AtlasClassification classification : addedClassifications.keySet()) { Set vertices = addedClassifications.get(classification); List propagatedEntities = updateClassificationText(classification, vertices); - propagatedEntities.forEach(entity -> entityClassification.computeIfAbsent(entity, key -> new ArrayList<>()).add(classification)); - } - - for (Map.Entry> atlasEntityListEntry : entityClassification.entrySet()) { - entityChangeNotifier.onClassificationAddedToEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue()); + entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification), false); } } @@ -3036,23 +3028,11 @@ public void deleteClassification(String entityGuid, String classificationName) t if (RequestContext.get().isDelayTagNotifications()) { RequestContext.get().addDeletedClassificationAndVertices(classification, new ArrayList<>(entityVertices)); } else if (CollectionUtils.isNotEmpty(entityVertices)) { - /** ToDO : @aarshi: remove this after testing : - * this is dead code it will never be executed as isDelayTagNotifications() always true - **/ - - Map> entityClassification = new HashMap<>(); - if (CollectionUtils.isNotEmpty(entityVertices)) { - - List propagatedEntities = updateClassificationText(classification, entityVertices); - propagatedEntities.forEach(entity -> entityClassification.computeIfAbsent(entity, key -> new ArrayList<>()).add(classification)); + List propagatedEntities = updateClassificationText(classification, entityVertices); + //Sending audit request for all entities at once + entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification)); - //Sending audit request for all entities at once - for (Map.Entry> atlasEntityListEntry : entityClassification.entrySet()) { - entityChangeNotifier.onClassificationDeletedFromEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue()); - } - } } - AtlasPerfTracer.log(perf); }