feat: incremental refresh for extracts (#1545)
* implement incremental refresh
* add sample that creates an incremental extract/runs 'refresh now'
jacalata authored Dec 13, 2024
1 parent e3f1e22 commit bdfecfb
Showing 7 changed files with 90 additions and 31 deletions.
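
At a glance, this change threads an incremental flag from the public refresh endpoints down to the request body, and updates the samples to exercise it. A minimal usage sketch of the new keyword follows; the server URL, token values, and LUIDs are placeholders, not values from this commit.

import tableauserverclient as TSC

# Placeholder credentials and IDs: substitute your own.
tableau_auth = TSC.PersonalAccessTokenAuth("my-token-name", "my-token-value", site_id="my-site")
server = TSC.Server("https://tableau.example.com", use_server_version=True)

with server.auth.sign_in(tableau_auth):
    # "Refresh now" with the incremental option added in this commit.
    workbook = server.workbooks.get_by_id("my-workbook-luid")
    wb_job = server.workbooks.refresh(workbook, incremental=True)

    # Datasources gain the same keyword argument.
    datasource = server.datasources.get_by_id("my-datasource-luid")
    ds_job = server.datasources.refresh(datasource, incremental=True)

    # Each call returns a JobItem that can be polled or waited on.
    print(wb_job.id, ds_job.id)
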
19 changes: 12 additions & 7 deletions samples/create_extract_task.py
@@ -29,7 +29,9 @@ def main():
help="desired logging level (set to error by default)",
)
# Options specific to this sample:
# This sample has no additional options, yet. If you add some, please add them here
parser.add_argument("resource_type", choices=["workbook", "datasource"])
parser.add_argument("resource_id")
parser.add_argument("--incremental", default=False)

args = parser.parse_args()

@@ -45,6 +47,7 @@ def main():
# Monthly Schedule
# This schedule will run on the 15th of every month at 11:30PM
monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15)
print(monthly_interval)
monthly_schedule = TSC.ScheduleItem(
None,
None,
@@ -53,18 +56,20 @@ def main():
monthly_interval,
)

# Default to using first workbook found in server
all_workbook_items, pagination_item = server.workbooks.get()
my_workbook: TSC.WorkbookItem = all_workbook_items[0]
my_workbook: TSC.WorkbookItem = server.workbooks.get_by_id(args.resource_id)

target_item = TSC.Target(
my_workbook.id, # the id of the workbook or datasource
"workbook", # alternatively can be "datasource"
)

extract_item = TSC.TaskItem(
refresh_type = "FullRefresh"
if args.incremental:
refresh_type = "Incremental"

scheduled_extract_item = TSC.TaskItem(
None,
"FullRefresh",
refresh_type,
None,
None,
None,
@@ -74,7 +79,7 @@ def main():
)

try:
response = server.tasks.create(extract_item)
response = server.tasks.create(scheduled_extract_item)
print(response)
except Exception as e:
print(e)
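
As a stand-alone illustration of the new arguments in this sample: the flag maps onto the refresh type string and the positional arguments pick the target. A hypothetical sketch; the command-line values are made up.

import argparse
import tableauserverclient as TSC

parser = argparse.ArgumentParser()
parser.add_argument("resource_type", choices=["workbook", "datasource"])
parser.add_argument("resource_id")
parser.add_argument("--incremental", action="store_true")
# Hypothetical invocation: an incremental task for a datasource.
args = parser.parse_args(["datasource", "my-datasource-luid", "--incremental"])

# Map the flag onto the task type string that TaskItem expects.
refresh_type = "Incremental" if args.incremental else "FullRefresh"

# The Target names the content to refresh; the second argument is "workbook" or "datasource".
target_item = TSC.Target(args.resource_id, args.resource_type)
print(refresh_type, target_item)
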
46 changes: 37 additions & 9 deletions samples/extracts.py
@@ -25,8 +25,11 @@ def main():
help="desired logging level (set to error by default)",
)
# Options specific to this sample
parser.add_argument("--delete")
parser.add_argument("--create")
parser.add_argument("--create", action="store_true")
parser.add_argument("--delete", action="store_true")
parser.add_argument("--refresh", action="store_true")
parser.add_argument("--workbook", required=False)
parser.add_argument("--datasource", required=False)
args = parser.parse_args()

# Set logging level based on user input, or error by default
@@ -39,20 +42,45 @@ def main():
server.add_http_options({"verify": False})
server.use_server_version()
with server.auth.sign_in(tableau_auth):
# Gets all workbook items
all_workbooks, pagination_item = server.workbooks.get()
print(f"\nThere are {pagination_item.total_available} workbooks on site: ")
print([workbook.name for workbook in all_workbooks])

if all_workbooks:
# Pick one workbook from the list
wb = all_workbooks[3]
wb = None
ds = None
if args.workbook:
wb = server.workbooks.get_by_id(args.workbook)
if wb is None:
raise ValueError(f"Workbook not found for id {args.workbook}")
elif args.datasource:
ds = server.datasources.get_by_id(args.datasource)
if ds is None:
raise ValueError(f"Datasource not found for id {args.datasource}")
else:
# Gets all workbook items
all_workbooks, pagination_item = server.workbooks.get()
print(f"\nThere are {pagination_item.total_available} workbooks on site: ")
print([workbook.name for workbook in all_workbooks])

if all_workbooks:
# Pick one workbook from the list
wb = all_workbooks[3]

if args.create:
print("create extract on wb ", wb.name)
extract_job = server.workbooks.create_extract(wb, includeAll=True)
print(extract_job)

if args.refresh:
extract_job = None
if ds is not None:
print(f"refresh extract on datasource {ds.name}")
extract_job = server.datasources.refresh(ds, incremental=True)
elif wb is not None:
print(f"refresh extract on workbook {wb.name}")
extract_job = server.workbooks.refresh(wb)
else:
print("no content item selected to refresh")

print(extract_job)

if args.delete:
print("delete extract on wb ", wb.name)
jj = server.workbooks.delete_extract(wb)
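
For reference, the three flags in this sample map onto the endpoint calls below. This is a sketch with placeholder credentials and IDs; in practice you would wait for each returned job before the next step.

import tableauserverclient as TSC

tableau_auth = TSC.PersonalAccessTokenAuth("my-token-name", "my-token-value", site_id="my-site")
server = TSC.Server("https://tableau.example.com", use_server_version=True)

with server.auth.sign_in(tableau_auth):
    wb = server.workbooks.get_by_id("my-workbook-luid")

    # --create: build an extract covering all tables in the workbook.
    create_job = server.workbooks.create_extract(wb, includeAll=True)

    # --refresh: refresh the workbook's extract (full refresh by default).
    refresh_job = server.workbooks.refresh(wb)

    # --delete: remove the extract again.
    server.workbooks.delete_extract(wb)
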
2 changes: 1 addition & 1 deletion samples/publish_workbook.py
@@ -55,7 +55,7 @@ def main():

# Step 1: Sign in to server.
tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site)
server = TSC.Server(args.server, use_server_version=True)
server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False})
with server.auth.sign_in(tableau_auth):
# Step2: Retrieve the project id, if a project name was passed
if args.project is not None:
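
These samples disable certificate verification so they work against a test server with a self-signed certificate. The two equivalent ways used in this commit are sketched below; the URL is a placeholder, and neither form should be used against production servers.

import tableauserverclient as TSC

# Option 1: pass http options to the constructor (publish_workbook.py, refresh.py).
server = TSC.Server("https://tableau.example.com", use_server_version=True, http_options={"verify": False})

# Option 2: add them after construction (extracts.py).
server = TSC.Server("https://tableau.example.com")
server.add_http_options({"verify": False})
server.use_server_version()
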
33 changes: 25 additions & 8 deletions samples/refresh.py
@@ -27,34 +27,51 @@ def main():
# Options specific to this sample
parser.add_argument("resource_type", choices=["workbook", "datasource"])
parser.add_argument("resource_id")
parser.add_argument("--incremental")
parser.add_argument("--synchronous")

args = parser.parse_args()

# Set logging level based on user input, or error by default
logging_level = getattr(logging, args.logging_level.upper())
logging.basicConfig(level=logging_level)

refresh_type = "FullRefresh"
incremental = False
if args.incremental:
refresh_type = "Incremental"
incremental = True

tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site)
server = TSC.Server(args.server, use_server_version=True)
server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False})
with server.auth.sign_in(tableau_auth):
if args.resource_type == "workbook":
# Get the workbook by its Id to make sure it exists
resource = server.workbooks.get_by_id(args.resource_id)
print(resource)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
job = server.workbooks.refresh(args.resource_id)
job = server.workbooks.refresh(args.resource_id, incremental=incremental)
else:
# Get the datasource by its Id to make sure it exists
resource = server.datasources.get_by_id(args.resource_id)
print(resource)

# server.datasources.create_extract(resource)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
job = server.datasources.refresh(resource)
job = server.datasources.refresh(resource, incremental=incremental)  # returns a JobItem for the server-side refresh job

print(f"Update job posted (ID: {job.id})")
print("Waiting for job...")
# `wait_for_job` will throw if the job isn't executed successfully
job = server.jobs.wait_for_job(job)
print("Job finished succesfully")
print(f"{refresh_type} job posted (ID: {job.id})")
if args.synchronous:
# equivalent to tabcmd --synchronous: wait for the job to complete
try:
# `wait_for_job` will throw if the job isn't executed successfully
print("Waiting for job...")
server.jobs.wait_for_job(job)
print("Job finished succesfully")
except Exception as e:
print(f"Job failed! {e}")


if __name__ == "__main__":
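
The new --synchronous path reduces to this post-then-wait pattern; jobs.wait_for_job raises if the job ends in a failed state. Credentials and IDs below are placeholders.

import tableauserverclient as TSC

tableau_auth = TSC.PersonalAccessTokenAuth("my-token-name", "my-token-value", site_id="my-site")
server = TSC.Server("https://tableau.example.com", use_server_version=True)

with server.auth.sign_in(tableau_auth):
    datasource = server.datasources.get_by_id("my-datasource-luid")
    job = server.datasources.refresh(datasource, incremental=True)
    print(f"Incremental job posted (ID: {job.id})")

    try:
        # wait_for_job polls until the job completes and raises on failure.
        server.jobs.wait_for_job(job)
        print("Job finished successfully")
    except Exception as e:
        print(f"Job failed! {e}")
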
6 changes: 3 additions & 3 deletions tableauserverclient/server/endpoint/datasources_endpoint.py
@@ -187,11 +187,11 @@ def update_connection(
return connection

@api(version="2.8")
def refresh(self, datasource_item: DatasourceItem) -> JobItem:
def refresh(self, datasource_item: DatasourceItem, incremental: bool = False) -> JobItem:
id_ = getattr(datasource_item, "id", datasource_item)
url = f"{self.baseurl}/{id_}/refresh"
empty_req = RequestFactory.Empty.empty_req()
server_response = self.post_request(url, empty_req)
refresh_req = RequestFactory.Task.refresh_req(incremental)
server_response = self.post_request(url, refresh_req)
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

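
Because the id is resolved with getattr(datasource_item, "id", datasource_item), the endpoint accepts either a DatasourceItem or a bare LUID string. A quick sketch, assuming a signed-in server as in the sketches above; the LUID is a placeholder.

# Both forms POST to /datasources/<id>/refresh with the new request body.
job = server.datasources.refresh(datasource_item, incremental=True)       # pass the item
job = server.datasources.refresh("my-datasource-luid", incremental=True)  # or just its LUID
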
8 changes: 5 additions & 3 deletions tableauserverclient/server/endpoint/workbooks_endpoint.py
@@ -118,14 +118,16 @@ def get_by_id(self, workbook_id: str) -> WorkbookItem:
return WorkbookItem.from_response(server_response.content, self.parent_srv.namespace)[0]

@api(version="2.8")
def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem:
def refresh(self, workbook_item: Union[WorkbookItem, str], incremental: bool = False) -> JobItem:
"""
Refreshes the extract of an existing workbook.
Parameters
----------
workbook_item : WorkbookItem | str
The workbook item or workbook ID.
incremental : bool
If True, performs an incremental refresh rather than a full refresh of the extract data.
Returns
-------
@@ -134,8 +136,8 @@ def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem:
"""
id_ = getattr(workbook_item, "id", workbook_item)
url = f"{self.baseurl}/{id_}/refresh"
empty_req = RequestFactory.Empty.empty_req()
server_response = self.post_request(url, empty_req)
refresh_req = RequestFactory.Task.refresh_req(incremental)
server_response = self.post_request(url, refresh_req)
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

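
Since incremental defaults to False, existing callers keep getting a full refresh and opting in stays explicit. Again assuming a signed-in server and a placeholder LUID:

full_job = server.workbooks.refresh("my-workbook-luid")                           # full refresh, as before
incremental_job = server.workbooks.refresh("my-workbook-luid", incremental=True)  # new in this change
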
7 changes: 7 additions & 0 deletions tableauserverclient/server/request_factory.py
@@ -1117,6 +1117,13 @@ def run_req(self, xml_request: ET.Element, task_item: Any) -> None:
# Send an empty tsRequest
pass

@_tsrequest_wrapped
def refresh_req(self, xml_request: ET.Element, incremental: bool = False) -> bytes:
task_element = ET.SubElement(xml_request, "extractRefresh")
if incremental:
task_element.attrib["incremental"] = "true"
return ET.tostring(xml_request)

@_tsrequest_wrapped
def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") -> bytes:
extract_element = ET.SubElement(xml_request, "extractRefresh")
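
Assuming the _tsrequest_wrapped decorator supplies the tsRequest root element (it is not shown in this diff), the new refresh_req body produces roughly the payload below. A stand-alone reproduction for illustration:

import xml.etree.ElementTree as ET

# Rough stand-alone reproduction of refresh_req; the real root element comes from _tsrequest_wrapped.
def refresh_req(incremental: bool = False) -> bytes:
    xml_request = ET.Element("tsRequest")
    task_element = ET.SubElement(xml_request, "extractRefresh")
    if incremental:
        task_element.attrib["incremental"] = "true"
    return ET.tostring(xml_request)

print(refresh_req(incremental=True))
# b'<tsRequest><extractRefresh incremental="true" /></tsRequest>'
print(refresh_req())
# b'<tsRequest><extractRefresh /></tsRequest>'  (a full refresh simply omits the attribute)
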
