Update the tutorial to use rest_client.paginate for pagination #1287

Merged
merged 4 commits into from
May 23, 2024
141 changes: 43 additions & 98 deletions docs/website/docs/tutorial/grouping-resources.md
@@ -14,6 +14,9 @@ This tutorial continues the [previous](load-data-from-an-api) part. We'll use th
In the previous tutorial, we loaded issues from the GitHub API. Now we'll prepare to load comments from the API as well. Here's a sample [dlt resource](../general-usage/resource) that does that:

```py
+import dlt
+from dlt.sources.helpers.rest_client import paginate
+
 @dlt.resource(
     table_name="comments",
     write_disposition="merge",
@@ -22,17 +25,11 @@ In the previous tutorial, we loaded issues from the GitHub API. Now we'll prepar
 def get_comments(
     updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
 ):
-    url = "https://api.github.com/repos/dlt-hub/dlt/comments?per_page=100"
-
-    while True:
-        response = requests.get(url)
-        response.raise_for_status()
-        yield response.json()
-
-        # get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
+    for page in paginate(
+        "https://api.github.com/repos/dlt-hub/dlt/comments",
+        params={"per_page": 100}
+    ):
+        yield page
```
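
The change in this hunk moves `per_page=100` out of the URL string and into a `params` dictionary. As a rough illustration of why the two forms request the same URL, here is a minimal standard-library sketch. `build_url` is a hypothetical helper written for this note, not part of `dlt`, and it only approximates what an HTTP client does when it encodes `params`:

```python
from urllib.parse import urlencode


def build_url(base_url, params=None):
    """Append query parameters to base_url (hypothetical helper, not a dlt API)."""
    if not params:
        return base_url
    return f"{base_url}?{urlencode(params)}"


# The URL as it was hard-coded before the change
old_style = "https://api.github.com/repos/dlt-hub/dlt/comments?per_page=100"

# The same request expressed as a base URL plus a params dictionary
new_style = build_url(
    "https://api.github.com/repos/dlt-hub/dlt/comments",
    params={"per_page": 100},
)

print(old_style == new_style)  # prints: True
```

Keeping parameters in a dictionary avoids hand-written `?` and `&` separators, which is where typos in query strings tend to creep in.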

We can load this resource separately from the issues resource; however, loading both issues and comments in one go is more efficient. To do that, we'll use the `@dlt.source` decorator on a function that returns a list of resources:
@@ -47,7 +44,7 @@ def github_source():

```py
 import dlt
-from dlt.sources.helpers import requests
+from dlt.sources.helpers.rest_client import paginate

 @dlt.resource(
     table_name="issues",
@@ -57,21 +54,17 @@ from dlt.sources.helpers import requests
 def get_issues(
     updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
 ):
-    url = (
-        "https://api.github.com/repos/dlt-hub/dlt/issues"
-        f"?since={updated_at.last_value}&per_page=100"
-        "&sort=updated&directions=desc&state=open"
-    )
-
-    while True:
-        response = requests.get(url)
-        response.raise_for_status()
-        yield response.json()
-
-        # Get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
+    for page in paginate(
+        "https://api.github.com/repos/dlt-hub/dlt/issues",
+        params={
+            "since": updated_at.last_value,
+            "per_page": 100,
+            "sort": "updated",
+            "direction": "desc",
+            "state": "open",
+        }
+    ):
+        yield page


@dlt.resource(
@@ -82,20 +75,14 @@ def get_issues(
 def get_comments(
     updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
 ):
-    url = (
-        "https://api.github.com/repos/dlt-hub/dlt/comments"
-        "?per_page=100"
-    )
-
-    while True:
-        response = requests.get(url)
-        response.raise_for_status()
-        yield response.json()
-
-        # Get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
+    for page in paginate(
+        "https://api.github.com/repos/dlt-hub/dlt/comments",
+        params={
+            "since": updated_at.last_value,
+            "per_page": 100,
+        }
+    ):
+        yield page


@dlt.source
@@ -124,18 +111,8 @@ from dlt.sources.helpers import requests
 BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt"

 def fetch_github_data(endpoint, params={}):
-    """Fetch data from GitHub API based on endpoint and params."""
     url = f"{BASE_GITHUB_URL}/{endpoint}"
-
-    while True:
-        response = requests.get(url, params=params)
-        response.raise_for_status()
-        yield response.json()
-
-        # Get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
+    return paginate(url, params=params)

@dlt.source
def github_source():
@@ -164,21 +141,16 @@ For the next step we'd want to get the [number of repository clones](https://doc
Let's handle this by changing our `fetch_github_data()` first:

```py
-def fetch_github_data(endpoint, params={}, access_token=None):
-    """Fetch data from GitHub API based on endpoint and params."""
-    headers = {"Authorization": f"Bearer {access_token}"} if access_token else {}
+from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

+def fetch_github_data(endpoint, params={}, access_token=None):
     url = f"{BASE_GITHUB_URL}/{endpoint}"
+    return paginate(
+        url,
+        params=params,
+        auth=BearerTokenAuth(token=access_token) if access_token else None,
+    )

-    while True:
-        response = requests.get(url, params=params, headers=headers)
-        response.raise_for_status()
-        yield response.json()
-
-        # Get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]

@dlt.source
def github_source(access_token):
@@ -229,28 +201,7 @@ access_token = "ghp_A...3aRY"
Now we can run the script and it will load the data from the `traffic/clones` endpoint:

```py
-import dlt
-from dlt.sources.helpers import requests
-
-BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt"
-
-
-def fetch_github_data(endpoint, params={}, access_token=None):
-    """Fetch data from GitHub API based on endpoint and params."""
-    headers = {"Authorization": f"Bearer {access_token}"} if access_token else {}
-
-    url = f"{BASE_GITHUB_URL}/{endpoint}"
-
-    while True:
-        response = requests.get(url, params=params, headers=headers)
-        response.raise_for_status()
-        yield response.json()
-
-        # get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
-
+...

@dlt.source
def github_source(
@@ -287,19 +238,12 @@ BASE_GITHUB_URL = "https://api.github.com/repos/{repo_name}"

 def fetch_github_data(repo_name, endpoint, params={}, access_token=None):
     """Fetch data from GitHub API based on repo_name, endpoint, and params."""
-    headers = {"Authorization": f"Bearer {access_token}"} if access_token else {}
-
     url = BASE_GITHUB_URL.format(repo_name=repo_name) + f"/{endpoint}"
-
-    while True:
-        response = requests.get(url, params=params, headers=headers)
-        response.raise_for_status()
-        yield response.json()
-
-        # Get next page
-        if "next" not in response.links:
-            break
-        url = response.links["next"]["url"]
+    return paginate(
+        url,
+        params=params,
+        auth=BearerTokenAuth(token=access_token) if access_token else None,
+    )


@dlt.source
@@ -347,5 +291,6 @@ Interested in learning more? Here are some suggestions:
- [Pass config and credentials into your sources and resources](../general-usage/credentials).
- [Run in production: inspecting, tracing, retry policies and cleaning up](../running-in-production/running).
- [Run resources in parallel, optimize buffers and local storage](../reference/performance.md).
- [Use REST API client helpers](../general-usage/http/rest-client.md) to simplify working with REST APIs.
3. Check out our [how-to guides](../walkthroughs) to get answers to some common questions.
4. Explore the [Examples](../examples) section to see how dlt can be used in real-world scenarios.
51 changes: 50 additions & 1 deletion docs/website/docs/tutorial/load-data-from-an-api.md
@@ -44,7 +44,7 @@ dlt pipeline github_issues show

## Append or replace your data

-Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have a new folder created daily with `json` file logs, and you want to ingest them.
+Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have daily data updates and you want to ingest them.

To get the latest data, we'd need to run the script again. But how do we do that without duplicating the data?
One option is to tell `dlt` to replace the data in existing tables in the destination by using the `replace` write disposition. Change the `github_issues.py` script to the following:
@@ -148,6 +148,55 @@ and `updated_at.last_value` to tell GitHub to return issues updated only **after

[Learn more about merge write disposition](../general-usage/incremental-loading#merge-incremental_loading).

## Using pagination helper

In the previous examples, we used the `requests` library to make HTTP requests to the GitHub API and handled pagination manually. `dlt` has a built-in [REST client](../general-usage/http/rest-client.md) that simplifies API requests. We'll use its `paginate()` helper for the next example. The `paginate()` function takes a URL and optional parameters (much like `requests`) and returns a generator that yields pages of data.
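
To make "a generator that yields pages of data" concrete, here is a small offline sketch. It does not call `dlt` or the network: `fake_paginate` and `FAKE_PAGES` are hypothetical stand-ins invented for this illustration, and the records are made up. It only shows the shape of the data, where each item coming out of the generator is a whole page (a list of records) rather than a single record:

```python
from typing import Any, Dict, Iterator, List

# Made-up data standing in for three API responses
FAKE_PAGES: List[List[Dict[str, Any]]] = [
    [{"id": 1}, {"id": 2}],
    [{"id": 3}, {"id": 4}],
    [{"id": 5}],
]


def fake_paginate(pages: List[List[Dict[str, Any]]]) -> Iterator[List[Dict[str, Any]]]:
    """Yield one page (a list of records) at a time; hypothetical stand-in for a paginating client."""
    for page in pages:
        yield page


def get_items() -> Iterator[List[Dict[str, Any]]]:
    # Same loop shape as a resource body: re-yield each page as it arrives
    for page in fake_paginate(FAKE_PAGES):
        yield page


pages = list(get_items())
total = sum(len(page) for page in pages)
print(len(pages), total)  # prints: 3 5
```

Because the generator is lazy, a page is only produced when the consumer asks for the next one, so the whole result set never has to sit in memory at once.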

Here's how the updated script looks:

```py
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "since": updated_at.last_value,
            "per_page": 100,
            "sort": "updated",
            "direction": "desc",
            "state": "open",
        },
    ):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)
```

Let's zoom in on the changes:

1. The `while` loop that handled pagination is replaced with reading pages from the `paginate()` generator.
2. `paginate()` takes the URL of the API endpoint and optional parameters. In this case, we pass the `since` parameter to get only issues updated after the last pipeline run.
3. We're not explicitly setting up pagination; `paginate()` handles it for us. Magic! Under the hood, `paginate()` analyzes the response and detects the pagination method used by the API. Read more about pagination in the [REST client documentation](../general-usage/http/rest-client.md#paginating-api-responses).
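
The manual loop that `paginate()` replaces followed `response.links["next"]`, which `requests` builds from the `Link` response header. As a rough sketch of that one pagination style, the function below pulls the `rel="next"` URL out of a `Link` header value. This is not `dlt`'s implementation: `next_page_url` is a hypothetical helper, the header values are made up for illustration, and splitting on commas is a simplification that would break on URLs containing commas:

```python
import re
from typing import Optional


def next_page_url(link_header: Optional[str]) -> Optional[str]:
    """Return the URL marked rel="next" in a Link header value, or None if there is none."""
    if not link_header:
        return None
    # Simplification: assumes the URLs themselves contain no commas
    for part in link_header.split(","):
        match = re.match(r'\s*<([^>]+)>\s*;(.*)', part)
        if match and re.search(r'rel="?next"?', match.group(2)):
            return match.group(1)
    return None


# Made-up header values in the style of a paginated API response
middle_page = (
    '<https://api.github.com/repos/dlt-hub/dlt/issues?page=2>; rel="next", '
    '<https://api.github.com/repos/dlt-hub/dlt/issues?page=5>; rel="last"'
)
last_page = (
    '<https://api.github.com/repos/dlt-hub/dlt/issues?page=1>; rel="first", '
    '<https://api.github.com/repos/dlt-hub/dlt/issues?page=4>; rel="prev"'
)

print(next_page_url(middle_page))  # prints: https://api.github.com/repos/dlt-hub/dlt/issues?page=2
print(next_page_url(last_page))    # prints: None
```

A pagination loop stops when this returns `None`, which corresponds to the `if "next" not in response.links: break` check in the removed code.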

## Next steps

Continue your journey with the [Resource Grouping and Secrets](grouping-resources) tutorial.