Skip to content

Commit

Permalink
Accounted for errors in parsing search results (#664)
Browse files Browse the repository at this point in the history
* Stash

* comma

* undo some changes

* correction

* Changed name to a number

* Added test

* fixed tests

* satisfy remote tests

* undo

* added dc prefix

* Used messages file

* Applied PR suggestions

* Variable name consistency

* updated messages

* Updated messages
  • Loading branch information
murdo-moj authored Aug 13, 2024
1 parent 9e008c5 commit 94def64
Show file tree
Hide file tree
Showing 7 changed files with 1,776 additions and 54 deletions.
1 change: 1 addition & 0 deletions home/service/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def _get_context(self, items_per_page: int) -> dict[str, Any]:
context = {
"form": self.form,
"results": self.results.page_results,
"malformed_result_urns": self.results.malformed_result_urns,
"highlighted_results": self.highlighted_results.page_results,
"h1_value": _("Search"),
"page_obj": self.paginator.get_page(self.page),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import defaultdict
from datetime import datetime, timezone
from importlib.resources import files
from typing import Any, Tuple

from data_platform_catalogue.entities import (
Expand All @@ -22,6 +23,15 @@
PROPERTIES_EMPTY_STRING_FIELDS = ("description", "externalUrl")


def get_graphql_query(graphql_query_file_name: str) -> str:
    """Return the text of a bundled GraphQL query.

    Looks up ``<graphql_query_file_name>.graphql`` among the package
    resources of ``data_platform_catalogue.client.graphql`` and reads it.

    :param graphql_query_file_name: file stem of the query, without the
        ``.graphql`` extension (e.g. ``"search"``).
    :return: the query text.
    """
    resource = files("data_platform_catalogue.client.graphql").joinpath(
        f"{graphql_query_file_name}.graphql"
    )
    return resource.read_text()


def parse_owner(entity: dict[str, Any]) -> OwnerRef:
"""
Parse ownership information, if it is set.
Expand Down
87 changes: 36 additions & 51 deletions lib/datahub-client/data_platform_catalogue/client/search.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import json
import logging
from importlib.resources import files
from typing import Any, Sequence
from typing import Any, Sequence, Tuple

from datahub.configuration.common import GraphError # pylint: disable=E0611
from datahub.ingestion.graph.client import DataHubGraph # pylint: disable=E0611

from data_platform_catalogue.client.exceptions import CatalogueError
from data_platform_catalogue.client.graphql_helpers import (
get_graphql_query,
parse_created_and_modified,
parse_domain,
parse_glossary_terms,
Expand Down Expand Up @@ -35,31 +35,11 @@
class SearchClient:
def __init__(self, graph: DataHubGraph):
self.graph = graph
self.search_query = (
files("data_platform_catalogue.client.graphql")
.joinpath("search.graphql")
.read_text()
)
self.facets_query = (
files("data_platform_catalogue.client.graphql")
.joinpath("facets.graphql")
.read_text()
)
self.list_domains_query = (
files("data_platform_catalogue.client.graphql")
.joinpath("listDomains.graphql")
.read_text()
)
self.get_glossary_terms_query = (
files("data_platform_catalogue.client.graphql")
.joinpath("getGlossaryTerms.graphql")
.read_text()
)
self.get_tags_query = (
files("data_platform_catalogue.client.graphql")
.joinpath("getTags.graphql")
.read_text()
)
self.search_query = get_graphql_query("search")
self.facets_query = get_graphql_query("facets")
self.list_domains_query = get_graphql_query("listDomains")
self.get_glossary_terms_query = get_graphql_query("getGlossaryTerms")
self.get_tags_query = get_graphql_query("getTags")

def search(
self,
Expand Down Expand Up @@ -114,41 +94,46 @@ def search(

logger.debug(json.dumps(response, indent=2))

page_results = self._parse_search_results(response)
# Should these 2 variables be bound or unbound?
page_results, malformed_result_urns = self._parse_search_results(response)

return SearchResponse(
total_results=response["total"],
page_results=page_results,
malformed_result_urns=malformed_result_urns,
facets=self._parse_facets(response.get("facets", [])),
)

def _parse_search_results(self, response):
def _parse_search_results(self, response) -> Tuple[list, list]:
page_results = []
malformed_result_urns = []
for result in response["searchResults"]:
entity = result["entity"]
entity_type = entity["type"]
entity_urn = entity["urn"]
matched_fields = self._get_matched_fields(result=result)

if entity_type == "DATASET":
page_results.append(
self._parse_result(entity, matched_fields, ResultType.TABLE)
)
elif entity_type == "CHART":
page_results.append(
self._parse_result(entity, matched_fields, ResultType.CHART)
)
elif entity_type == "CONTAINER":
page_results.append(
self._parse_container(entity, matched_fields, ResultType.DATABASE)
)
elif entity_type == "DASHBOARD":
page_results.append(
self._parse_container(entity, matched_fields, ResultType.DASHBOARD)
)
else:
raise ValueError(f"Unexpected entity type: {entity_type}")

return page_results
try:
if entity_type == "DATASET":
parsed_result = self._parse_dataset(entity, matched_fields, ResultType.TABLE)
page_results.append(parsed_result)
elif entity_type == "CHART":
parsed_result = self._parse_dataset(entity, matched_fields, ResultType.CHART)
page_results.append(parsed_result)
elif entity_type == "CONTAINER":
parsed_result = self._parse_container(entity, matched_fields, ResultType.DATABASE)
page_results.append(parsed_result)
elif entity_type == "DASHBOARD":
parsed_result = self._parse_container(entity, matched_fields, ResultType.DASHBOARD)
page_results.append(parsed_result)
else:
raise Exception
except Exception:
logger.warn(f"Parsing for result {entity_urn} failed")
malformed_result_urns.append(entity_urn)


return page_results, malformed_result_urns

@staticmethod
def _get_matched_fields(result: dict) -> dict:
Expand Down Expand Up @@ -230,7 +215,7 @@ def _get_data_collection_page_results(self, response, key_for_results: str):
matched_fields: dict = {}
if entity_type == "DATASET":
page_results.append(
self._parse_result(entity, matched_fields, ResultType.TABLE)
self._parse_dataset(entity, matched_fields, ResultType.TABLE)
)
else:
raise ValueError(f"Unexpected entity type: {entity_type}")
Expand Down Expand Up @@ -276,7 +261,7 @@ def _parse_list_domains(
list_domain_options.append(DomainOption(urn, name, total))
return list_domain_options

def _parse_result(
def _parse_dataset(
self, entity: dict[str, Any], matches, result_type: ResultType
) -> SearchResult:
"""
Expand Down
1 change: 1 addition & 0 deletions lib/datahub-client/data_platform_catalogue/search_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@ def lookup_label(self, field_name, label) -> FacetOption | None:
class SearchResponse:
total_results: int
page_results: list[SearchResult]
malformed_result_urns: list[str] = field(default_factory=list)
facets: SearchFacets = field(default_factory=SearchFacets)
147 changes: 147 additions & 0 deletions lib/datahub-client/tests/client/datahub/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,153 @@ def test_dataset_result(mock_graph, searcher):
)
assert response == expected

def test_bad_entity_type(mock_graph, searcher):
    """An entity of an unrecognised type is reported as malformed.

    The search must not raise: the offending entity's urn is collected in
    ``malformed_result_urns`` and the entity is excluded from
    ``page_results``.
    """
    datahub_response = {
        "searchAcrossEntities": {
            "start": 0,
            "count": 1,
            "total": 1,
            "searchResults": [
                {
                    "insights": [],
                    "matchedFields": [],
                    "entity": {
                        # "UNKNOWN" matches none of the handled entity types,
                        # so parsing this result is expected to fail.
                        "type": "UNKNOWN",
                        "urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,calm-pagoda-323403.jaffle_shop.customers,PROD)",  # noqa E501
                        "platform": {"name": "bigquery"},
                        "container": None,
                        "ownership": None,
                        "name": "calm-pagoda-323403.jaffle_shop.customers",
                    },
                }
            ],
        }
    }
    mock_graph.execute_graphql = MagicMock(return_value=datahub_response)

    response = searcher.search()
    # Fixed: was a duplicated assignment ("expected = expected = ...").
    expected = SearchResponse(
        total_results=1,
        page_results=[],
        malformed_result_urns=[
            "urn:li:dataset:(urn:li:dataPlatform:bigquery,calm-pagoda-323403.jaffle_shop.customers,PROD)"  # noqa E501
        ],
        facets=SearchFacets(facets={}),
    )
    assert response == expected


def test_2_dataset_results_with_one_malformed_result(mock_graph, searcher):
    # One well-formed DATASET result and one malformed one: the good result
    # is parsed into page_results while the bad one's urn is reported in
    # malformed_result_urns instead of aborting the whole search.
    datahub_response = {
        "searchAcrossEntities": {
            "start": 0,
            "count": 1,
            "total": 1,
            "searchResults": [
                # First result: fully parseable dataset.
                {
                    "insights": [],
                    "matchedFields": [],
                    "entity": {
                        "type": "DATASET",
                        "urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,calm-pagoda-323403.jaffle_shop.customers,PROD)",  # noqa E501
                        "platform": {"name": "bigquery"},
                        "container": None,
                        "ownership": None,
                        "name": "pagoda",
                        "properties": {
                            "name": "customers",
                            "qualifiedName": "jaffle_shop.customers",
                            "customProperties": [
                                {"key": "StoredAsSubDirectories", "value": "False"},
                                {
                                    "key": "CreatedByJob",
                                    "value": "moj-reg-prod-hmpps-assess-risks-and-needs-prod-glue-job",
                                },
                            ],
                        },
                        "domain": {
                            "domain": {
                                "urn": "urn:li:domain:3dc18e48-c062-4407-84a9-73e23f768023",
                                "id": "3dc18e48-c062-4407-84a9-73e23f768023",
                                "properties": {
                                    "name": "HMPPS",
                                    "description": "HMPPS is an executive agency that ...",
                                },
                            },
                            "editableProperties": None,
                            "tags": None,
                            "lastIngested": 1705990502353,
                        },
                    },
                },
                # Second result: deliberately broken — presumably the
                # non-mapping "ownership": 1234 (and the missing "domain")
                # makes parsing fail; confirm against the parser.
                {
                    "insights": [],
                    "matchedFields": [],
                    "entity": {
                        "type": "DATASET",
                        "urn": "malformed",  # noqa E501
                        "platform": {"name": "bigquery"},
                        "container": None,
                        "ownership": 1234,
                        "name": "john",
                        "properties": {
                            "name": "customers",
                            "qualifiedName": "jaffle_shop.customers",
                            "customProperties": [
                                {"key": "StoredAsSubDirectories", "value": "False"},
                                {
                                    "key": "CreatedByJob",
                                    "value": "moj-reg-prod-hmpps-assess-risks-and-needs-prod-glue-job",
                                },
                            ],
                        }
                    },
                }
            ],
        }
    }
    mock_graph.execute_graphql = MagicMock(return_value=datahub_response)

    response = searcher.search()
    expected = SearchResponse(
        total_results=1,
        # Only the first (well-formed) entity makes it into the page.
        page_results=[
            SearchResult(
                urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,calm-pagoda-323403.jaffle_shop.customers,PROD)",
                result_type=ResultType.TABLE,
                name="customers",
                display_name="customers",
                fully_qualified_name="jaffle_shop.customers",
                description="",
                matches={},
                metadata={
                    "owner": "",
                    "owner_email": "",
                    "total_parents": 0,
                    "domain_name": "HMPPS",
                    "domain_id": "urn:li:domain:3dc18e48-c062-4407-84a9-73e23f768023",
                    "entity_types": {
                        "entity_type": "Dataset",
                        "entity_sub_types": ["Dataset"],
                    },
                    "dpia_required": None,
                    "dpia_location": "",
                    "dc_where_to_access_dataset": "",
                    "source_dataset_name": "",
                    "s3_location": "",
                    "dc_access_requirements": "",
                    "refresh_period": "",
                    "last_updated": "",
                    "row_count": "",
                },
                tags=[],
                last_modified=None,
                created=None,
            )
        ],
        # The second entity's urn is surfaced as a malformed result.
        malformed_result_urns=["malformed"],
        facets=SearchFacets(facets={}),
    )
    assert response == expected


def test_full_page(mock_graph, searcher):
datahub_response = {
Expand Down
Loading

0 comments on commit 94def64

Please sign in to comment.