Skip to content

Commit

Permalink
fix(ingest/tableau): retry on auth error for special case
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Jan 3, 2025
1 parent 1190dd9 commit 6c33993
Showing 1 changed file with 48 additions and 13 deletions.
61 changes: 48 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import logging
import re
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime
from collections import OrderedDict, defaultdict
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import (
Any,
Expand Down Expand Up @@ -196,6 +196,11 @@
504, # Gateway Timeout
]

# From experience, this expiry time typically ranges from 50 minutes
# to 2 hours but might as well be configurable. We will allow upto
# 10 minutes of such expiry time
REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)

logger: logging.Logger = logging.getLogger(__name__)

# Replace / with |
Expand Down Expand Up @@ -654,6 +659,13 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
num_hidden_assets_skipped: int = 0
logged_in_user: List[UserInfo] = []
last_authenticated_at: Optional[datetime] = None

num_expected_tableau_metadata_queries: int = 0
num_actual_tableau_metadata_queries: int = 0
tableau_server_error_stats: Dict[str, int] = dataclass_field(
default_factory=(lambda: defaultdict(int))
)


def report_user_role(report: TableauSourceReport, server: Server) -> None:
Expand Down Expand Up @@ -724,6 +736,7 @@ def _authenticate(self, site_content_url: str) -> None:
try:
logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
self.server = self.config.make_tableau_client(site_content_url)
self.report.last_authenticated_at = datetime.now(timezone.utc)
report_user_role(report=self.report, server=self.server)
# Note that we're not catching ConfigurationError, since we want that to throw.
except ValueError as e:
Expand Down Expand Up @@ -807,10 +820,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
site_source = TableauSiteSource(
config=self.config,
ctx=self.ctx,
site=site
if site
else SiteIdContentUrl(
site_id=self.server.site_id, site_content_url=self.config.site
site=(
site
if site
else SiteIdContentUrl(
site_id=self.server.site_id,
site_content_url=self.config.site,
)
),
report=self.report,
server=self.server,
Expand Down Expand Up @@ -925,6 +941,7 @@ def _re_authenticate(self) -> None:
# Sign-in again may not be enough because Tableau sometimes caches invalid sessions
# so we need to recreate the Tableau Server object
self.server = self.config.make_tableau_client(self.site_content_url)
self.report.last_authenticated_at = datetime.now(timezone.utc)

def _populate_usage_stat_registry(self) -> None:
if self.server is None:
Expand Down Expand Up @@ -1190,6 +1207,7 @@ def get_connection_object_page(
)
try:
assert self.server is not None
self.report.num_actual_tableau_metadata_queries += 1
query_data = query_metadata_cursor_based_pagination(
server=self.server,
main_query=query,
Expand All @@ -1199,25 +1217,40 @@ def get_connection_object_page(
qry_filter=query_filter,
)

except REAUTHENTICATE_ERRORS:
if not retry_on_auth_error:
except REAUTHENTICATE_ERRORS as e:
self.report.tableau_server_error_stats[str(type(e))] += 1
if not retry_on_auth_error or retries_remaining <= 0:
raise

# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown, and we need to re-authenticate and retry.
if self.report.last_authenticated_at and (
self.report.last_authenticated_at
< datetime.now(timezone.utc) - REGULAR_AUTH_EXPIRY_PERIOD
):
retry_on_auth_error = False
else:
# We have been getting some irregular auth errors like below well before the expected expiry time
# - within few seconds of initial authentication . We'll retry for such cases.
# <class 'tableauserverclient.server.endpoint.exceptions.NonXMLResponseError'>:
# b'{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}'
retry_on_auth_error = True

self._re_authenticate()

return self.get_connection_object_page(
query=query,
connection_type=connection_type,
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=retry_on_auth_error,
retries_remaining=retries_remaining - 1,
)

except InternalServerError as ise:
self.report.tableau_server_error_stats[str(InternalServerError)] += 1
# In some cases Tableau Server returns 504 error, which is a timeout error, so it worths to retry.
# Extended with other retryable errors.
if ise.code in RETRIABLE_ERROR_CODES:
Expand All @@ -1230,13 +1263,14 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining - 1,
)
else:
raise ise

except OSError:
self.report.tableau_server_error_stats[str(OSError)] += 1
# In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
# the request logic was changed to use threads.
# https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081
Expand All @@ -1251,7 +1285,7 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining - 1,
)

Expand Down Expand Up @@ -1339,7 +1373,7 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining,
)
raise RuntimeError(f"Query {connection_type} error: {errors}")
Expand Down Expand Up @@ -1377,6 +1411,7 @@ def get_connection_objects(
while has_next_page:
filter_: str = make_filter(filter_page)

self.report.num_expected_tableau_metadata_queries += 1
(
connection_objects,
current_cursor,
Expand Down

0 comments on commit 6c33993

Please sign in to comment.