From bdfecfbf28c2adb91cea5b2539665a5dc84d60cb Mon Sep 17 00:00:00 2001 From: Jac Date: Thu, 12 Dec 2024 16:20:40 -0800 Subject: [PATCH] feat: incremental refresh for extracts (#1545) * implement incremental refresh * add sample that creates an incremental extract/runs 'refresh now' --- samples/create_extract_task.py | 19 +++++--- samples/extracts.py | 46 +++++++++++++++---- samples/publish_workbook.py | 2 +- samples/refresh.py | 33 +++++++++---- .../server/endpoint/datasources_endpoint.py | 6 +-- .../server/endpoint/workbooks_endpoint.py | 8 ++-- tableauserverclient/server/request_factory.py | 7 +++ 7 files changed, 90 insertions(+), 31 deletions(-) diff --git a/samples/create_extract_task.py b/samples/create_extract_task.py index 8408f67ee..8c02fefff 100644 --- a/samples/create_extract_task.py +++ b/samples/create_extract_task.py @@ -29,7 +29,9 @@ def main(): help="desired logging level (set to error by default)", ) # Options specific to this sample: - # This sample has no additional options, yet. If you add some, please add them here + parser.add_argument("resource_type", choices=["workbook", "datasource"]) + parser.add_argument("resource_id") + parser.add_argument("--incremental", default=False) args = parser.parse_args() @@ -45,6 +47,7 @@ def main(): # Monthly Schedule # This schedule will run on the 15th of every month at 11:30PM monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15) + print(monthly_interval) monthly_schedule = TSC.ScheduleItem( None, None, @@ -53,18 +56,20 @@ def main(): monthly_interval, ) - # Default to using first workbook found in server - all_workbook_items, pagination_item = server.workbooks.get() - my_workbook: TSC.WorkbookItem = all_workbook_items[0] + my_workbook: TSC.WorkbookItem = server.workbooks.get_by_id(args.resource_id) target_item = TSC.Target( my_workbook.id, # the id of the workbook or datasource "workbook", # alternatively can be "datasource" ) - extract_item = TSC.TaskItem( + refresh_type = "FullRefresh" + if args.incremental: + refresh_type = "Incremental" + + scheduled_extract_item = TSC.TaskItem( None, - "FullRefresh", + refresh_type, None, None, None, @@ -74,7 +79,7 @@ def main(): ) try: - response = server.tasks.create(extract_item) + response = server.tasks.create(scheduled_extract_item) print(response) except Exception as e: print(e) diff --git a/samples/extracts.py b/samples/extracts.py index c0dd885bc..8e7a66aac 100644 --- a/samples/extracts.py +++ b/samples/extracts.py @@ -25,8 +25,11 @@ def main(): help="desired logging level (set to error by default)", ) # Options specific to this sample - parser.add_argument("--delete") - parser.add_argument("--create") + parser.add_argument("--create", action="store_true") + parser.add_argument("--delete", action="store_true") + parser.add_argument("--refresh", action="store_true") + parser.add_argument("--workbook", required=False) + parser.add_argument("--datasource", required=False) args = parser.parse_args() # Set logging level based on user input, or error by default @@ -39,20 +42,45 @@ def main(): server.add_http_options({"verify": False}) server.use_server_version() with server.auth.sign_in(tableau_auth): - # Gets all workbook items - all_workbooks, pagination_item = server.workbooks.get() - print(f"\nThere are {pagination_item.total_available} workbooks on site: ") - print([workbook.name for workbook in all_workbooks]) - if all_workbooks: - # Pick one workbook from the list - wb = all_workbooks[3] + wb = None + ds = None + if args.workbook: + wb = server.workbooks.get_by_id(args.workbook) + if wb is None: + raise ValueError(f"Workbook not found for id {args.workbook}") + elif args.datasource: + ds = server.datasources.get_by_id(args.datasource) + if ds is None: + raise ValueError(f"Datasource not found for id {args.datasource}") + else: + # Gets all workbook items + all_workbooks, pagination_item = server.workbooks.get() + print(f"\nThere are {pagination_item.total_available} workbooks on site: ") + print([workbook.name for workbook in all_workbooks]) + + if all_workbooks: + # Pick one workbook from the list + wb = all_workbooks[3] if args.create: print("create extract on wb ", wb.name) extract_job = server.workbooks.create_extract(wb, includeAll=True) print(extract_job) + if args.refresh: + extract_job = None + if ds is not None: + print(f"refresh extract on datasource {ds.name}") + extract_job = server.datasources.refresh(ds, includeAll=True, incremental=True) + elif wb is not None: + print(f"refresh extract on workbook {wb.name}") + extract_job = server.workbooks.refresh(wb) + else: + print("no content item selected to refresh") + + print(extract_job) + if args.delete: print("delete extract on wb ", wb.name) jj = server.workbooks.delete_extract(wb) diff --git a/samples/publish_workbook.py b/samples/publish_workbook.py index 052eee1f5..077ddaddd 100644 --- a/samples/publish_workbook.py +++ b/samples/publish_workbook.py @@ -55,7 +55,7 @@ def main(): # Step 1: Sign in to server. tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site) - server = TSC.Server(args.server, use_server_version=True) + server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False}) with server.auth.sign_in(tableau_auth): # Step2: Retrieve the project id, if a project name was passed if args.project is not None: diff --git a/samples/refresh.py b/samples/refresh.py index d3e49ed24..99242fcdb 100644 --- a/samples/refresh.py +++ b/samples/refresh.py @@ -27,6 +27,8 @@ def main(): # Options specific to this sample parser.add_argument("resource_type", choices=["workbook", "datasource"]) parser.add_argument("resource_id") + parser.add_argument("--incremental") + parser.add_argument("--synchronous") args = parser.parse_args() @@ -34,27 +36,42 @@ def main(): logging_level = getattr(logging, args.logging_level.upper()) logging.basicConfig(level=logging_level) + refresh_type = "FullRefresh" + incremental = False + if args.incremental: + refresh_type = "Incremental" + incremental = True + tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site) - server = TSC.Server(args.server, use_server_version=True) + server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False}) with server.auth.sign_in(tableau_auth): if args.resource_type == "workbook": # Get the workbook by its Id to make sure it exists resource = server.workbooks.get_by_id(args.resource_id) + print(resource) # trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done - job = server.workbooks.refresh(args.resource_id) + job = server.workbooks.refresh(args.resource_id, incremental=incremental) else: # Get the datasource by its Id to make sure it exists resource = server.datasources.get_by_id(args.resource_id) + print(resource) + + # server.datasources.create_extract(resource) # trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done - job = server.datasources.refresh(resource) + job = server.datasources.refresh(resource, incremental=incremental) # by default runs as a sync task, - print(f"Update job posted (ID: {job.id})") - print("Waiting for job...") - # `wait_for_job` will throw if the job isn't executed successfully - job = server.jobs.wait_for_job(job) - print("Job finished succesfully") + print(f"{refresh_type} job posted (ID: {job.id})") + if args.synchronous: + # equivalent to tabcmd --synchnronous: wait for the job to complete + try: + # `wait_for_job` will throw if the job isn't executed successfully + print("Waiting for job...") + server.jobs.wait_for_job(job) + print("Job finished succesfully") + except Exception as e: + print(f"Job failed! {e}") if __name__ == "__main__": diff --git a/tableauserverclient/server/endpoint/datasources_endpoint.py b/tableauserverclient/server/endpoint/datasources_endpoint.py index 88c739dcf..a7a111516 100644 --- a/tableauserverclient/server/endpoint/datasources_endpoint.py +++ b/tableauserverclient/server/endpoint/datasources_endpoint.py @@ -187,11 +187,11 @@ def update_connection( return connection @api(version="2.8") - def refresh(self, datasource_item: DatasourceItem) -> JobItem: + def refresh(self, datasource_item: DatasourceItem, incremental: bool = False) -> JobItem: id_ = getattr(datasource_item, "id", datasource_item) url = f"{self.baseurl}/{id_}/refresh" - empty_req = RequestFactory.Empty.empty_req() - server_response = self.post_request(url, empty_req) + refresh_req = RequestFactory.Task.refresh_req(incremental) + server_response = self.post_request(url, refresh_req) new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] return new_job diff --git a/tableauserverclient/server/endpoint/workbooks_endpoint.py b/tableauserverclient/server/endpoint/workbooks_endpoint.py index 53bf0c1a7..4fdcf075b 100644 --- a/tableauserverclient/server/endpoint/workbooks_endpoint.py +++ b/tableauserverclient/server/endpoint/workbooks_endpoint.py @@ -118,7 +118,7 @@ def get_by_id(self, workbook_id: str) -> WorkbookItem: return WorkbookItem.from_response(server_response.content, self.parent_srv.namespace)[0] @api(version="2.8") - def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem: + def refresh(self, workbook_item: Union[WorkbookItem, str], incremental: bool = False) -> JobItem: """ Refreshes the extract of an existing workbook. @@ -126,6 +126,8 @@ def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem: ---------- workbook_item : WorkbookItem | str The workbook item or workbook ID. + incremental: bool + Whether to do a full refresh or incremental refresh of the extract data Returns ------- @@ -134,8 +136,8 @@ def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem: """ id_ = getattr(workbook_item, "id", workbook_item) url = f"{self.baseurl}/{id_}/refresh" - empty_req = RequestFactory.Empty.empty_req() - server_response = self.post_request(url, empty_req) + refresh_req = RequestFactory.Task.refresh_req(incremental) + server_response = self.post_request(url, refresh_req) new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] return new_job diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index f0b2d1846..79ac6e4ca 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -1117,6 +1117,13 @@ def run_req(self, xml_request: ET.Element, task_item: Any) -> None: # Send an empty tsRequest pass + @_tsrequest_wrapped + def refresh_req(self, xml_request: ET.Element, incremental: bool = False) -> bytes: + task_element = ET.SubElement(xml_request, "extractRefresh") + if incremental: + task_element.attrib["incremental"] = "true" + return ET.tostring(xml_request) + @_tsrequest_wrapped def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") -> bytes: extract_element = ET.SubElement(xml_request, "extractRefresh")