Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1442 Modification of the PreProcessor for Custom Sort - Bulk Update Edge Cases #3265

Merged
merged 20 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
49a6e6d
Init commit, added lexorank generation util lib and logic for append…
hr2904 May 22, 2024
7f36434
Added method to append lexicographicSortOrder attribute to any Glossa…
hr2904 May 22, 2024
1bcff4e
Added LexoRank Validation method, with tentative rebalancing trigger …
hr2904 May 28, 2024
f4a38dc
Corrected a method rename
hr2904 May 28, 2024
8c1c21a
Added a check in validateLexoRank which checks if another entity with…
hr2904 May 28, 2024
30b4d47
Added a check , such that if bulk request is from migration, it will …
hr2904 May 29, 2024
5d06cba
Modified ES query.
hr2904 Jun 2, 2024
378859f
Modified an edge statement such that, when a new lexorank for cat is …
hr2904 Jun 3, 2024
ffd3e73
Reverted the check, instead added padded new offset for terms.
hr2904 Jun 3, 2024
de5965c
fixed the caching logic, by adding the bifurcation for terms and cate…
hr2904 Jun 4, 2024
7137d99
Fixed PR comments
hr2904 Jun 12, 2024
1587489
Added caching for same ranks in same request, for prevention of dupli…
hr2904 Jun 12, 2024
e28d7ac
Fixed a minor bug where an empty lexo attribute in update call was re…
hr2904 Jun 14, 2024
f9838df
Fixed PR comments
hr2904 Jun 18, 2024
586304b
Reverted the AbstractGlossaryPreProcessor file , previously edited.
hr2904 Jun 18, 2024
7ab9223
Fixed PR comments.
hr2904 Jun 20, 2024
cb56a94
fixed attribute name in dsl query
hr2904 Jun 25, 2024
1bd6f9b
Fixed a caching logic while checking for duplicate ranks.
hr2904 Jun 28, 2024
0d139ea
Removed term-category bifurcation as we are now allowing term-categor…
hr2904 Jul 5, 2024
075f5ae
Removed an unneeded constant
hr2904 Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions intg/src/main/java/org/apache/atlas/type/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ public final class Constants {
public static final String GLOSSARY_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "glossary");
public static final String CATEGORIES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "categories");
public static final String CATEGORIES_PARENT_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "parentCategory");

public static final String MEANINGS_TEXT_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "meaningsText");
public static final String MEANING_NAMES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "meaningNames");
public static final String HAS_LINEAGE = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "hasLineage");
public static final String HAS_LINEAGE_VALID = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "hasLineageValid");
// Attribute name under which an entity's lexorank is stored; a plain literal, not passed through encodePropertyKey.
public static final String LEXICOGRAPHICAL_SORT_ORDER = "lexicographicalSortOrder";

