Skip to content

Commit

Permalink
Merge branch 'master' into dg-1322-migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS authored Jun 17, 2024
2 parents 4491938 + 6efb355 commit 0960758
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ public final class Constants {
public static final String REL_DOMAIN_TO_STAKEHOLDERS = "data_domain_stakeholders";
public static final String REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS = "stakeholder_title_stakeholders";

public static final String REL_DATA_PRODUCT_TO_OUTPUT_PORTS = "data_products_output_ports";
public static final String REL_DATA_PRODUCT_TO_INPUT_PORTS = "data_products_input_ports";

public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

/**
* SQL property keys.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import java.util.ArrayList;
/**
* A graph query that runs directly against a particular index.
*
Expand Down Expand Up @@ -103,7 +103,7 @@ interface Result<V, E> {
DirectIndexQueryResult<V, E> getCollapseVertices(String key);

Map<String, List<String>> getHighLights();

ArrayList<Object> getSort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@ public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> getCollapseVerti
public Map<String, List<String>> getHighLights() {
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
return new ArrayList<>();
}
}


Expand Down Expand Up @@ -609,6 +614,15 @@ public Map<String, List<String>> getHighLights() {
}
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
Object sort = this.hit.get("sort");
if (Objects.nonNull(sort) && sort instanceof List) {
return (ArrayList<Object>) sort;
}
return new ArrayList<>();
}
}

public class AsyncQueryResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,10 @@ public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> getCollapseVerti
public Map<String, List<String>> getHighLights() {
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
return new ArrayList<>();
}
}
}
4 changes: 2 additions & 2 deletions intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ public enum AtlasErrorCode {
TASK_INVALID_PARAMETERS(400, "ATLAS-400-00-111", "Invalid parameters for task {0}"),
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"),

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}");

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}"),
ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}");

private String errorCode;
private String errorMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand All @@ -59,7 +60,7 @@ public class AtlasSearchResult implements Serializable {
private Map<String, Object> aggregations;
private Map<String,Double> searchScore;

private Map<String, ElasticsearchMetadata> searchMetadata;
private LinkedHashMap<String, ElasticsearchMetadata> searchMetadata;



Expand Down Expand Up @@ -162,13 +163,26 @@ public Map<String, ElasticsearchMetadata> getSearchMetadata() {

public void addHighlights(String guid, Map<String, List<String>> highlights) {
if(MapUtils.isEmpty(this.searchMetadata)) {
this.searchMetadata = new HashMap<>();
this.searchMetadata = new LinkedHashMap<>();
}
ElasticsearchMetadata v = this.searchMetadata.getOrDefault(guid, new ElasticsearchMetadata());
v.addHighlights(highlights);
this.searchMetadata.put(guid, v);
}

public void addSort(String guid, ArrayList sort) {
if(MapUtils.isEmpty(this.searchMetadata)) {
this.searchMetadata = new LinkedHashMap<>();
}
ElasticsearchMetadata sortMetadata = this.searchMetadata.getOrDefault(guid, new ElasticsearchMetadata());
sortMetadata.addSort(sort);
if (this.searchMetadata.containsKey(guid)) {
this.searchMetadata.replace(guid, sortMetadata);
} else {
this.searchMetadata.put(guid, sortMetadata);
}
}

@Override
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package org.apache.atlas.model.discovery;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.commons.collections.MapUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class ElasticsearchMetadata {

private Map<String, List<String>> highlights;

@JsonInclude(JsonInclude.Include.NON_NULL)
private ArrayList<Object> sort;

public Map<String, List<String>> getHighlights() {
return highlights;
}
Expand All @@ -23,6 +28,15 @@ public void addHighlights(Map<String, List<String>> highlights) {
}
}

public Object getSort() { return sort; }

public void addSort(ArrayList<Object> sort) {
if (sort.isEmpty()) {
this.sort = null;
} else {
this.sort = sort;
}
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class SearchParams {
Async async = new Async();
boolean showHighlights;

boolean showSearchMetadata;

public String getQuery() {
return getQuery();
}
Expand Down Expand Up @@ -154,10 +156,14 @@ public String getSearchInput() {
return this.requestMetadata.getSearchInput();
}

public boolean isShowHighlights() {
public boolean getShowHighlights() {
return showHighlights;
}

public boolean getShowSearchMetadata() {
return showSearchMetadata;
}


static class RequestMetadata {
private String searchInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,10 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
header.setCollapse(collapse);
}
}

if (searchParams.isShowHighlights()) {
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,7 @@
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_DOMAINS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_PRODUCTS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.REL_POLICY_TO_ACCESS_CONTROL;
import static org.apache.atlas.repository.Constants.REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
Expand Down Expand Up @@ -116,6 +106,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
add(REL_DOMAIN_TO_STAKEHOLDERS);
add(REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS);
add(REL_POLICY_TO_ACCESS_CONTROL);
add(REL_DATA_PRODUCT_TO_OUTPUT_PORTS);
add(REL_DATA_PRODUCT_TO_INPUT_PORTS);
}};

public enum RelationshipMutation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.INPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.OUTPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.*;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
Expand Down Expand Up @@ -169,6 +171,7 @@ public class EntityGraphMapper {
private static final String TYPE_GLOSSARY= "AtlasGlossary";
private static final String TYPE_CATEGORY= "AtlasGlossaryCategory";
private static final String TYPE_TERM = "AtlasGlossaryTerm";
private static final String TYPE_PRODUCT = "DataProduct";
private static final String TYPE_PROCESS = "Process";
private static final String ATTR_MEANINGS = "meanings";
private static final String ATTR_ANCHOR = "anchor";
Expand Down Expand Up @@ -1904,7 +1907,7 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute();
Cardinality cardinality = attribute.getAttributeDef().getCardinality();
List<AtlasEdge> removedElements = new ArrayList<>();
List<Object> newElementsCreated = new ArrayList<>();
List<Object> newElementsCreated = new ArrayList<>();
List<Object> allArrayElements = null;
List<Object> currentElements;
boolean deleteExistingRelations = shouldDeleteExistingRelations(ctx, attribute);
Expand Down Expand Up @@ -2002,6 +2005,11 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, removedElements);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2087,6 +2095,11 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>(0));
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, null);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2154,6 +2167,11 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(0), removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, null , removedElements);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2191,6 +2209,40 @@ private void addEdgesToContext(String guid, List<Object> newElementsCreated, Lis
}
}

private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements) throws AtlasBaseException {
MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend");
AtlasVertex toVertex = ctx.getReferringVertex();
String toVertexType = getTypeName(toVertex);

if (CollectionUtils.isEmpty(createdElements) && CollectionUtils.isEmpty(deletedElements)){
RequestContext.get().endMetricRecord(metricRecorder);
return;
}

if (TYPE_PRODUCT.equals(toVertexType)) {
String attrName = ctx.getAttribute().getRelationshipEdgeLabel().equals(OUTPUT_PORT_PRODUCT_EDGE_LABEL)
? OUTPUT_PORT_GUIDS_ATTR
: INPUT_PORT_GUIDS_ATTR;

addOrRemoveDaapInternalAttr(toVertex, attrName, createdElements, deletedElements);
}else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Can not update product relations while updating any asset");
}
RequestContext.get().endMetricRecord(metricRecorder);
}

private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<AtlasEdge> deletedElements) {
if (CollectionUtils.isNotEmpty(createdElements)) {
List<String> addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid));
}

if (CollectionUtils.isNotEmpty(deletedElements)) {
List<String> removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid));
}
}

private boolean shouldDeleteExistingRelations(AttributeMutationContext ctx, AtlasAttribute attribute) {
boolean ret = false;
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(AtlasGraphUtilsV2.getTypeName(ctx.getReferringVertex()));
Expand Down
Loading

0 comments on commit 0960758

Please sign in to comment.