Skip to content

Commit

Permalink
Merge pull request #2689 from atlanhq/rollback/PLT-367
Browse files Browse the repository at this point in the history
Rollback PLT-367
  • Loading branch information
aarshi0301 authored Dec 21, 2023
2 parents 5116e32 + 3faa2a0 commit 35cdeb5
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static EntityAuditActionV2 fromString(String strValue) {
private String eventKey;
private AtlasEntity entity;
private EntityAuditType type;
private List<Map<String, Object>> detail;
private Map<String, Object> detail;

private AtlasEntityHeader entityDetail;
private Map<String, String> headers;
Expand Down Expand Up @@ -227,11 +227,11 @@ public void setType(EntityAuditType type) {
this.type = type;
}

public List<Map<String, Object>> getDetail() {
public Map<String, Object> getDetail() {
return detail;
}

public void setDetail(List<Map<String, Object>> detail) {
public void setDetail(Map<String, Object> detail) {
this.detail = detail;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,7 @@ private EntityAuditSearchResult getResultFromResponse(String responseString) thr
EntityAuditEventV2 event = new EntityAuditEventV2();
event.setEntityId(entityGuid);
event.setAction(EntityAuditEventV2.EntityAuditActionV2.fromString((String) source.get(ACTION)));
if (source.get(DETAIL) != null) {
List<Map<String, Object>> classificationMap;
if (source.get(DETAIL) instanceof java.util.ArrayList) {
classificationMap = (List<Map<String, Object>>) source.get(DETAIL);
} else {
classificationMap = new ArrayList<>();
classificationMap.add((Map<String, Object>) source.get(DETAIL));
}
event.setDetail(classificationMap);
}
event.setDetail((Map<String, Object>) source.get(DETAIL));
event.setUser((String) source.get(USER));
event.setCreated((long) source.get(CREATED));
if (source.get(TIMESTAMP) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,17 @@ public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification>
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");

FixedBufferList<EntityAuditEventV2> classificationsAdded = getAuditEventsList();
Map<AtlasEntity, List<AtlasClassification>> entityClassifications = new HashMap<>();
Map<AtlasEntity, List<AtlasClassification>> propagatedClassifications = new HashMap<>();

getClassificationsFromEntity(classifications, entity, entityClassifications, propagatedClassifications);
emitAddClassificationEvent(classificationsAdded, entityClassifications, propagatedClassifications);
for (AtlasClassification classification : classifications) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
createEvent(classificationsAdded.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification));
} else {
createEvent(classificationsAdded.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
}
}

for (EntityAuditRepository auditRepository: auditRepositories) {
auditRepository.putEventsV2(classificationsAdded.toList());
}
RequestContext.get().endMetricRecord(metric);
}
}
Expand All @@ -222,11 +227,20 @@ public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassif
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
Map<AtlasEntity, List<AtlasClassification>> entityClassifications = new HashMap<>();
Map<AtlasEntity, List<AtlasClassification>> propagatedClassifications = new HashMap<>();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
createEvent(events.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification));
} else {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
}
}
}

for (EntityAuditRepository auditRepository: auditRepositories) {
auditRepository.putEventsV2(events.toList());
}

getClassificationsFromEntities(classifications, entities,entityClassifications, propagatedClassifications );
emitAddClassificationEvent(events, entityClassifications, propagatedClassifications);

RequestContext.get().endMetricRecord(metric);
}
Expand All @@ -239,45 +253,21 @@ public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassificatio

FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String guid = entity.getGuid();

Map<AtlasEntity, List<AtlasClassification>> entityClassifications = new HashMap<>();
Map<AtlasEntity, List<AtlasClassification>> propagatedClassifications = new HashMap<>();

getClassificationsFromEntity(classifications, entity, entityClassifications, propagatedClassifications);

List<AtlasClassification> addedClassification = new ArrayList<>(0);
List<AtlasClassification> deletedClassification = new ArrayList<>(0);
List<AtlasClassification> updatedClassification = new ArrayList<>(0);

if (CollectionUtils.isNotEmpty(propagatedClassifications.get(entity))) {
propagatedClassifications.get(entity).forEach(classification -> {
for (AtlasClassification classification : classifications) {
if (guid.equals(classification.getEntityGuid())) {
createEvent(events.next(), entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification));
} else {
if (isPropagatedClassificationAdded(guid, classification)) {
addedClassification.add(classification);
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
} else if (isPropagatedClassificationDeleted(guid, classification)) {
deletedClassification.add(classification);
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName()));
} else {
updatedClassification.add(classification);
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification));
}
});
}

