Skip to content

Commit f8ca98b

Browse files
mayurinehatellance
authored andcommitted
fix(ingest/tableau): retry on auth error for special case (datahub-project#12264)
1 parent e150251 commit f8ca98b

File tree

2 files changed

+112
-20
lines changed

2 files changed

+112
-20
lines changed

metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py

+50-18
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import logging
33
import re
44
import time
5-
from collections import OrderedDict
6-
from dataclasses import dataclass
7-
from datetime import datetime
5+
from collections import OrderedDict, defaultdict
6+
from dataclasses import dataclass, field as dataclass_field
7+
from datetime import datetime, timedelta, timezone
88
from functools import lru_cache
99
from typing import (
1010
Any,
@@ -196,6 +196,11 @@
196196
504, # Gateway Timeout
197197
]
198198

199+
# From experience, this expiry time typically ranges from 50 minutes
200+
# to 2 hours but might as well be configurable. We will allow upto
201+
# 10 minutes of such expiry time
202+
REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)
203+
199204
logger: logging.Logger = logging.getLogger(__name__)
200205

201206
# Replace / with |
@@ -637,6 +642,7 @@ class SiteIdContentUrl:
637642
site_content_url: str
638643

639644

645+
@dataclass
640646
class TableauSourceReport(StaleEntityRemovalSourceReport):
641647
get_all_datasources_query_failed: bool = False
642648
num_get_datasource_query_failures: int = 0
@@ -653,7 +659,14 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
653659
num_upstream_table_lineage_failed_parse_sql: int = 0
654660
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
655661
num_hidden_assets_skipped: int = 0
656-
logged_in_user: List[UserInfo] = []
662+
logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)
663+
last_authenticated_at: Optional[datetime] = None
664+
665+
num_expected_tableau_metadata_queries: int = 0
666+
num_actual_tableau_metadata_queries: int = 0
667+
tableau_server_error_stats: Dict[str, int] = dataclass_field(
668+
default_factory=(lambda: defaultdict(int))
669+
)
657670

658671