//Classification-Only System Attributes
public static final String CLASSIFICATION_ENTITY_STATUS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public abstract class GlossaryUtils {
public static final String TERM_ASSIGNMENT_ATTR_SOURCE = "source";

static final String ATLAS_GLOSSARY_TYPENAME = "AtlasGlossary";
static final String ATLAS_GLOSSARY_TERM_TYPENAME = "AtlasGlossaryTerm";
static final String ATLAS_GLOSSARY_CATEGORY_TYPENAME = "AtlasGlossaryCategory";
public static final String ATLAS_GLOSSARY_TERM_TYPENAME = "AtlasGlossaryTerm";
public static final String ATLAS_GLOSSARY_CATEGORY_TYPENAME = "AtlasGlossaryCategory";

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,7 @@ public List<PreProcessor> getPreProcessor(String typeName) {

switch (typeName) {
case ATLAS_GLOSSARY_ENTITY_TYPE:
preProcessors.add(new GlossaryPreProcessor(typeRegistry, entityRetriever));
preProcessors.add(new GlossaryPreProcessor(typeRegistry, entityRetriever, graph));
break;

case ATLAS_GLOSSARY_TERM_ENTITY_TYPE:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor;
nikhilbonte21 marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
Expand All @@ -14,18 +15,22 @@
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.NanoIdUtils;
import org.apache.atlas.util.lexoRank.LexoRank;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.atlas.repository.Constants.QUERY_COLLECTION_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.glossary.GlossaryUtils.ATLAS_GLOSSARY_CATEGORY_TYPENAME;
import static org.apache.atlas.glossary.GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
import static org.apache.atlas.type.Constants.LEXICOGRAPHICAL_SORT_ORDER;

public class PreProcessorUtils {
private static final Logger LOG = LoggerFactory.getLogger(PreProcessorUtils.class);
Expand All @@ -39,6 +44,7 @@ public class PreProcessorUtils {
public static final String CATEGORY_CHILDREN = "childrenCategories";
public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";
// Rank used as the "previous" rank when no sibling carrying a lexicographicalSortOrder is found; new ranks are generated after it.
public static final String INIT_LEXORANK_OFFSET = "0|100000:";

//DataMesh models constants
public static final String PARENT_DOMAIN_REL_TYPE = "parentDomain";
Expand All @@ -52,6 +58,8 @@ public class PreProcessorUtils {

public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";
public static final String STAKEHOLDER_EDGE_LABEL = "__DataDomain.stakeholders";


public static final String PARENT_DOMAIN_QN_ATTR = "parentDomainQualifiedName";
public static final String SUPER_DOMAIN_QN_ATTR = "superDomainQualifiedName";
Expand Down Expand Up @@ -85,6 +93,13 @@ public enum MigrationStatus {

public static final String CHILDREN_QUERIES = "__Namespace.childrenQueries";
public static final String CHILDREN_FOLDERS = "__Namespace.childrenFolders";
// Suffix length (characters after ':') at which rebalancing is meant to kick in; currently only referenced by the
// rebalancing TODO in isValidLexoRank, not by live code in this class.
public static final int REBALANCING_TRIGGER = 119;
// Number of characters in the rank prefix up to and including ':' (e.g. "0|100000:" is 9 characters).
public static final int PRE_DELIMITER_LENGTH = 9;
// Upper bound on the suffix length accepted by the validation regex: 256 minus the prefix length.
// Kept as a String so it can be concatenated straight into LEXORANK_VALID_REGEX.
public static final String LEXORANK_HARD_LIMIT = "" + (256 - PRE_DELIMITER_LENGTH);
// Accepted shape: "0|" + exactly six [0-9a-z] characters + ":" + an optional [0-9a-z] suffix of at most
// LEXORANK_HARD_LIMIT characters.
public static final String LEXORANK_VALID_REGEX = "^0\\|[0-9a-z]{6}:(?:[0-9a-z]{0," + LEXORANK_HARD_LIMIT + "})?$";
// Attributes requested from index search when fetching the last child's rank (see assignNewLexicographicalSortOrder).
public static final Set<String> ATTRIBUTES = new HashSet<>(Arrays.asList("lexicographicalSortOrder"));

// Compiled once and reused for every validation call.
public static final Pattern LEXORANK_VALIDITY_PATTERN = Pattern.compile(LEXORANK_VALID_REGEX);

public static String getUUID(){
return NanoIdUtils.randomNanoId();
Expand Down Expand Up @@ -202,4 +217,174 @@ public static void verifyDuplicateAssetByName(String typeName, String assetName,
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, errorMessage);
}
}

/**
 * Validates a caller-supplied lexicographicalSortOrder value.
 *
 * <p>The value must be non-empty and match {@link #LEXORANK_VALIDITY_PATTERN}. Unless the request is
 * flagged as a custom-sort migration (via the {@code x-atlan-request-id} header), the value must also
 * differ from the rank last recorded for the same glossary/parent pair in this request's cache, and
 * must not already be present in the index for that glossary/parent pair. On success the rank is
 * recorded in the request-level cache.
 *
 * @param inputLexorank         rank supplied by the caller; null or empty is rejected
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty for glossaries themselves)
 * @param parentQualifiedName   qualified name of the parent category, if any
 * @param discovery             discovery service used to run the duplicate-rank index search
 * @throws AtlasBaseException with {@code BAD_REQUEST} when the rank is malformed, duplicated, or the
 *                            index search fails
 */
public static void isValidLexoRank(String inputLexorank, String glossaryQualifiedName, String parentQualifiedName, EntityDiscoveryService discovery) throws AtlasBaseException {
    // The emptiness check must come first: Pattern.matcher(null) throws NullPointerException,
    // which would surface as a server error instead of a BAD_REQUEST.
    if (StringUtils.isEmpty(inputLexorank) || !LEXORANK_VALIDITY_PATTERN.matcher(inputLexorank).matches()) {
        throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid value for lexicographicalSortOrder attribute");
    }

    // TODO : Need to discuss either to remove this after migration is successful on all tenants and custom-sort is successfully GA or keep it for re-balancing WF
    boolean requestFromMigration = RequestContext.get().getRequestContextHeaders().getOrDefault("x-atlan-request-id", "").contains("custom-sort-migration");
    if (requestFromMigration) {
        // Migration requests are only format-checked; duplicate detection is skipped.
        return;
    }

    Map<String, String> lexoRankCache = RequestContext.get().getLexoRankCache();
    if (Objects.isNull(lexoRankCache)) {
        lexoRankCache = new HashMap<>();
    }

    // The cache remembers the last rank seen per glossary/parent pair within this request.
    String cacheKey = glossaryQualifiedName + "-" + parentQualifiedName;
    if (lexoRankCache.containsKey(cacheKey) && lexoRankCache.get(cacheKey).equals(inputLexorank)) {
        throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Duplicate value for the attribute :" + LEXICOGRAPHICAL_SORT_ORDER +" found");
    }

    Map<String, Object> dslQuery = createDSLforCheckingPreExistingLexoRank(inputLexorank, glossaryQualifiedName, parentQualifiedName);
    List<AtlasEntityHeader> assetsWithDuplicateRank;
    try {
        IndexSearchParams searchParams = new IndexSearchParams();
        searchParams.setDsl(dslQuery);
        assetsWithDuplicateRank = discovery.directIndexSearch(searchParams).getEntities();
    } catch (AtlasBaseException e) {
        // Log with the throwable and keep it as the cause so the underlying failure is not lost.
        LOG.error("IndexSearch Error Occured : {}", e.getMessage(), e);
        throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e, "Something went wrong with IndexSearch");
    }

    if (!CollectionUtils.isEmpty(assetsWithDuplicateRank)) {
        throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Duplicate Lexorank found");
    }

    lexoRankCache.put(cacheKey, inputLexorank);
    RequestContext.get().setLexoRankCache(lexoRankCache);
    // TODO : Add the rebalancing logic here
    // int colonIndex = inputLexorank.indexOf(":");
    // if (colonIndex != -1 && inputLexorank.substring(colonIndex + 1).length() >= REBALANCING_TRIGGER) {
    // Rebalancing trigger
    // }
}

/**
 * Generates and assigns a lexicographicalSortOrder for an entity that did not supply one.
 *
 * <p>The starting point is the rank cached for the glossary/parent pair in the current request, or,
 * failing that, the highest rank returned by an index search over that glossary/parent pair. When
 * neither yields a rank, {@link #INIT_LEXORANK_OFFSET} is used. The new rank is two {@code genNext()}
 * steps past the starting point; it is set on the entity and written back to the request-level cache.
 *
 * @param entity                entity receiving the generated rank
 * @param glossaryQualifiedName qualified name of the owning glossary (may be empty for glossaries themselves)
 * @param parentQualifiedName   qualified name of the parent category, if any
 * @param discovery             discovery service used to look up the last existing rank
 * @throws AtlasBaseException when the index search fails
 */
public static void assignNewLexicographicalSortOrder(AtlasEntity entity, String glossaryQualifiedName, String parentQualifiedName, EntityDiscoveryService discovery) throws AtlasBaseException{
    Map<String, String> lexoRankCache = RequestContext.get().getLexoRankCache();
    if (Objects.isNull(lexoRankCache)) {
        lexoRankCache = new HashMap<>();
    }

    String cacheKey = glossaryQualifiedName + "-" + parentQualifiedName;
    String lastLexoRank;

    if (lexoRankCache.containsKey(cacheKey)) {
        // A rank was already handed out (or validated) for this parent in the current request.
        lastLexoRank = lexoRankCache.get(cacheKey);
    } else {
        // Default used when the search returns nothing or the hit carries no rank.
        lastLexoRank = INIT_LEXORANK_OFFSET;

        List<AtlasEntityHeader> lastChildren;
        Map<String, Object> dslQuery = generateDSLQueryForLastChild(glossaryQualifiedName, parentQualifiedName);
        try {
            IndexSearchParams searchParams = new IndexSearchParams();
            searchParams.setAttributes(ATTRIBUTES);
            searchParams.setDsl(dslQuery);
            lastChildren = discovery.directIndexSearch(searchParams).getEntities();
        } catch (AtlasBaseException e) {
            // Use the class logger instead of printStackTrace and keep the original failure as the cause.
            LOG.error("IndexSearch failed while assigning {}", LEXICOGRAPHICAL_SORT_ORDER, e);
            throw new AtlasBaseException("Something went wrong in assigning lexicographicalSortOrder", e);
        }

        if (CollectionUtils.isNotEmpty(lastChildren)) {
            String existingRank = (String) lastChildren.get(0).getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
            if (StringUtils.isNotEmpty(existingRank)) {
                lastLexoRank = existingRank;
            }
        }
    }

    // Two genNext() steps, as in the original implementation.
    LexoRank nextLexoRank = LexoRank.parse(lastLexoRank).genNext().genNext();
    String lexoRank = nextLexoRank.toString();

    entity.setAttribute(LEXICOGRAPHICAL_SORT_ORDER, lexoRank);
    lexoRankCache.put(cacheKey, lexoRank);
    RequestContext.get().setLexoRankCache(lexoRankCache);
}

/**
 * Builds the index-search DSL used to detect an existing entity with the given rank under the same
 * glossary/parent pair. At most one hit is requested.
 *
 * @param lexoRank              rank to look for
 * @param glossaryQualifiedName qualified name of the owning glossary
 * @param parentQualifiedName   qualified name of the parent category, if any
 * @return DSL map with {@code from}, {@code size} and a {@code bool} query
 */
public static Map<String, Object> createDSLforCheckingPreExistingLexoRank(String lexoRank, String glossaryQualifiedName, String parentQualifiedName) {
    Map<String, Object> duplicateRankFilter = buildBoolQueryDuplicateLexoRank(lexoRank, glossaryQualifiedName, parentQualifiedName);

    Map<String, Object> request = new HashMap<>();
    request.put("query", mapOf("bool", duplicateRankFilter));
    request.put("size", 1);
    request.put("from", 0);
    return request;
}

/**
 * Builds the {@code bool} clause matching ACTIVE entities that already carry the given rank.
 *
 * <p>With a glossary qualified name, the match is limited to terms and categories of that glossary:
 * either those with no category/parent-category field (no parent given) or those whose category or
 * parent category equals the given parent. Without a glossary qualified name, the match is limited
 * to glossary entities.
 */
private static Map<String, Object> buildBoolQueryDuplicateLexoRank(String lexoRank, String glossaryQualifiedName, String parentQualifiedName) {
    Map<String, Object> boolClause = new HashMap<>();
    List<Map<String, Object>> must = new ArrayList<>();

    must.add(mapOf("term", mapOf("__state", "ACTIVE")));
    must.add(mapOf("term", mapOf(LEXICOGRAPHICAL_SORT_ORDER, lexoRank)));

    if (StringUtils.isEmpty(glossaryQualifiedName)) {
        // No glossary scope: compare against glossary entities only.
        must.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList(ATLAS_GLOSSARY_ENTITY_TYPE))));
    } else {
        must.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList(ATLAS_GLOSSARY_TERM_TYPENAME, ATLAS_GLOSSARY_CATEGORY_TYPENAME))));
        must.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName)));

        if (StringUtils.isNotEmpty(parentQualifiedName)) {
            // Children of the given parent: terms link via __categories, categories via __parentCategory.
            List<Map<String, Object>> underParent = new ArrayList<>();
            underParent.add(mapOf("term", mapOf("__categories", parentQualifiedName)));
            underParent.add(mapOf("term", mapOf("__parentCategory", parentQualifiedName)));
            must.add(mapOf("bool",mapOf("should", underParent)));
        } else {
            // Root level: neither field may exist.
            boolClause.put("must_not", Arrays.asList(mapOf("exists", mapOf("field", "__categories")),mapOf("exists", mapOf("field", "__parentCategory"))));
        }
    }

    boolClause.put("must", must);
    return boolClause;
}

