Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync master into staging #3282

Merged
merged 10 commits into from
Jun 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public final class Constants {
public static final String REL_DOMAIN_TO_STAKEHOLDERS = "data_domain_stakeholders";
public static final String REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS = "stakeholder_title_stakeholders";

public static final String REL_DATA_PRODUCT_TO_OUTPUT_PORTS = "data_products_output_ports";
public static final String REL_DATA_PRODUCT_TO_INPUT_PORTS = "data_products_input_ports";

public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

/**
* SQL property keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import java.util.ArrayList;
/**
* A graph query that runs directly against a particular index.
*
Expand Down Expand Up @@ -103,7 +103,7 @@ interface Result<V, E> {
DirectIndexQueryResult<V, E> getCollapseVertices(String key);

Map<String, List<String>> getHighLights();

ArrayList<Object> getSort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@ public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> getCollapseVerti
public Map<String, List<String>> getHighLights() {
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
return new ArrayList<>();
}
}


Expand Down Expand Up @@ -609,6 +614,15 @@ public Map<String, List<String>> getHighLights() {
}
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
Object sort = this.hit.get("sort");
if (Objects.nonNull(sort) && sort instanceof List) {
return (ArrayList<Object>) sort;
}
return new ArrayList<>();
}
}

public class AsyncQueryResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,10 @@ public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> getCollapseVerti
public Map<String, List<String>> getHighLights() {
return new HashMap<>();
}

@Override
public ArrayList<Object> getSort() {
return new ArrayList<>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand All @@ -59,7 +60,7 @@ public class AtlasSearchResult implements Serializable {
private Map<String, Object> aggregations;
private Map<String,Double> searchScore;

private Map<String, ElasticsearchMetadata> searchMetadata;
private LinkedHashMap<String, ElasticsearchMetadata> searchMetadata;



Expand Down Expand Up @@ -162,13 +163,26 @@ public Map<String, ElasticsearchMetadata> getSearchMetadata() {

public void addHighlights(String guid, Map<String, List<String>> highlights) {
if(MapUtils.isEmpty(this.searchMetadata)) {
this.searchMetadata = new HashMap<>();
this.searchMetadata = new LinkedHashMap<>();
}
ElasticsearchMetadata v = this.searchMetadata.getOrDefault(guid, new ElasticsearchMetadata());
v.addHighlights(highlights);
this.searchMetadata.put(guid, v);
}

public void addSort(String guid, ArrayList sort) {
if(MapUtils.isEmpty(this.searchMetadata)) {
this.searchMetadata = new LinkedHashMap<>();
}
ElasticsearchMetadata sortMetadata = this.searchMetadata.getOrDefault(guid, new ElasticsearchMetadata());
sortMetadata.addSort(sort);
if (this.searchMetadata.containsKey(guid)) {
this.searchMetadata.replace(guid, sortMetadata);
} else {
this.searchMetadata.put(guid, sortMetadata);
}
}

@Override
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package org.apache.atlas.model.discovery;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.commons.collections.MapUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class ElasticsearchMetadata {

private Map<String, List<String>> highlights;

@JsonInclude(JsonInclude.Include.NON_NULL)
private ArrayList<Object> sort;

public Map<String, List<String>> getHighlights() {
return highlights;
}
Expand All @@ -23,6 +28,15 @@ public void addHighlights(Map<String, List<String>> highlights) {
}
}

public Object getSort() { return sort; }

public void addSort(ArrayList<Object> sort) {
if (sort.isEmpty()) {
this.sort = null;
} else {
this.sort = sort;
}
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class SearchParams {
Async async = new Async();
boolean showHighlights;

boolean showSearchMetadata;

public String getQuery() {
return getQuery();
}
Expand Down Expand Up @@ -154,10 +156,14 @@ public String getSearchInput() {
return this.requestMetadata.getSearchInput();
}

public boolean isShowHighlights() {
public boolean getShowHighlights() {
return showHighlights;
}

public boolean getShowSearchMetadata() {
return showSearchMetadata;
}


static class RequestMetadata {
private String searchInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,10 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
header.setCollapse(collapse);
}
}

if (searchParams.isShowHighlights()) {
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,7 @@
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_DOMAINS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_PRODUCTS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.REL_POLICY_TO_ACCESS_CONTROL;
import static org.apache.atlas.repository.Constants.REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
Expand Down Expand Up @@ -116,6 +106,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
add(REL_DOMAIN_TO_STAKEHOLDERS);
add(REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS);
add(REL_POLICY_TO_ACCESS_CONTROL);
add(REL_DATA_PRODUCT_TO_OUTPUT_PORTS);
add(REL_DATA_PRODUCT_TO_INPUT_PORTS);
}};

public enum RelationshipMutation {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;

public class DataProductInputsOutputsMigrationService {

private static final Logger LOG = LoggerFactory.getLogger(DataProductInputsOutputsMigrationService.class);

private final EntityGraphRetriever entityRetriever;


private String productGuid;
private final TransactionInterceptHelper transactionInterceptHelper;

public DataProductInputsOutputsMigrationService(EntityGraphRetriever entityRetriever, String productGuid, TransactionInterceptHelper transactionInterceptHelper) {
this.entityRetriever = entityRetriever;
this.transactionInterceptHelper = transactionInterceptHelper;
this.productGuid = productGuid;
}

public void migrateProduct() throws Exception {
try {
AtlasVertex productVertex = entityRetriever.getEntityVertex(this.productGuid);

boolean isCommitRequired = migrateAttr(productVertex);
if (isCommitRequired){
LOG.info("Committing changes for Product: {}", this.productGuid);
commitChanges();
}
else {
LOG.info("No changes to commit for Product: {} as no migration needed", this.productGuid);
}

} catch (Exception e) {
LOG.error("Error while migration inputs/outputs for Dataproduct: {}", this.productGuid, e);
throw e;
}
}

private boolean migrateAttr(AtlasVertex vertex) throws AtlasBaseException {
boolean isCommitRequired = false;

List<String> outputPortsRelationGuids = getAssetGuids(vertex, OUTPUT_PORT_PRODUCT_EDGE_LABEL);
List<String> outputPortGuidsAttr = vertex.getMultiValuedProperty(OUTPUT_PORT_GUIDS_ATTR, String.class);


List<String> inputPortsRelationGuids = getAssetGuids(vertex, INPUT_PORT_PRODUCT_EDGE_LABEL);
List<String> inputPortGuidsAttr = vertex.getMultiValuedProperty(INPUT_PORT_GUIDS_ATTR, String.class);

if(!CollectionUtils.isEqualCollection(outputPortsRelationGuids, outputPortGuidsAttr)) {
LOG.info("Migrating outputPort guid attribute: {} for Product: {}", OUTPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, OUTPUT_PORT_GUIDS_ATTR, outputPortsRelationGuids);
isCommitRequired = true;
}

if(!CollectionUtils.isEqualCollection(inputPortsRelationGuids, inputPortGuidsAttr)) {
LOG.info("Migrating inputPort guid attribute: {} for Product: {}", INPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, INPUT_PORT_GUIDS_ATTR, inputPortsRelationGuids);
isCommitRequired = true;
}

return isCommitRequired;
}

public void commitChanges() throws AtlasBaseException {
try {
transactionInterceptHelper.intercept();
LOG.info("Committed a entity to the graph");
} catch (Exception e){
LOG.error("Failed to commit asset: ", e);
throw e;
}
}

private List<String> getAssetGuids(AtlasVertex vertex, String edgeLabel) throws AtlasBaseException {
List<String> guids = new ArrayList<>();
Iterator<AtlasVertex> activeParent = GraphHelper.getActiveParentVertices(vertex, edgeLabel);
while(activeParent.hasNext()) {
AtlasVertex child = activeParent.next();
guids.add(child.getProperty(GUID_PROPERTY_KEY, String.class));
}
return guids;
}

private void addInternalAttr(AtlasVertex productVertex, String internalAttr, List<String> currentGuids){
productVertex.removeProperty(internalAttr);
if (CollectionUtils.isNotEmpty(currentGuids)) {
currentGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(productVertex, internalAttr , guid));
}
}
}
Loading
Loading