Added blogpost contributing first data pipeline #1390

Merged 4 commits on May 23, 2024
@@ -5,13 +5,13 @@
keywords: [destination, credentials, example, bigquery, custom destination]
---

In this example, you'll find a Python script that demonstrates how to load to bigquey with the custom destination.
In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination.

We'll learn how to:
- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
- use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- Use pyarrow tables to create complex column types on bigquery
- Use bigquery `autodetect=True` for schema inference from parquet files
- Use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
- Use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- Use pyarrow tables to create complex column types on BigQuery
- Use BigQuery `autodetect=True` for schema inference from parquet files

"""

@@ -38,7 +38,7 @@
def resource(url: str):
# load pyarrow table with pandas
table = pa.Table.from_pandas(pd.read_csv(url))
# we add a list type column to demontrate bigquery lists
# we add a list type column to demonstrate bigquery lists
table = table.append_column(
"tags",
pa.array(
@@ -57,12 +57,12 @@ def resource(url: str):
yield table


# dlt biquery custom destination
# dlt bigquery custom destination
# we can use the dlt provided credentials class
# to retrieve the gcp credentials from the secrets
@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0)
@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0, naming_convention="snake_case")
def bigquery_insert(
items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
items, table=BIGQUERY_TABLE_ID, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
client = bigquery.Client(
credentials.project_id, credentials.to_native_credentials(), location="US"
@@ -74,7 +74,7 @@ def bigquery_insert(
)
# since we have set the batch_size to 0, we get a filepath and can load the file directly
with open(items, "rb") as f:
load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
load_job = client.load_table_from_file(f, table, job_config=job_config)
load_job.result() # Waits for the job to complete.


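For context, here is a minimal sketch of how a pipeline might use this custom destination; the pipeline name and CSV URL are illustrative placeholders, not part of the example file.

```py
import dlt

# Hypothetical usage: run the `resource` defined above through the custom
# BigQuery destination. The URL and pipeline name are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="csv_to_bigquery_insert",
    destination=bigquery_insert,
)
load_info = pipeline.run(resource(url="https://example.com/data.csv"))
print(load_info)
```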
90 changes: 90 additions & 0 deletions docs/website/blog/2024-05-23-contributed-first-pipeline.md
@@ -0,0 +1,90 @@
---
slug: contributed-first-pipeline
title: "How I contributed my first data pipeline to the open source."
image: https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png
authors:
  name: Aman Gupta
  title: Junior Data Engineer
  url: https://github.com/dat-a-man
  image_url: https://dlt-static.s3.eu-central-1.amazonaws.com/images/aman.png
tags: [data ingestion, python sdk, ETL, python data pipelines, Open Source, Developer Tools]
---

Hello, I'm Aman Gupta. Over the past eight years, I have navigated the structured world of civil engineering, but recently I have found myself captivated by data engineering. I used to stack bricks and build structural pipelines; now this newfound interest has me building data pipelines, and it was sparked by a workshop hosted by **dlt**.

:::info
dlt (data loading tool) is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
:::

The `dlt` workshop took place in November 2022, co-hosted by Adrian Brudaru, my former mentor and co-founder of `dlt`.

An opportunity arose when another client needed data migrated from Freshdesk to BigQuery. I crafted a basic pipeline, initially designed to support only my use case. When I presented it to the dlt team, Alena Astrakhatseva, a team member, generously offered to review it and refine it into a community-verified source.

![image](https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png)

My first iteration was straightforward—loading data in [replace mode](https://dlthub.com/docs/general-usage/incremental-loading#the-3-write-dispositions). While adequate for initial purposes, a verified source demanded features like [pagination](https://dlthub.com/docs/general-usage/http/overview#explicitly-specifying-pagination-parameters) and [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading). To achieve this, I developed an API client tailored for the Freshdesk API, integrating rate limit handling and pagination:

```py
from typing import Any, Iterable, Optional

import requests

from dlt.common.typing import TDataItem


class FreshdeskClient:
    """
    Client for making authenticated requests to the Freshdesk API.
    It handles rate limits and pagination for API requests.
    """

    def __init__(self, api_key: str, domain: str):
        # Contains stuff like domain, credentials and base URL.
        pass

    def _request_with_rate_limit(self, url: str, **kwargs: Any) -> requests.Response:
        # Handles rate limits in HTTP requests and ensures that the client doesn't exceed the limit set by the server.
        pass

    def paginated_response(
        self,
        endpoint: str,
        per_page: int,
        updated_at: Optional[str] = None,
    ) -> Iterable[TDataItem]:
        # Fetches a paginated response from a specified endpoint.
        pass
```
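To illustrate what the rate-limit handling might look like in practice, here is a minimal standalone sketch; it assumes the API signals throttling with an HTTP 429 status and a `Retry-After` header, and the function name simply mirrors the skeleton above.

```py
import time
from typing import Any

import requests


def request_with_rate_limit(url: str, **kwargs: Any) -> requests.Response:
    # Retry whenever the server throttles the client with HTTP 429,
    # waiting for the number of seconds suggested by the Retry-After header.
    while True:
        response = requests.get(url, **kwargs)
        if response.status_code == 429:
            time.sleep(int(response.headers.get("Retry-After", 60)))
            continue
        response.raise_for_status()
        return response
```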

To make the pipeline more effective, I developed dlt [resources](https://dlthub.com/docs/general-usage/resource) that handle incremental data loading, using **`dlt`**'s incremental functionality to fetch only new or updated data:

```py
from typing import Any, Dict, Generator, Optional

import dlt


def incremental_resource(
    endpoint: str,
    updated_at: Optional[Any] = dlt.sources.incremental(
        "updated_at", initial_value="2022-01-01T00:00:00Z"
    ),
) -> Generator[Dict[Any, Any], Any, None]:
    """
    Fetches and yields paginated data from a specified API endpoint.
    Each page of data is fetched based on the `updated_at` timestamp
    to ensure incremental loading.
    """

    # Retrieve the last updated timestamp to fetch only new or updated records.
    updated_at = updated_at.last_value

    # Use the FreshdeskClient instance (`freshdesk`) and the page size (`per_page`)
    # defined in the enclosing source function to fetch paginated responses.
    yield from freshdesk.paginated_response(
        endpoint=endpoint,
        per_page=per_page,
        updated_at=updated_at,
    )
```
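For completeness, here is a sketch of how such a resource might be wired into a pipeline; the endpoint, primary key, and dataset names are illustrative assumptions rather than part of the verified source.

```py
import dlt

# Hypothetical wiring: bind the generator to a concrete endpoint and run it.
tickets = dlt.resource(
    incremental_resource,
    name="tickets",
    primary_key="id",
    write_disposition="merge",
)("tickets")

pipeline = dlt.pipeline(
    pipeline_name="freshdesk_pipeline",
    destination="bigquery",
    dataset_name="freshdesk_data",
)
print(pipeline.run(tickets))
```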

With the steps defined above, I was able to load the data from Freshdesk to BigQuery and use the pipeline in production. Here’s a summary of the steps I followed:

1. Created a Freshdesk API token with sufficient privileges.
1. Created an API client to make requests to the Freshdesk API with rate limit handling and pagination.
1. Made incremental requests to this client based on the “updated_at” field in the response.
1. Ran the pipeline using the Python script.


While my journey from civil engineering to data engineering was initially intimidating, it has proved to be a profound learning experience. Writing a pipeline with **`dlt`** mirrors the simplicity of a GET request: you request data, yield it, and it flows from the source to its destination. Now, I help other clients integrate **`dlt`** to streamline their data workflows, which has been an invaluable part of my professional growth.

In conclusion, diving into data engineering has expanded my technical skill set and provided a new lens through which I view challenges and solutions. A couple of years ago that lens saw mostly concrete and steel; now it has begun to notice the pipelines of the data world.

So far, data engineering has proved challenging, satisfying, and a good career option for me. For those interested in the detailed workings of these pipelines, I encourage exploring dlt's [GitHub repository](https://github.com/dlt-hub/verified-sources) or diving into the [documentation](https://dlthub.com/docs/dlt-ecosystem/verified-sources/freshdesk).
84 changes: 47 additions & 37 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
@@ -203,7 +203,7 @@ For example, you can set the primary key, write disposition, and other default s
```py
config = {
"client": {
...
# ...
},
"resource_defaults": {
"primary_key": "id",
@@ -216,15 +216,17 @@
},
"resources": [
"resource1",
"resource2": {
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
{
"resource2": {
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
},
},
},
},
}
}
],
}
```
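Once defined, a configuration like this is typically passed to the source factory and run in a pipeline, roughly as in the sketch below (the import path, pipeline name, and destination are assumptions for illustration).

```py
import dlt
from rest_api import rest_api_source  # assumed import path for the verified source

source = rest_api_source(config)

pipeline = dlt.pipeline(
    pipeline_name="rest_api_example",
    destination="duckdb",
    dataset_name="rest_api_data",
)
pipeline.run(source)
```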
@@ -309,7 +311,7 @@ To specify the pagination configuration, use the `paginator` field in the [clien

```py
{
...
# ...
"paginator": {
"type": "json_links",
"next_url_path": "paging.next",
@@ -321,7 +323,7 @@

```py
{
...
# ...
"paginator": JSONResponsePaginator(
next_url_path="paging.next"
),
@@ -394,11 +396,11 @@ One of the most common method is token-based authentication. To authenticate wit
```py
{
"client": {
...
# ...
"auth": {
"token": dlt.secrets["your_api_token"],
},
...
# ...
},
}
```
@@ -424,7 +426,7 @@ To specify the authentication configuration, use the `auth` field in the [client
"type": "bearer",
"token": dlt.secrets["your_api_token"],
},
...
# ...
},
}
```
@@ -438,7 +440,7 @@
"client": {
"auth": BearTokenAuth(dlt.secrets["your_api_token"]),
},
...
# ...
}
```

@@ -455,7 +457,7 @@
"name": "issues",
"endpoint": {
"path": "issues",
...
# ...
},
},
{
@@ -495,10 +497,12 @@ The `issue_comments` resource will make requests to the following endpoints:
The syntax for the `resolve` field in parameter configuration is:

```py
"<parameter_name>": {
"type": "resolve",
"resource": "<parent_resource_name>",
"field": "<parent_resource_field_name>",
{
"<parameter_name>": {
"type": "resolve",
"resource": "<parent_resource_name>",
"field": "<parent_resource_field_name>",
}
}
```
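For instance, a child resource using `resolve` might look like the following sketch, loosely based on the GitHub example mentioned above (the path and field names are illustrative):

```py
{
    "name": "issue_comments",
    "endpoint": {
        "path": "issues/{issue_number}/comments",
        "params": {
            "issue_number": {
                "type": "resolve",
                "resource": "issues",
                "field": "number",
            },
        },
    },
}
```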

@@ -530,35 +534,41 @@ When the API endpoint supports incremental loading, you can configure the source
1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration):

```py
"<parameter_name>": {
"type": "incremental",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
},
{
"<parameter_name>": {
"type": "incremental",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
},
}
```

For example, in the `issues` resource configuration in the GitHub example, we have:

```py
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
},
{
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
},
}
```

This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests.

2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration):

```py
"incremental": {
"start_param": "<parameter_name>",
"end_param": "<parameter_name>",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
"end_value": "<end_value>",
},
{
"incremental": {
"start_param": "<parameter_name>",
"end_param": "<parameter_name>",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
"end_value": "<end_value>",
}
}
```

This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading.
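As a concrete sketch, filled in with illustrative parameter names and dates, such an `incremental` block might look like this:

```py
{
    "incremental": {
        "start_param": "since",
        "end_param": "until",
        "cursor_path": "updated_at",
        "initial_value": "2024-01-25T11:21:28Z",
        "end_value": "2024-06-30T00:00:00Z",
    }
}
```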