/**
 * Builds the index-search DSL that fetches the single entity with the highest
 * lexicographicalSortOrder under the given glossary/parent pair (descending sort, size 1).
 *
 * @param glossaryQualifiedName qualified name of the owning glossary
 * @param parentQualifiedName   qualified name of the parent category, if any
 * @return DSL map with {@code from}, {@code size}, {@code sort} and a {@code bool} query
 */
public static Map<String, Object> generateDSLQueryForLastChild(String glossaryQualifiedName, String parentQualifiedName) {
    // Descending order puts the largest rank first.
    Object[] sortSpec = new Object[] { mapOf(LEXICOGRAPHICAL_SORT_ORDER, mapOf("order", "desc")) };

    Map<String, Object> scopeFilter = buildBoolQuery(glossaryQualifiedName, parentQualifiedName);

    Map<String, Object> request = new HashMap<>();
    request.put("query", mapOf("bool", scopeFilter));
    request.put("sort", sortSpec);
    request.put("size", 1);
    request.put("from", 0);
    return request;
}

/**
 * Builds the {@code bool} clause selecting ACTIVE siblings under a glossary/parent pair.
 *
 * <p>With a glossary qualified name, terms and categories of that glossary are matched: root-level
 * ones when no parent is given, otherwise those whose category or parent category equals the given
 * parent. Without a glossary qualified name, glossary entities are matched.
 */