659672
def report_user_role(report: TableauSourceReport, server: Server) -> None:
@@ -724,6 +737,7 @@ def _authenticate(self, site_content_url: str) -> None:
724737
try:
725738
logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
726739
self.server = self.config.make_tableau_client(site_content_url)
740+
self.report.last_authenticated_at = datetime.now(timezone.utc)
727741
report_user_role(report=self.report, server=self.server)
728742
# Note that we're not catching ConfigurationError, since we want that to throw.
729743
except ValueError as e:
@@ -807,10 +821,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
807821
site_source = TableauSiteSource(
808822
config=self.config,
809823
ctx=self.ctx,
810-
site=site
811-
if site
812-
else SiteIdContentUrl(
813-
site_id=self.server.site_id, site_content_url=self.config.site
824+
site=(
825+
site
826+
if site
827+
else SiteIdContentUrl(
828+
site_id=self.server.site_id,
829+
site_content_url=self.config.site,
830+
)
814831
),
815832
report=self.report,
816833
server=self.server,
@@ -925,6 +942,7 @@ def _re_authenticate(self) -> None:
925942
# Sign-in again may not be enough because Tableau sometimes caches invalid sessions
926943
# so we need to recreate the Tableau Server object
927944
self.server = self.config.make_tableau_client(self.site_content_url)
945+
self.report.last_authenticated_at = datetime.now(timezone.utc)
928946

929947
def _populate_usage_stat_registry(self) -> None:
930948
if self.server is None:
@@ -1190,6 +1208,7 @@ def get_connection_object_page(
11901208
)
11911209
try:
11921210
assert self.server is not None
1211+
self.report.num_actual_tableau_metadata_queries += 1
11931212
query_data = query_metadata_cursor_based_pagination(
11941213
server=self.server,
11951214
main_query=query,
@@ -1199,25 +1218,36 @@ def get_connection_object_page(
11991218
qry_filter=query_filter,
12001219
)
12011220

1202-
except REAUTHENTICATE_ERRORS:
1203-
if not retry_on_auth_error:
1221+
except REAUTHENTICATE_ERRORS as e:
1222+
self.report.tableau_server_error_stats[e.__class__.__name__] += 1
1223+
if not retry_on_auth_error or retries_remaining <= 0:
12041224
raise
12051225

1206-
# If ingestion has been running for over 2 hours, the Tableau
1207-
# temporary credentials will expire. If this happens, this exception
1208-
# will be thrown, and we need to re-authenticate and retry.
1209-
self._re_authenticate()
1226+
# We have been getting some irregular authorization errors like below well before the expected expiry time
1227+
# - within few seconds of initial authentication . We'll retry without re-auth for such cases.
1228+
# <class 'tableauserverclient.server.endpoint.exceptions.NonXMLResponseError'>:
1229+
# b'{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}'
1230+
if self.report.last_authenticated_at and (
1231+
datetime.now(timezone.utc) - self.report.last_authenticated_at
1232+
> REGULAR_AUTH_EXPIRY_PERIOD
1233+
):
1234+
# If ingestion has been running for over 2 hours, the Tableau
1235+
# temporary credentials will expire. If this happens, this exception
1236+
# will be thrown, and we need to re-authenticate and retry.
1237+
self._re_authenticate()
1238+
12101239
return self.get_connection_object_page(
12111240
query=query,
12121241
connection_type=connection_type,
12131242
query_filter=query_filter,
12141243
fetch_size=fetch_size,
12151244
current_cursor=current_cursor,
1216-
retry_on_auth_error=False,
1245+
retry_on_auth_error=True,
12171246
retries_remaining=retries_remaining - 1,
12181247
)
12191248

12201249
except InternalServerError as ise:
1250+
self.report.tableau_server_error_stats[InternalServerError.__name__] += 1
12211251
# In some cases Tableau Server returns 504 error, which is a timeout error, so it worths to retry.
12221252
# Extended with other retryable errors.
12231253
if ise.code in RETRIABLE_ERROR_CODES:
@@ -1230,13 +1260,14 @@ def get_connection_object_page(
12301260
query_filter=query_filter,
12311261
fetch_size=fetch_size,
12321262
current_cursor=current_cursor,
1233-
retry_on_auth_error=False,
1263+
retry_on_auth_error=True,
12341264
retries_remaining=retries_remaining - 1,
12351265
)
12361266
else:
12371267
raise ise
12381268

12391269
except OSError:
1270+
self.report.tableau_server_error_stats[OSError.__name__] += 1
12401271
# In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
12411272
# the request logic was changed to use threads.
12421273
# https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081
@@ -1251,7 +1282,7 @@ def get_connection_object_page(
12511282
query_filter=query_filter,
12521283
fetch_size=fetch_size,
12531284
current_cursor=current_cursor,
1254-
retry_on_auth_error=False,
1285+
retry_on_auth_error=True,
12551286
retries_remaining=retries_remaining - 1,
12561287
)
12571288

@@ -1339,7 +1370,7 @@ def get_connection_object_page(
13391370
query_filter=query_filter,
13401371
fetch_size=fetch_size,
13411372
current_cursor=current_cursor,
1342-
retry_on_auth_error=False,
1373+
retry_on_auth_error=True,
13431374
retries_remaining=retries_remaining,
13441375
)
13451376
raise RuntimeError(f"Query {connection_type} error: {errors}")
@@ -1377,6 +1408,7 @@ def get_connection_objects(
13771408
while has_next_page:
13781409
filter_: str = make_filter(filter_page)
13791410

1411+
self.report.num_expected_tableau_metadata_queries += 1
13801412
(
13811413
connection_objects,
13821414
current_cursor,

metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py

+62-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
22
import pathlib
3-
from typing import Any, Dict, List, cast
3+
from typing import Any, Dict, List, Union, cast
44
from unittest import mock
55

66
import pytest
@@ -13,10 +13,15 @@
1313
GroupItem,
1414
ProjectItem,
1515
SiteItem,
16+
UserItem,
1617
ViewItem,
1718
WorkbookItem,
1819
)
1920
from tableauserverclient.models.reference_item import ResourceReference
21+
from tableauserverclient.server.endpoint.exceptions import (
22+
NonXMLResponseError,
23+
TableauError,
24+
)
2025

2126
from datahub.emitter.mce_builder import DEFAULT_ENV, make_schema_field_urn
2227
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -270,7 +275,7 @@ def side_effect_site_get_by_id(id, *arg, **kwargs):
270275

271276

272277
def mock_sdk_client(
273-
side_effect_query_metadata_response: List[dict],
278+
side_effect_query_metadata_response: List[Union[dict, TableauError]],
274279
datasources_side_effect: List[dict],
275280
sign_out_side_effect: List[dict],
276281
) -> mock.MagicMock:
@@ -1312,6 +1317,61 @@ def test_permission_warning(pytestconfig, tmp_path, mock_datahub_graph):
13121317
)
13131318

13141319

1320+
@freeze_time(FROZEN_TIME)
1321+
@pytest.mark.integration
1322+
def test_retry_on_error(pytestconfig, tmp_path, mock_datahub_graph):
1323+
with mock.patch(
1324+
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
1325+
mock_datahub_graph,
1326+
) as mock_checkpoint:
1327+
mock_checkpoint.return_value = mock_datahub_graph
1328+
1329+
with mock.patch("datahub.ingestion.source.tableau.tableau.Server") as mock_sdk:
1330+
mock_client = mock_sdk_client(
1331+
side_effect_query_metadata_response=[
1332+
NonXMLResponseError(
1333+
"""{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}"""
1334+
),
1335+
*mock_data(),
1336+
],
1337+
sign_out_side_effect=[{}],
1338+
datasources_side_effect=[{}],
1339+
)
1340+
mock_client.users = mock.Mock()
1341+
mock_client.users.get_by_id.side_effect = [
1342+
UserItem(
1343+
name="name", site_role=UserItem.Roles.SiteAdministratorExplorer
1344+
)
1345+
]
1346+
mock_sdk.return_value = mock_client
1347+
1348+
reporter = TableauSourceReport()
1349+
tableau_source = TableauSiteSource(
1350+
platform="tableau",
1351+
config=mock.MagicMock(),
1352+
ctx=mock.MagicMock(),
1353+
site=mock.MagicMock(spec=SiteItem, id="Site1", content_url="site1"),
1354+
server=mock_sdk.return_value,
1355+
report=reporter,
1356+
)
1357+
1358+
tableau_source.get_connection_object_page(
1359+
query=mock.MagicMock(),
1360+
connection_type=mock.MagicMock(),
1361+
query_filter=mock.MagicMock(),
1362+
current_cursor=None,
1363+
retries_remaining=1,
1364+
fetch_size=10,
1365+
)
1366+
1367+
assert reporter.num_actual_tableau_metadata_queries == 2
1368+
assert reporter.tableau_server_error_stats
1369+
assert reporter.tableau_server_error_stats["NonXMLResponseError"] == 1
1370+
1371+
assert reporter.warnings == []
1372+
assert reporter.failures == []
1373+
1374+
13151375
@freeze_time(FROZEN_TIME)
13161376
@pytest.mark.parametrize(
13171377
"extract_project_hierarchy, allowed_projects",

0 commit comments

Comments
 (0)