Skip to content

Commit

Permalink
🎉 Init
Browse files Browse the repository at this point in the history
Signed-off-by: mgorsk1 <[email protected]>
  • Loading branch information
mgorsk1 committed Jul 3, 2024
1 parent b65e084 commit 01b5d7d
Showing 1 changed file with 51 additions and 43 deletions.
94 changes: 51 additions & 43 deletions ingestion/src/metadata/utils/fqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class SplitTestCaseFqn(BaseModel):
database: str
schema_: str = Field(alias="schema")
table: str
column: Optional[str] = None
test_case: Optional[str] = None
column: Optional[str]
test_case: Optional[str]


def split(str_: str) -> List[str]:
Expand Down Expand Up @@ -290,16 +290,33 @@ def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str:

@fqn_build_registry.add(Topic)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building
metadata: Optional[OpenMetadata],
*,
service_name: str,
topic_name: str,
) -> str:
if not service_name or not topic_name:
skip_es_search: bool = True,
) -> Optional[str]:
entity: Optional[Topic] = None

if not skip_es_search:
entity = search_topic_from_es(
metadata=metadata, service_name=service_name, topic_name=topic_name
)

# if entity not found in ES proceed to build FQN with database_name and schema_name
if not entity and service_name and topic_name:
fqn = _build(service_name, topic_name)
return fqn

if entity:
return str(entity.fullyQualifiedName.root)

if not all([service_name, topic_name]):
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, topic=`{topic_name}``"
)
return _build(service_name, topic_name)

return None


@fqn_build_registry.add(Container)
Expand Down Expand Up @@ -591,43 +608,6 @@ def build_es_fqn_search_string(
return fqn_search_string


def search_database_schema_from_es(
metadata: OpenMetadata,
database_name: str,
schema_name: str,
service_name: str,
fetch_multiple_entities: bool = False,
fields: Optional[str] = None,
):
"""
Find database schema entity in elasticsearch index.
:param metadata: OM Client
:param database_name: name of database in which we are searching for database schema
:param schema_name: name of schema we are searching for
:param service_name: name of service in which we are searching for database schema
:param fetch_multiple_entities: should single match be returned or all matches
:param fields: additional fields to return
:return: entity / entities matching search criteria
"""
if not schema_name:
raise FQNBuildingException(
f"Schema Name should be informed, but got schema_name=`{schema_name}`"
)

fqn_search_string = _build(service_name or "*", database_name or "*", schema_name)

es_result = metadata.es_search_from_fqn(
entity_type=DatabaseSchema,
fqn_search_string=fqn_search_string,
fields=fields,
)

return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)


def search_table_from_es(
metadata: OpenMetadata,
database_name: str,
Expand Down Expand Up @@ -681,6 +661,34 @@ def search_database_from_es(
)


def search_topic_from_es(
metadata: OpenMetadata,
topic_name: str,
service_name: Optional[str],
fields: Optional[str] = None,
):
"""
Search Topic entity from ES
"""

if not topic_name:
raise FQNBuildingException(
f"Topic Name should be informed, but got topic=`{topic_name}`"
)

fqn_search_string = _build(service_name or "*", topic_name)

es_result = metadata.es_search_from_fqn(
entity_type=Topic,
fqn_search_string=fqn_search_string,
fields=fields,
)

return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=False
)


def get_query_checksum(query: str) -> str:
"""
Prepare the query checksum from its string representation.
Expand Down

0 comments on commit 01b5d7d

Please sign in to comment.