private static Map<String, Object> buildBoolQuery(String glossaryQualifiedName, String parentQualifiedName) {
    Map<String, Object> boolClause = new HashMap<>();
    List<Map<String, Object>> must = new ArrayList<>();

    must.add(mapOf("term", mapOf("__state", "ACTIVE")));

    if (StringUtils.isEmpty(glossaryQualifiedName)) {
        // No glossary scope: siblings are the glossaries themselves.
        must.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList("AtlasGlossary"))));
    } else {
        must.add(mapOf("terms", mapOf("__typeName.keyword", Arrays.asList("AtlasGlossaryTerm", "AtlasGlossaryCategory"))));
        must.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName)));

        if (StringUtils.isNotEmpty(parentQualifiedName)) {
            // Either link to the parent qualifies an entity as a sibling.
            List<Map<String, Object>> underParent = new ArrayList<>();
            underParent.add(mapOf("term", mapOf("__categories", parentQualifiedName)));
            underParent.add(mapOf("term", mapOf("__parentCategory", parentQualifiedName)));
            must.add(mapOf("bool",mapOf("should", underParent)));
        } else {
            // Root level: exclude anything that has a category or parent category.
            boolClause.put("must_not", Arrays.asList(mapOf("exists", mapOf("field", "__categories")),mapOf("exists", mapOf("field", "__parentCategory"))));
        }
    }

    boolClause.put("must", must);
    return boolClause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -70,9 +71,7 @@
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
import static org.apache.atlas.type.Constants.CATEGORIES_PARENT_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.CATEGORIES_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.GLOSSARY_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.*;

