Skip to content

Commit

Permalink
PLT-191: flag based search
Browse files Browse the repository at this point in the history
  • Loading branch information
n5nk committed Nov 14, 2023
1 parent 0769262 commit 7bbb93f
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ public class SearchParams {
boolean suppressLogs;
boolean excludeMeanings;
boolean excludeClassifications;
boolean parallelSearch;

RequestMetadata requestMetadata = new RequestMetadata();

public boolean isParallelSearch() {
return this.parallelSearch;
}

public void setParallelSearch(boolean parallelSearch) {
this.parallelSearch = parallelSearch;
}
public String getQuery() {
return getQuery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,11 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
AtlasPerfMetrics.MetricRecorder elasticSearchQueryMetric = RequestContext.get().startMetricRecord("elasticSearchQuery");
DirectIndexQueryResult indexQueryResult = indexQuery.vertices(searchParams);
RequestContext.get().endMetricRecord(elasticSearchQueryMetric);
prepareSearchResult(ret, indexQueryResult, resultAttributes, true);
if(searchParams.isParallelSearch()){
prepareSearchResult2(ret, indexQueryResult, resultAttributes, true);
}else{
prepareSearchResult(ret, indexQueryResult, resultAttributes, true);
}

ret.setAggregations(indexQueryResult.getAggregationMap());
ret.setApproximateCount(indexQuery.vertexTotals());
Expand Down Expand Up @@ -1037,6 +1041,71 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
LOG.debug("Preparing search results for ({})", ret.getSearchParameters());
}
Iterator<Result> iterator = indexQueryResult.getIterator();
boolean showSearchScore = searchParams.getShowSearchScore();

while (iterator.hasNext()) {
Result result = iterator.next();
AtlasVertex vertex = result.getVertex();

if (vertex == null) {
LOG.warn("vertex in null");
continue;
}

AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
}

if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}

ret.addEntity(header);
}
} catch (Exception e) {
throw e;
}
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResult2(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
Expand All @@ -1063,7 +1132,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
}

private AtlasEntityHeader enrichVertex(AtlasSearchResult ret, Set<String> resultAttributes, Result result, AtlasVertex vertex, boolean fetchCollapsedResults) throws AtlasBaseException {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader2(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set<String
return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
}

public AtlasEntityHeader toAtlasEntityHeader2(AtlasVertex atlasVertex, Set<String> attributes) throws AtlasBaseException {
return atlasVertex != null ? mapVertexToAtlasEntityHeader2(atlasVertex, attributes) : null;
}

public AtlasEntityHeader toAtlasEntityHeaderWithClassifications(String guid) throws AtlasBaseException {
return toAtlasEntityHeaderWithClassifications(getEntityVertex(guid), Collections.emptySet());
}
Expand Down Expand Up @@ -944,6 +948,82 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));

if(RequestContext.get().includeMeanings()) {
List<AtlasTermAssignmentHeader> termAssignmentHeaders = mapAssignedTerms(entityVertex);
ret.setMeanings(termAssignmentHeaders);
ret.setMeaningNames(
termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText)
.collect(Collectors.toList()));
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);

if (entityType != null) {
for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) {
Object attrValue = getVertexAttribute(entityVertex, headerAttribute);

if (attrValue != null) {
ret.setAttribute(headerAttribute.getName(), attrValue);
}
}

Object displayText = getDisplayText(entityVertex, entityType);

if (displayText != null) {
ret.setDisplayText(displayText.toString());
}

if (CollectionUtils.isNotEmpty(attributes)) {
for (String attrName : attributes) {
AtlasAttribute attribute = entityType.getAttribute(attrName);

if (attribute == null) {
attrName = toNonQualifiedName(attrName);

if (ret.hasAttribute(attrName)) {
continue;
}

attribute = entityType.getAttribute(attrName);

if (attribute == null) {
attribute = entityType.getRelationshipAttribute(attrName, null);
}
}

Object attrValue = getVertexAttribute(entityVertex, attribute);

if (attrValue != null) {
ret.setAttribute(attrName, attrValue);
}
}
}
}
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}

private AtlasEntityHeader mapVertexToAtlasEntityHeader2(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader");
AtlasEntityHeader ret = new AtlasEntityHeader();

String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
Boolean isIncomplete = isEntityIncomplete(entityVertex);

ret.setTypeName(typeName);
ret.setGuid(guid);
ret.setStatus(GraphHelper.getStatus(entityVertex));
if(RequestContext.get().includeClassifications()){
ret.setClassificationNames(getAllTraitNames(entityVertex));
}
ret.setIsIncomplete(isIncomplete);
ret.setLabels(getLabels(entityVertex));

ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));
ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));

if(RequestContext.get().includeMeanings()) {
List<AtlasTermAssignmentHeader> termAssignmentHeaders = mapAssignedTerms(entityVertex);
ret.setMeanings(termAssignmentHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public AtlasSearchResult searchWithParameters(SearchParameters parameters) throw
@Path("indexsearch")
@POST
@Timed
public AtlasSearchResult indexSearch(@Context HttpServletRequest servletRequest, IndexSearchParams parameters) throws AtlasBaseException {
public AtlasSearchResult indexSearch(@Context HttpServletRequest servletRequest, IndexSearchParams parameters,@QueryParam("parallelSearch") boolean parallelSearch) throws AtlasBaseException {
AtlasPerfTracer perf = null;
long startTime = System.currentTimeMillis();

Expand All @@ -405,6 +405,7 @@ public AtlasSearchResult indexSearch(@Context HttpServletRequest servletRequest,
if(LOG.isDebugEnabled()){
LOG.debug("Performing indexsearch for the params ({})", parameters);
}
parameters.setParallelSearch(parallelSearch);
AtlasSearchResult result = discoveryService.directIndexSearch(parameters);
long endTime = System.currentTimeMillis();

Expand Down

0 comments on commit 7bbb93f

Please sign in to comment.