diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 008216fea89508..4467c161bf7f93 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -2,9 +2,9 @@ import logging import re import time -from collections import OrderedDict -from dataclasses import dataclass -from datetime import datetime +from collections import OrderedDict, defaultdict +from dataclasses import dataclass, field as dataclass_field +from datetime import datetime, timedelta, timezone from functools import lru_cache from typing import ( Any, @@ -196,6 +196,11 @@ 504, # Gateway Timeout ] +# From experience, this expiry time typically ranges from 50 minutes +# to 2 hours, but it may well be configurable. An auth error seen within +# 10 minutes of the last authentication is not treated as regular expiry. +REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10) + logger: logging.Logger = logging.getLogger(__name__) # Replace / with | @@ -654,6 +659,13 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 num_hidden_assets_skipped: int = 0 logged_in_user: List[UserInfo] = [] + last_authenticated_at: Optional[datetime] = None + + num_expected_tableau_metadata_queries: int = 0 + num_actual_tableau_metadata_queries: int = 0 + tableau_server_error_stats: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) def report_user_role(report: TableauSourceReport, server: Server) -> None: @@ -724,6 +736,7 @@ def _authenticate(self, site_content_url: str) -> None: try: logger.info(f"Authenticated to Tableau site: '{site_content_url}'") self.server = self.config.make_tableau_client(site_content_url) + self.report.last_authenticated_at = datetime.now(timezone.utc) report_user_role(report=self.report, server=self.server) # Note that we're not catching ConfigurationError, since 
we want that to throw. except ValueError as e: @@ -807,10 +820,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: site_source = TableauSiteSource( config=self.config, ctx=self.ctx, - site=site - if site - else SiteIdContentUrl( - site_id=self.server.site_id, site_content_url=self.config.site + site=( + site + if site + else SiteIdContentUrl( + site_id=self.server.site_id, + site_content_url=self.config.site, + ) ), report=self.report, server=self.server, @@ -925,6 +941,7 @@ def _re_authenticate(self) -> None: # Sign-in again may not be enough because Tableau sometimes caches invalid sessions # so we need to recreate the Tableau Server object self.server = self.config.make_tableau_client(self.site_content_url) + self.report.last_authenticated_at = datetime.now(timezone.utc) def _populate_usage_stat_registry(self) -> None: if self.server is None: @@ -1190,6 +1207,7 @@ def get_connection_object_page( ) try: assert self.server is not None + self.report.num_actual_tableau_metadata_queries += 1 query_data = query_metadata_cursor_based_pagination( server=self.server, main_query=query, @@ -1199,25 +1217,40 @@ def get_connection_object_page( qry_filter=query_filter, ) - except REAUTHENTICATE_ERRORS: - if not retry_on_auth_error: + except REAUTHENTICATE_ERRORS as e: + self.report.tableau_server_error_stats[str(type(e))] += 1 + if not retry_on_auth_error or retries_remaining <= 0: raise # If ingestion has been running for over 2 hours, the Tableau # temporary credentials will expire. If this happens, this exception # will be thrown, and we need to re-authenticate and retry. + if self.report.last_authenticated_at and ( + self.report.last_authenticated_at + < datetime.now(timezone.utc) - REGULAR_AUTH_EXPIRY_PERIOD + ): + retry_on_auth_error = False + else: + # We have been getting some irregular auth errors like below well before the expected expiry time + # - within a few seconds of initial authentication. We'll retry in such cases. 
+ # : + # b'{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}' + retry_on_auth_error = True + self._re_authenticate() + return self.get_connection_object_page( query=query, connection_type=connection_type, query_filter=query_filter, fetch_size=fetch_size, current_cursor=current_cursor, - retry_on_auth_error=False, + retry_on_auth_error=retry_on_auth_error, retries_remaining=retries_remaining - 1, ) except InternalServerError as ise: + self.report.tableau_server_error_stats[str(InternalServerError)] += 1 # In some cases Tableau Server returns 504 error, which is a timeout error, so it worths to retry. # Extended with other retryable errors. if ise.code in RETRIABLE_ERROR_CODES: @@ -1230,13 +1263,14 @@ def get_connection_object_page( query_filter=query_filter, fetch_size=fetch_size, current_cursor=current_cursor, - retry_on_auth_error=False, + retry_on_auth_error=True, retries_remaining=retries_remaining - 1, ) else: raise ise except OSError: + self.report.tableau_server_error_stats[str(OSError)] += 1 # In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04), # the request logic was changed to use threads. 
# https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081 @@ -1251,7 +1285,7 @@ def get_connection_object_page( query_filter=query_filter, fetch_size=fetch_size, current_cursor=current_cursor, - retry_on_auth_error=False, + retry_on_auth_error=True, retries_remaining=retries_remaining - 1, ) @@ -1339,7 +1373,7 @@ def get_connection_object_page( query_filter=query_filter, fetch_size=fetch_size, current_cursor=current_cursor, - retry_on_auth_error=False, + retry_on_auth_error=True, retries_remaining=retries_remaining, ) raise RuntimeError(f"Query {connection_type} error: {errors}") @@ -1377,6 +1411,7 @@ def get_connection_objects( while has_next_page: filter_: str = make_filter(filter_page) + self.report.num_expected_tableau_metadata_queries += 1 ( connection_objects, current_cursor,