if (CollectionUtils.isNotEmpty(addedClassification)) {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classifications: " + AtlasType.toJson(addedClassification));
}

if (CollectionUtils.isNotEmpty(deletedClassification)) {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classifications: " + getDeleteClassificationsMap(deletedClassification));
}

if (CollectionUtils.isNotEmpty(updatedClassification)) {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classifications: " + AtlasType.toJson(updatedClassification));
}

if (entityClassifications.get(entity) != null) {
createEvent(events.next(), entity, CLASSIFICATION_UPDATE, "Updated classifications: " + AtlasType.toJson(entityClassifications.get(entity)));
}
}

for (EntityAuditRepository auditRepository : auditRepositories) {
for (EntityAuditRepository auditRepository: auditRepositories) {
auditRepository.putEventsV2(events.toList());
}

Expand Down Expand Up @@ -311,11 +301,18 @@ public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassificatio
MetricRecorder metric = RequestContext.get().startMetricRecord("onClassificationsDeleted");

FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
Map<AtlasEntity, List<Map<String,Object>>> entityClassifications = new HashMap<>();
Map<AtlasEntity, List<Map<String,Object>>> propagatedClassifications = new HashMap<>();

getClassificationTextFromEntity(classifications, entity, entityClassifications, propagatedClassifications);
emitDeleteClassificationEvent(events, entityClassifications, propagatedClassifications);
for (AtlasClassification classification : classifications) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + getDeleteClassificationString(classification.getTypeName()));
} else {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName()));
}
}

for (EntityAuditRepository auditRepository: auditRepositories) {
auditRepository.putEventsV2(events.toList());
}

RequestContext.get().endMetricRecord(metric);
}
Expand All @@ -328,11 +325,19 @@ public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClass
MetricRecorder metric = RequestContext.get().startMetricRecord("onClassificationsDeleted");
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();

Map<AtlasEntity, List<Map<String,Object>>> entityClassifications = new HashMap<>();
Map<AtlasEntity, List<Map<String,Object>>> propagatedClassifications = new HashMap<>();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + getDeleteClassificationString(classification.getTypeName()));
} else {
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + getDeleteClassificationString(classification.getTypeName()));
}
}
}

getClassificationsTextFromEntities(classifications, entities, entityClassifications, propagatedClassifications);
emitDeleteClassificationEvent(events, entityClassifications, propagatedClassifications);
for (EntityAuditRepository auditRepository : auditRepositories) {
auditRepository.putEventsV2(events.toList());
}

RequestContext.get().endMetricRecord(metric);
}
Expand Down Expand Up @@ -771,22 +776,22 @@ public static String getV2AuditPrefix(EntityAuditActionV2 action) {
ret = "Purged: ";
break;
case CLASSIFICATION_ADD:
ret = "Added classifications: ";
ret = "Added classification: ";
break;
case CLASSIFICATION_DELETE:
ret = "Deleted classifications: ";
ret = "Deleted classification: ";
break;
case CLASSIFICATION_UPDATE:
ret = "Updated classifications: ";
ret = "Updated classification: ";
break;
case PROPAGATED_CLASSIFICATION_ADD:
ret = "Added propagated classifications: ";
ret = "Added propagated classification: ";
break;
case PROPAGATED_CLASSIFICATION_DELETE:
ret = "Deleted propagated classifications: ";
ret = "Deleted propagated classification: ";
break;
case PROPAGATED_CLASSIFICATION_UPDATE:
ret = "Updated propagated classifications: ";
ret = "Updated propagated classification: ";
break;
case ENTITY_IMPORT_CREATE:
ret = "Created by import: ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,24 +193,42 @@ public void setClassifications(Map<String, AtlasEntityHeader> map) throws AtlasB

AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("commitChanges.notify");
Map<AtlasClassification, Collection<Object>> deleted = RequestContext.get().getDeletedClassificationAndVertices();
Set<AtlasVertex> allVertices = new HashSet<>();

if (MapUtils.isNotEmpty(deleted)) {
Map<AtlasEntity, List<AtlasClassification>> entityClassification = getEntityClassificationsMapping(deleted);

for (Map.Entry<AtlasEntity, List<AtlasClassification>> atlasEntityListEntry : entityClassification.entrySet()) {
entityChangeNotifier.onClassificationDeletedFromEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue());
for (AtlasClassification deletedClassification : deleted.keySet()) {
Collection<Object> vertices = deleted.get(deletedClassification);
List<AtlasEntity> propagatedEntities = new ArrayList<>();

for (Object obj : vertices) {
AtlasVertex vertex = (AtlasVertex) obj;
AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), IGNORE_REL);

allVertices.add(vertex);
propagatedEntities.add(entity);
}
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(deletedClassification));
}
}

