Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tableau): set ingestion stage report and perftimers #12234

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
90 changes: 64 additions & 26 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime
from functools import lru_cache
from typing import (
Expand Down Expand Up @@ -650,17 +650,31 @@
num_csql_field_skipped_no_name: int = 0
num_table_field_skipped_no_name: int = 0
# timers
extract_usage_stats_timer: Dict[str, float] = field(default_factory=TopKDict)
fetch_groups_timer: Dict[str, float] = field(default_factory=TopKDict)
populate_database_server_hostname_map_timer: Dict[str, float] = field(default_factory=TopKDict)
populate_projects_registry_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_workbooks_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_sheets_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_dashboards_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_embedded_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_published_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_custom_sql_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
emit_upstream_tables_timer: Dict[str, float] = field(default_factory=TopKDict)
extract_usage_stats_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
fetch_groups_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
populate_database_server_hostname_map_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
populate_projects_registry_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_workbooks_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_sheets_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_dashboards_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_embedded_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_published_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_custom_sql_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_upstream_tables_timer: Dict[str, float] = dataclass_field(
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
default_factory=TopKDict
)
# lineage
num_tables_with_upstream_lineage: int = 0
num_upstream_table_lineage: int = 0
Expand All @@ -671,7 +685,7 @@
num_upstream_table_lineage_failed_parse_sql: int = 0
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
num_hidden_assets_skipped: int = 0
logged_in_user: List[UserInfo] = field(default_factory=list)
logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)


def report_user_role(report: TableauSourceReport, server: Server) -> None:
Expand Down Expand Up @@ -837,7 +851,7 @@
yield from site_source.ingest_tableau_site()

self.report.report_ingestion_stage_start("End")

except MetadataQueryException as md_exception:
self.report.failure(
title="Failed to Retrieve Tableau Metadata",
Expand Down Expand Up @@ -3479,64 +3493,88 @@
return {"permissions": json.dumps(groups)} if len(groups) > 0 else None

def ingest_tableau_site(self):
self.report.report_ingestion_stage_start(f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}")
self.report.report_ingestion_stage_start(
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}"
)

# Initialise the dictionary used later to look up chart and dashboard usage stats
if self.config.extract_usage_stats:
with PerfTimer() as timer:
self._populate_usage_stat_registry()
self.report.extract_usage_stats_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.extract_usage_stats_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.config.permission_ingestion:
with PerfTimer() as timer:
self._fetch_groups()
self.report.fetch_groups_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.fetch_groups_timer[self.site_id] = round(
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
timer.elapsed_seconds(), 2
)

# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
with PerfTimer() as timer:
self._populate_database_server_hostname_map()
self.report.populate_database_server_hostname_map_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.populate_database_server_hostname_map_timer[

Check warning on line 3520 in metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py#L3518-L3520

Added lines #L3518 - L3520 were not covered by tests
self.site_id
] = round(timer.elapsed_seconds(), 2)

with PerfTimer() as timer:
self._populate_projects_registry()
self.report.populate_projects_registry_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.populate_projects_registry_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()

with PerfTimer() as timer:
yield from self.emit_workbooks()
self.report.emit_workbooks_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_workbooks_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.sheet_ids:
with PerfTimer() as timer:
yield from self.emit_sheets()
self.report.emit_sheets_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_sheets_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.dashboard_ids:
with PerfTimer() as timer:
yield from self.emit_dashboards()
self.report.emit_dashboards_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_dashboards_timer[self.site_id] = round(
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
timer.elapsed_seconds(), 2
)

if self.embedded_datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_embedded_datasources()
self.report.emit_embedded_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_embedded_datasources_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_published_datasources()
self.report.emit_published_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_published_datasources_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.custom_sql_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_custom_sql_datasources()
self.report.emit_custom_sql_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_custom_sql_datasources_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)

if self.database_tables:
with PerfTimer() as timer:
yield from self.emit_upstream_tables()
self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
self.report.emit_upstream_tables_timer[self.site_id] = round(
timer.elapsed_seconds(), 2
)
Loading