public class CategoryPreProcessor extends AbstractGlossaryPreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(CategoryPreProcessor.class);
Expand Down Expand Up @@ -117,6 +116,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateCategory");
String catName = (String) entity.getAttribute(NAME);
String parentQname = null;

if (StringUtils.isEmpty(catName) || isNameInvalid(catName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
Expand All @@ -126,6 +126,16 @@ private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throw
categoryExists(catName, glossaryQualifiedName);
validateParent(glossaryQualifiedName);

if (parentCategory != null) {
parentQname = (String) parentCategory.getAttribute(QUALIFIED_NAME);
}
String lexicographicalSortOrder = (String) entity.getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
if(StringUtils.isEmpty(lexicographicalSortOrder)){
assignNewLexicographicalSortOrder(entity,glossaryQualifiedName, parentQname, this.discovery);
} else {
isValidLexoRank(lexicographicalSortOrder, glossaryQualifiedName, parentQname, this.discovery);
}

entity.setAttribute(QUALIFIED_NAME, createQualifiedName(vertex));
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
"create entity: type=", entity.getTypeName());
Expand All @@ -151,6 +161,17 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw

String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME);

String lexicographicalSortOrder = (String) entity.getAttribute(LEXICOGRAPHICAL_SORT_ORDER);
String parentQname = "";
if(Objects.nonNull(parentCategory)) {
parentQname = (String) parentCategory.getAttribute(QUALIFIED_NAME);
}
if(StringUtils.isNotEmpty(lexicographicalSortOrder)) {
isValidLexoRank(lexicographicalSortOrder, newGlossaryQualifiedName, parentQname, this.discovery);
} else {
entity.removeAttribute(LEXICOGRAPHICAL_SORT_ORDER);
}

if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){
//Auth check
isAuthorized(currentGlossaryHeader, anchor);
Expand Down Expand Up @@ -489,4 +510,5 @@ private String createQualifiedName(AtlasVertex vertex) {

return getUUID() + "@" + anchor.getAttribute(QUALIFIED_NAME);
}

}
Loading
Loading