Map<AtlasClassification, Collection<Object>> added = RequestContext.get().getAddedClassificationAndVertices();
if (MapUtils.isNotEmpty(added)) {
Map<AtlasEntity, List<AtlasClassification>> entityClassification = getEntityClassificationsMapping(added);

for (Map.Entry<AtlasEntity, List<AtlasClassification>> atlasEntityListEntry : entityClassification.entrySet()) {
entityChangeNotifier.onClassificationAddedToEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue());
for (AtlasClassification addedClassification : added.keySet()) {
Collection<Object> vertices = added.get(addedClassification);
List<AtlasEntity> propagatedEntities = new ArrayList<>();

for (Object obj : vertices) {
AtlasVertex vertex = (AtlasVertex) obj;
AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), IGNORE_REL);

allVertices.add(vertex);
propagatedEntities.add(entity);
}
entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(addedClassification), false);
}
}

entityGraphMapper.updateClassificationText(null, allVertices);
transactionInterceptHelper.intercept();
RequestContext.get().endMetricRecord(recorder);
RequestContext.get().setDelayTagNotifications(false);
Expand Down Expand Up @@ -291,12 +309,13 @@ private void deleteClassifications(String entityGuid, String typeName, List<Atla
return;
}

String classificationNames = getClassificationNames(list);
try {
entitiesStore.deleteClassifications(entityGuid, list);
} catch (AtlasBaseException e) {
LOG.error("Failed to remove classification association between {}, entity with guid {}", classificationNames, entityGuid);
throw e;
for (AtlasClassification c : list) {
try {
entitiesStore.deleteClassification(entityGuid, c.getTypeName());
} catch (AtlasBaseException e) {
LOG.error("Failed to remove classification association between {}, entity with guid {}", c.getTypeName(), c.getEntityGuid());
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2770,18 +2770,10 @@ public void addClassifications(final EntityMutationContext context, String guid,
RequestContext.get().addAddedClassificationAndVertices(classification, new ArrayList<>(vertices));
}
} else {
/** ToDO : @aarshi: remove this after testing :
* this is dead code it will never be executed as isDelayTagNotifications() always true
**/
Map<AtlasEntity, List<AtlasClassification>> entityClassification = new HashMap<>();
for (AtlasClassification classification : addedClassifications.keySet()) {
Set<AtlasVertex> vertices = addedClassifications.get(classification);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
propagatedEntities.forEach(entity -> entityClassification.computeIfAbsent(entity, key -> new ArrayList<>()).add(classification));
}

for (Map.Entry<AtlasEntity, List<AtlasClassification>> atlasEntityListEntry : entityClassification.entrySet()) {
entityChangeNotifier.onClassificationAddedToEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue());
entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification), false);
}
}

Expand Down Expand Up @@ -3036,23 +3028,11 @@ public void deleteClassification(String entityGuid, String classificationName) t
if (RequestContext.get().isDelayTagNotifications()) {
RequestContext.get().addDeletedClassificationAndVertices(classification, new ArrayList<>(entityVertices));
} else if (CollectionUtils.isNotEmpty(entityVertices)) {
/** ToDO : @aarshi: remove this after testing :
* this is dead code it will never be executed as isDelayTagNotifications() always true
**/

Map<AtlasEntity, List<AtlasClassification>> entityClassification = new HashMap<>();
if (CollectionUtils.isNotEmpty(entityVertices)) {

List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices);
propagatedEntities.forEach(entity -> entityClassification.computeIfAbsent(entity, key -> new ArrayList<>()).add(classification));
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices);
//Sending audit request for all entities at once
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));

//Sending audit request for all entities at once
for (Map.Entry<AtlasEntity, List<AtlasClassification>> atlasEntityListEntry : entityClassification.entrySet()) {
entityChangeNotifier.onClassificationDeletedFromEntity(atlasEntityListEntry.getKey(), atlasEntityListEntry.getValue());
}
}
}

AtlasPerfTracer.log(perf);
}

Expand Down

0 comments on commit 35cdeb5

Please sign in to comment.