From 1f3688a1ed83a5a947f1ed304b21191d7822a33d Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Mon, 2 Sep 2024 19:09:45 +0530 Subject: [PATCH] feat(ingest/databricks): include metadata for browse only tables (#10766) Co-authored-by: Harshal Sheth --- metadata-ingestion/setup.py | 1 + .../source/unity/hive_metastore_proxy.py | 4 ++-- .../src/datahub/ingestion/source/unity/proxy.py | 16 +++++++++++----- .../unity/test_unity_catalog_ingest.py | 5 ++++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 22ff8025aa0a06..cbe3a6c250c1e7 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -301,6 +301,7 @@ databricks = { # 0.1.11 appears to have authentication issues with azure databricks + # 0.22.0 has support for `include_browse` in metadata list apis "databricks-sdk>=0.30.0", "pyspark~=3.3.0", "requests", diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index c99fe3b09c5bb5..eea10d940bd1c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -135,8 +135,8 @@ def get_table_names(self, schema_name: str) -> List[str]: def get_view_names(self, schema_name: str) -> List[str]: try: rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") - # 3 columns - database, tableName, isTemporary - return [row.tableName for row in rows] + # 4 columns - namespace, viewName, isTemporary, isMaterialized + return [row.viewName for row in rows] except Exception as e: self.report.report_warning("Failed to get views for schema", schema_name) logger.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 112acd8101297f..bd987c2da7c764 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -109,7 +109,7 @@ def __init__( self.hive_metastore_proxy = hive_metastore_proxy def check_basic_connectivity(self) -> bool: - return bool(self._workspace_client.catalogs.list()) + return bool(self._workspace_client.catalogs.list(include_browse=True)) def assigned_metastore(self) -> Optional[Metastore]: response = self._workspace_client.metastores.summary() @@ -119,7 +119,7 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: if self.hive_metastore_proxy: yield self.hive_metastore_proxy.hive_metastore_catalog(metastore) - response = self._workspace_client.catalogs.list() + response = self._workspace_client.catalogs.list(include_browse=True) if not response: logger.info("Catalogs not found") return @@ -131,7 +131,9 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: def catalog( self, catalog_name: str, metastore: Optional[Metastore] ) -> Optional[Catalog]: - response = self._workspace_client.catalogs.get(catalog_name) + response = self._workspace_client.catalogs.get( + catalog_name, include_browse=True + ) if not response: logger.info(f"Catalog {catalog_name} not found") return None @@ -148,7 +150,9 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]: ): yield from self.hive_metastore_proxy.hive_metastore_schemas(catalog) return - response = self._workspace_client.schemas.list(catalog_name=catalog.name) + response = self._workspace_client.schemas.list( + catalog_name=catalog.name, include_browse=True + ) if not response: logger.info(f"Schemas not found for catalog {catalog.id}") return @@ -166,7 +170,9 @@ def tables(self, schema: Schema) -> Iterable[Table]: return with patch("databricks.sdk.service.catalog.TableInfo", TableInfoWithGeneration): response = self._workspace_client.tables.list( - catalog_name=schema.catalog.name, schema_name=schema.name + catalog_name=schema.catalog.name, + schema_name=schema.name, + include_browse=True, ) if not response: logger.info(f"Tables not found for schema {schema.id}") diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index 22a48efdec41d7..c078f1b77fd1be 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -276,6 +276,9 @@ def register_mock_data(workspace_client): TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"]) +ViewEntry = namedtuple( + "ViewEntry", ["namespace", "viewName", "isTemporary", "isMaterialized"] +) def mock_hive_sql(query): @@ -418,7 +421,7 @@ def mock_hive_sql(query): TableEntry("bronze_kambi", "view1", False), ] elif query == "SHOW VIEWS FROM `bronze_kambi`": - return [TableEntry("bronze_kambi", "view1", False)] + return [ViewEntry("bronze_kambi", "view1", False, False)] return []