clarifies cursor field incremental in docs #910

Merged · 1 commit · Jan 25, 2024
72 changes: 39 additions & 33 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -145,57 +145,58 @@

In the example above, we enforce root key propagation with `fb_ads.root_key = True` so
that correct data is propagated on the initial `replace` load and the future `merge` load can be
executed. You can achieve the same in the decorator: `@dlt.source(root_key=True)`.

## Incremental loading with a cursor field

Most REST APIs (and other data sources, e.g. database tables) let you request only new or updated
data by passing the timestamp or id of the "last" record to a query. The API/database returns just
the new/updated records, from which you take the maximum/minimum timestamp/id for the next load.

To do incremental loading this way, we need to

- figure out which field is used to track changes (the so-called **cursor field**), e.g. "inserted_at", "updated_at", etc.;
- find out how to pass the "last" (maximum/minimum) value of the cursor field to the API to get just the new/modified data (how we do this depends on the source API).

Once you've figured that out, `dlt` takes care of finding the maximum/minimum cursor field values,
removing duplicates and managing the state with the cursor's last value. Take a look at the GitHub
example below, where we request recently updated issues.

```python
@dlt.resource(primary_key="id")
def repo_issues(
    access_token,
    repository,
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # get issues updated since "updated_at" stored in state on the previous run
    # (or initial_value on the first run)
    for page in _get_issues_page(access_token, repository, since=updated_at.start_value):
        yield page
        # last_value is updated after every page
        print(updated_at.last_value)
```

Here we add an `updated_at` argument that will receive the incremental state, initialized to
`1970-01-01T00:00:00Z`. It is configured to track the `updated_at` field in the issues yielded by
the `repo_issues` resource. `dlt` will store the newest `updated_at` value in its
[state](state.md) and make it available via `updated_at.start_value` on the next pipeline
run. This value is passed by the `_get_issues_page` function to the [GitHub API](https://docs.github.com/en/rest/issues/issues?#list-repository-issues) in the **since** query parameter.

In essence, the `dlt.sources.incremental` instance above provides:
* **updated_at.initial_value**, which is always equal to "1970-01-01T00:00:00Z", passed in the constructor
* **updated_at.start_value**, the maximum `updated_at` value from the previous run, or the **initial_value** on the first run
* **updated_at.last_value**, a "real time" `updated_at` value updated with each yielded item or page; before the first yield it equals **start_value**
* **updated_at.end_value** (not used here), [marking the end of a backfill range](#using-dltsourcesincremental-for-backfill)
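
For illustration, a minimal sketch of how these values behave inside a single run; the
`ordered_items` resource and its single data item are made up for this example:

```python
import dlt

@dlt.resource(primary_key="id")
def ordered_items(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    print(updated_at.initial_value)  # always "1970-01-01T00:00:00Z"
    print(updated_at.start_value)    # max from the previous run, or initial_value on the first run
    yield {"id": 1, "updated_at": "2023-05-01T00:00:00Z"}
    print(updated_at.last_value)     # "2023-05-01T00:00:00Z" - updated after each yield
```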

When paginating, you typically need only **start_value**, which does not change during the
execution of the resource; for subsequent pages, most paginators return a **next page** link which
you should follow instead.
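
For reference, a minimal sketch of what a `_get_issues_page` helper could look like. The
implementation below (endpoint, parameters, pagination via the `Link` header) is an assumption for
illustration, not part of `dlt`:

```python
import requests

def _get_issues_page(access_token, repository, since):
    # hypothetical helper: page through GitHub issues updated at or after `since`
    url = f"https://api.github.com/repos/{repository}/issues"
    params = {"since": since, "per_page": 100, "state": "all"}
    headers = {"Authorization": f"Bearer {access_token}"}
    while url:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        yield response.json()
        # `since` stays fixed at start_value; subsequent pages follow the **next page** link
        url = response.links.get("next", {}).get("url")
        params = None  # the next link already carries all query parameters
```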

Behind the scenes, `dlt` will deduplicate the results, i.e. in case the last issue is returned
again (the `updated_at` filter is inclusive), and skip already loaded ones.

In the example below we incrementally load GitHub events, where the API does not let us filter for
the newest events - it always returns all of them. Nevertheless, `dlt` will load only the new
items, filtering out all the duplicates and past events.

```python
# use naming function in table name to generate separate tables for each event
@dlt.resource(primary_key="id", table_name=lambda i: i['type']) # type: ignore
```

@@ -301,13 +302,15 @@ def stripe():
Please note that in the example above, `get_resource` is passed as a function to `dlt.resource` to
which we bind the endpoint: **dlt.resource(...)(endpoint)**.

:::caution
A typical mistake is to pass a generator (not a function) as below:

`yield dlt.resource(get_resource(endpoint), name=endpoint.value, write_disposition="merge", primary_key="id")`.

Here we call **get_resource(endpoint)**, which creates an un-evaluated generator on which the
resource is created. That prevents `dlt` from controlling the **created** argument during runtime
and will result in an `IncrementalUnboundError` exception.
:::
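
For contrast, a sketch of the correct version of that line, binding the endpoint on the resource
as described above:

```python
yield dlt.resource(
    get_resource,
    name=endpoint.value,
    write_disposition="merge",
    primary_key="id",
)(endpoint)
```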

### Using `dlt.sources.incremental` for backfill
You can specify both initial and end dates when defining incremental loading. Let's go back to our GitHub example:
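
The updated example itself is not shown in this hunk; a plausible sketch, reusing the
`repo_issues` resource from above with assumed backfill dates:

```python
@dlt.resource(primary_key="id")
def repo_issues(
    access_token,
    repository,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2023-01-01T00:00:00Z",  # assumed start of the backfill range
        end_value="2023-02-01T00:00:00Z",      # assumed end of the backfill range
    ),
):
    for page in _get_issues_page(access_token, repository, since=updated_at.start_value):
        yield page
```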
@@ -457,10 +460,13 @@ when using `min()` "higher" and "lower" are inverted.
You can use these flags when both:

1. The source does **not** offer start/end filtering of results (e.g. there is no `start_time/end_time` query parameter or similar)
2. The source returns results **ordered by the cursor field**

:::caution
If you use those flags, **make sure that the data source returns records ordered** (ascending /
descending) on the cursor field. These flags should not be used for unordered sources: e.g. if an
API returns results both higher and lower than the given `end_value` in no particular order, the
`end_out_of_range` flag can be `True` but you'll still want to keep loading.
:::

The GitHub events example above demonstrates how to use `start_out_of_range` as a stop condition.
This approach works in any case where the API returns items in descending order and we're incrementally loading newer data.
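
As a sketch of that pattern (the `_get_events_page` helper, returning events newest-first, is
assumed for this example):

```python
@dlt.resource(primary_key="id", table_name=lambda i: i['type'])
def repo_events(
    created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in _get_events_page():  # hypothetical helper; newest events first
        yield page
        # events come in descending order, so once a page falls below start_value
        # there is nothing new left and we can stop requesting further pages
        if created_at.start_out_of_range:
            break
```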