From 04de62dc66eeca23d578a0eb667ff9e8a3ae9a83 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 24 Jan 2024 18:04:59 +0100
Subject: [PATCH] clarifies cursor field incremental in docs

---
 .../docs/general-usage/incremental-loading.md | 72 ++++++++++---------
 1 file changed, 39 insertions(+), 33 deletions(-)

diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index bd3bd4733a..0dc85332de 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -145,20 +145,19 @@ In example above we enforce the root key propagation with `fb_ads.root_key = Tru
 that correct data is propagated on initial `replace` load so the future `merge` load can be
 executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

-## Incremental loading with last value
+## Incremental loading with a cursor field

-In most of the APIs (and other data sources i.e. database tables) you can request only new or updated
-data by passing a timestamp or id of the last record to a query. The API/database returns just the
-new/updated records from which you take "last value" timestamp/id for the next load.
+In most of the REST APIs (and other data sources i.e. database tables) you can request new or updated
+data by passing a timestamp or id of the "last" record to a query. The API/database returns just the
+new/updated records from which you take maximum/minimum timestamp/id for the next load.

 To do incremental loading this way, we need to

-- figure which data element is used to get new/updated records (e.g. “last value”, “last updated
-  at”, etc.);
-- request the new part only (how we do this depends on the source API).
+- figure which field is used to track changes (the so called **cursor field**) (e.g. “inserted_at”, "updated_at”, etc.);
+- how to past the "last" (maximum/minimum) value of cursor field to an API to get just new / modified data (how we do this depends on the source API).

-Once you've figured that out, `dlt` takes care of the loading of the incremental, removing
-duplicates and managing the state with last values. Take a look at GitHub example below, where we
+Once you've figured that out, `dlt` takes care of finding maximum/minimum cursor field values, removing
+duplicates and managing the state with last values of cursor. Take a look at GitHub example below, where we
 request recently created issues.

 ```python
@@ -166,36 +165,38 @@ request recently created issues.
 def repo_issues(
     access_token,
     repository,
-    created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
+    updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
 ):
-    # get issues since "created_at" stored in state on previous run (or initial_value on first run)
-    for page in _get_issues_page(access_token, repository, since=created_at.start_value):
+    # get issues since "updated_at" stored in state on previous run (or initial_value on first run)
+    for page in _get_issues_page(access_token, repository, since=updated_at.start_value):
         yield page
         # last_value is updated after every page
-        print(created_at.last_value)
+        print(updated_at.last_value)
 ```

-Here we add `created_at` argument that will receive incremental state, initialized to
-`1970-01-01T00:00:00Z`. It is configured to track `created_at` field in issues returned by
-`_get_issues_page` and then yielded. It will store the newest `created_at` value in `dlt`
-[state](state.md) and make it available in `created_at.start_value` on next pipeline
-run. This value is used to request only issues newer (or equal) via GitHub API.
+Here we add `updated_at` argument that will receive incremental state, initialized to
+`1970-01-01T00:00:00Z`. It is configured to track `updated_at` field in issues yielded by
+`repo_issues` resource. It will store the newest `updated_at` value in `dlt`
+[state](state.md) and make it available in `updated_at.start_value` on next pipeline
+run. This value is inserted in `_get_issues_page` function into request query param **since** to [Github API](https://docs.github.com/en/rest/issues/issues?#list-repository-issues)

 In essence, `dlt.sources.incremental` instance above
-* **created_at.initial_value** which is always equal to "1970-01-01T00:00:00Z" passed in constructor
-* **created_at.start_value** a maximum `created_at` value from the previous run or the **initial_value** on first run
-* **created_at.last_value** a "real time" `created_at` value updated with each yielded item or page. before first yield it equals **start_value**
-* **created_at.end_value** (here not used) [marking end of backfill range](#using-dltsourcesincremental-for-backfill)
+* **updated_at.initial_value** which is always equal to "1970-01-01T00:00:00Z" passed in constructor
+* **updated_at.start_value** a maximum `updated_at` value from the previous run or the **initial_value** on first run
+* **updated_at.last_value** a "real time" `updated_at` value updated with each yielded item or page. before first yield it equals **start_value**
+* **updated_at.end_value** (here not used) [marking end of backfill range](#using-dltsourcesincremental-for-backfill)

 When paginating you probably need **start_value** which does not change during the execution of the
 resource, however most paginators will return a **next page** link which you should use.

 Behind the scenes, `dlt` will deduplicate the results ie. in case the last issue is returned again
-(`created_at` filter is inclusive) and skip already loaded ones. In the example below we
+(`updated_at` filter is inclusive) and skip already loaded ones.
+
+
+In the example below we
 incrementally load the GitHub events, where API does not let us filter for the newest events - it
 always returns all of them. Nevertheless, `dlt` will load only the new items, filtering out all the
 duplicates and past issues.
-
 ```python
 # use naming function in table name to generate separate tables for each event
 @dlt.resource(primary_key="id", table_name=lambda i: i['type']) # type: ignore
@@ -301,13 +302,15 @@ def stripe():
 Please note that in the example above, `get_resource` is passed as a function to `dlt.resource` to
 which we bind the endpoint: **dlt.resource(...)(endpoint)**.

-> 🛑 The typical mistake is to pass a generator (not a function) as below:
->
-> `yield dlt.resource(get_resource(endpoint), name=endpoint.value, write_disposition="merge", primary_key="id")`.
->
-> Here we call **get_resource(endpoint)** and that creates un-evaluated generator on which resource
-> is created. That prevents `dlt` from controlling the **created** argument during runtime and will
-> result in `IncrementalUnboundError` exception.
+:::caution
+The typical mistake is to pass a generator (not a function) as below:
+
+`yield dlt.resource(get_resource(endpoint), name=endpoint.value, write_disposition="merge", primary_key="id")`.
+
+Here we call **get_resource(endpoint)** and that creates un-evaluated generator on which resource
+is created. That prevents `dlt` from controlling the **created** argument during runtime and will
+result in `IncrementalUnboundError` exception.
+:::

 ### Using `dlt.sources.incremental` for backfill
 You can specify both initial and end dates when defining incremental loading. Let's go back to our Github example:
@@ -457,10 +460,13 @@ when using `min()` "higher" and "lower" are inverted.
 You can use these flags when both:

 1. The source does **not** offer start/end filtering of results (e.g. there is no `start_time/end_time` query parameter or similar)
-2. The source returns results ordered by the cursor field
+2. The source returns results **ordered by the cursor field**

-**Note**: These flags should not be used for unordered sources, e.g. if an API returns results both higher and lower
+:::caution
+If you use those flags, **make sure that the data source returns record ordered** (ascending / descending) on the cursor field,
+e.g. if an API returns results both higher and lower
 than the given `end_value` in no particular order, the `end_out_of_range` flag can be `True` but you'll still want to keep loading.
+:::

 The github events example above demonstrates how to use `start_out_of_range` as a stop condition.
 This approach works in any case where the API returns items in descending order and we're incrementally loading newer data.