Skip to content

Commit

Permalink
PLT-191: atlas search: method level
Browse files Browse the repository at this point in the history
  • Loading branch information
n5nk committed Nov 10, 2023
1 parent dbdb795 commit 48b00be
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 161 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on:
- development
- master
- lineageondemand
- atlassearch

jobs:
build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/trivy-docker-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- development
- master
- lineageondemand
- atlassearch


jobs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.script.ScriptException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -105,6 +106,12 @@ public interface AtlasGraph<V, E> {
*/
Iterable<AtlasVertex<V, E>> getVertices();

/**
* Gets required vertices in the graph.
* @return
*/
List<AtlasVertex<V, E>> getVertices(Object... ids);

/**
* Gets the vertex with the specified id. As an optimization, a non-null vertex may be
* returned by some implementations if the Vertex does not exist. In that case,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ interface Result<V, E> {

DirectIndexQueryResult<V, E> getCollapseVertices(String key);

Long getId();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import org.elasticsearch.search.aggregations.Aggregation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class DirectIndexQueryResult<V, E> {
Expand Down Expand Up @@ -33,4 +35,5 @@ public Integer getApproximateCount() {
public void setApproximateCount(Integer approximateCount) {
this.approximateCount = approximateCount;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> getVertex() {
return graph.getVertex(String.valueOf(vertexId));
}

@Override
public Long getId() {
return LongEncoding.decode(hit.getId());
}

@Override
public double getScore() {
return hit.getScore();
Expand Down Expand Up @@ -328,6 +333,11 @@ public DirectIndexQueryResult getCollapseVertices(String key) {
return null;
}

@Override
public Long getId() {
return LongEncoding.decode(String.valueOf(hit.get("_id")));
}

@Override
public double getScore() {
Object _score = hit.get("_score");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import jnr.ffi.Struct;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
Expand All @@ -29,19 +30,7 @@
import org.apache.atlas.groovy.GroovyExpression;
import org.apache.atlas.model.discovery.SearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQueryParameter;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.GraphIndexQueryParameters;
import org.apache.atlas.repository.graphdb.GremlinVersion;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
import org.apache.atlas.type.AtlasType;
Expand All @@ -67,12 +56,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.JanusGraphIndexQuery;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.SchemaViolationException;
import org.janusgraph.core.*;
import org.janusgraph.core.schema.JanusGraphIndex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.Parameter;
Expand All @@ -86,12 +70,9 @@
import javax.script.ScriptException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.atlas.AtlasErrorCode.INDEX_SEARCH_FAILED;
import static org.apache.atlas.AtlasErrorCode.RELATIONSHIP_CREATE_INVALID_PARAMS;
Expand Down Expand Up @@ -236,14 +217,20 @@ public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> getEdges() {

return wrapEdges(edges);
}

@Override
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> getVertices() {
Iterator<Vertex> vertices = getGraph().vertices();

return wrapVertices(vertices);
}

@Override
public List<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> getVertices(Object... ids) {
Iterator<Vertex> it = getGraph().vertices(ids);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), true)
.map(v -> GraphDbObjectFactory.createVertex(this, v)).collect(Collectors.toList());
}

@Override
public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> addVertex() {
Vertex result = getGraph().addVertex();
Expand Down Expand Up @@ -411,7 +398,6 @@ public AtlasVertex<AtlasJanusVertex, AtlasJanusEdge> getVertex(String vertexId)

return GraphDbObjectFactory.createVertex(this, vertex);
}

@Override
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> getVertices(String key, Object value) {
AtlasGraphQuery<AtlasJanusVertex, AtlasJanusEdge> query = query();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,10 @@ public Set<String> getCollapseKeys() {
public DirectIndexQueryResult<AtlasJanusVertex, AtlasJanusEdge> getCollapseVertices(String key) {
return null;
}

@Override
public Long getId() {
return source.getElement().longId();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@
import javax.script.ScriptException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.SortOrder.ASCENDING;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.ASSET_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.OWNER_ATTRIBUTE;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX_NAME;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.BASIC_SEARCH_STATE_FILTER;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TO_RANGE_LIST;

Expand Down Expand Up @@ -1042,63 +1042,66 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
if(LOG.isDebugEnabled()){
LOG.debug("Preparing search results for ({})", ret.getSearchParameters());
}
Iterator<Result> iterator = indexQueryResult.getIterator();
boolean showSearchScore = searchParams.getShowSearchScore();

while (iterator.hasNext()) {
Result result = iterator.next();
AtlasVertex vertex = result.getVertex();

if (vertex == null) {
LOG.warn("vertex in null");
continue;
Stream<Result> resultStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(indexQueryResult.getIterator(), Spliterator.ORDERED), true);
Map<Long, Result> vertexIds = resultStream.collect(Collectors.toMap(Result::getId, result -> result));
List<AtlasVertex> resultVertices = graph.getVertices(vertexIds.keySet().toArray());
List<AtlasEntityHeader> headers = resultVertices.parallelStream().map(v -> {
try {
return enrichVertex(ret, resultAttributes, vertexIds.get(v.getId()), v, fetchCollapsedResults);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
headers.forEach(ret::addEntity);
} catch (Exception e) {
throw e;
}

AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
}
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}
private AtlasEntityHeader enrichVertex(AtlasSearchResult ret, Set<String> resultAttributes, Result result, AtlasVertex vertex, boolean fetchCollapsedResults) throws AtlasBaseException {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
SearchParams searchParams = ret.getSearchParameters();
if (searchParams.getShowSearchScore()) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
}

if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}

ret.addEntity(header);
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
} catch (Exception e) {
throw e;
}
scrubSearchResults(ret, searchParams.getSuppressLogs());
return header;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Loading

0 comments on commit 48b00be

Please sign in to comment.