From 1243b46805e3c7a40b652247d4ab2b8dca054ba3 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Wed, 27 Sep 2023 16:01:14 +0200 Subject: [PATCH] example for zendesk incremental loading --- .../.dlt/config.toml | 1 + .../incremental_loading_zendesk/__init__.py | 0 .../incremental_loading_zendesk/run.py | 119 ++++++++++++++++++ 3 files changed, 120 insertions(+) create mode 100644 docs/examples/incremental_loading_zendesk/.dlt/config.toml create mode 100644 docs/examples/incremental_loading_zendesk/__init__.py create mode 100644 docs/examples/incremental_loading_zendesk/run.py diff --git a/docs/examples/incremental_loading_zendesk/.dlt/config.toml b/docs/examples/incremental_loading_zendesk/.dlt/config.toml new file mode 100644 index 0000000000..7e76a40d9c --- /dev/null +++ b/docs/examples/incremental_loading_zendesk/.dlt/config.toml @@ -0,0 +1 @@ +some_key="some_value" \ No newline at end of file diff --git a/docs/examples/incremental_loading_zendesk/__init__.py b/docs/examples/incremental_loading_zendesk/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/incremental_loading_zendesk/run.py b/docs/examples/incremental_loading_zendesk/run.py new file mode 100644 index 0000000000..c7879764e4 --- /dev/null +++ b/docs/examples/incremental_loading_zendesk/run.py @@ -0,0 +1,119 @@ +from typing import Iterator, Optional, Dict, Any, Sequence + +import dlt +from dlt.common import pendulum +from dlt.common.time import ensure_pendulum_datetime +from dlt.common.typing import TDataItem, TDataItems, TAnyDateTime +from dlt.extract.source import DltResource +from dlt.sources.helpers.requests import client + + +@dlt.source(max_table_nesting=2) +def zendesk_support( + credentials=dlt.secrets.value, + start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), + end_date: Optional[TAnyDateTime] = None, +) -> DltResource: + """ + Retrieves data from Zendesk Support for tickets events. + + Args: + credentials: Zendesk credentials (default: dlt.secrets.value) + start_date: Start date for data extraction (default: 2000-01-01) + end_date: End date for data extraction (default: None). + If end time is not provided, the incremental loading will be + enabled, and after the initial run, only new data will be retrieved. + + Returns: + DltResource. + """ + # Convert start_date and end_date to Pendulum datetime objects + start_date_obj = ensure_pendulum_datetime(start_date) + end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None + + # Convert Pendulum datetime objects to Unix timestamps + start_date_ts = start_date_obj.int_timestamp + end_date_ts: Optional[int] = None + if end_date_obj: + end_date_ts = end_date_obj.int_timestamp + + # Extract credentials from secrets dictionary + auth = (credentials["email"], credentials["password"]) + subdomain = credentials["subdomain"] + url = f"https://{subdomain}.zendesk.com" + + @dlt.resource(primary_key="id", write_disposition="append") + def ticket_events( + timestamp: dlt.sources.incremental[int] = dlt.sources.incremental( + "timestamp", + initial_value=start_date_ts, + end_value=end_date_ts, + allow_external_schedulers=True, + ), + ) -> Iterator[TDataItem]: + # URL For ticket events + # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/ticket_events.json?start_time=946684800' + event_pages = get_pages( + url=url, + endpoint="/api/v2/incremental/ticket_events.json", + auth=auth, + data_point_name="ticket_events", + params={"start_time": timestamp.last_value}, + ) + for page in event_pages: + yield page + if timestamp.end_out_of_range: + return + + return ticket_events + + +def get_pages( + url: str, + endpoint: str, + auth: Sequence[str], + data_point_name: str, + params: Optional[Dict[str, Any]] = None, +) -> Iterator[TDataItems]: + """ + Makes a request to a paginated endpoint and returns a generator of data items per page. + + Args: + url: The base URL. + endpoint: The url to the endpoint, e.g. /api/v2/calls + auth: Credentials for authentication. + data_point_name: The key which data items are nested under in the response object (e.g. calls) + params: Optional dict of query params to include in the request. + + Returns: + Generator of pages, each page is a list of dict data items. + """ + # update the page size to enable cursor pagination + params = params or {} + params["per_page"] = 1000 + headers = None + + # make request and keep looping until there is no next page + get_url = f"{url}{endpoint}" + while get_url: + response = client.get( + get_url, headers=headers, auth=auth, params=params + ) + response.raise_for_status() + response_json = response.json() + result = response_json[data_point_name] + yield result + + get_url = None + # See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format + if not response_json["end_of_stream"]: + get_url = response_json["next_page"] + + +# build duckdb pipeline +pipeline = dlt.pipeline( + pipeline_name="zendesk", destination="duckdb", dataset_name="zendesk_data" +) + +load_info = pipeline.run(zendesk_support()) +print(load_info) \ No newline at end of file