Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add possibility to render Topic FQN from es search #16903

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions ingestion/src/metadata/utils/fqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,33 @@ def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str:

@fqn_build_registry.add(Topic)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building
metadata: Optional[OpenMetadata],
*,
service_name: str,
topic_name: str,
) -> str:
if not service_name or not topic_name:
skip_es_search: bool = True,
) -> Optional[str]:
entity: Optional[Topic] = None

if not skip_es_search:
entity = search_topic_from_es(
metadata=metadata, service_name=service_name, topic_name=topic_name
)

# if entity not found in ES proceed to build FQN with database_name and schema_name
if not entity and service_name and topic_name:
fqn = _build(service_name, topic_name)
return fqn

if entity:
return str(entity.fullyQualifiedName.root)

if not all([service_name, topic_name]):
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, topic=`{topic_name}``"
)
return _build(service_name, topic_name)

return None


@fqn_build_registry.add(Container)
Expand Down Expand Up @@ -681,6 +698,34 @@ def search_database_from_es(
)


def search_topic_from_es(
metadata: OpenMetadata,
topic_name: str,
service_name: Optional[str],
fields: Optional[str] = None,
):
"""
Search Topic entity from ES
"""

if not topic_name:
raise FQNBuildingException(
f"Topic Name should be informed, but got topic=`{topic_name}`"
)

fqn_search_string = _build(service_name or "*", topic_name)

es_result = metadata.es_search_from_fqn(
entity_type=Topic,
fqn_search_string=fqn_search_string,
fields=fields,
)

return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=False
)


def get_query_checksum(query: str) -> str:
"""
Prepare the query checksum from its string representation.
Expand Down
Loading