diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py
index ea60b9b00d..125938ace5 100644
--- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py
+++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py
@@ -5,13 +5,13 @@
 keywords: [destination, credentials, example, bigquery, custom destination]
 ---

-In this example, you'll find a Python script that demonstrates how to load to bigquey with the custom destination.
+In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination.

 We'll learn how to:
-- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
-- use the [custom destination](../dlt-ecosystem/destinations/destination.md)
-- Use pyarrow tables to create complex column types on bigquery
-- Use bigquery `autodetect=True` for schema inference from parquet files
+- Use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
+- Use the [custom destination](../dlt-ecosystem/destinations/destination.md)
+- Use pyarrow tables to create complex column types on BigQuery
+- Use BigQuery `autodetect=True` for schema inference from parquet files

 """
@@ -38,7 +38,7 @@ def resource(url: str):
     # load pyarrow table with pandas
     table = pa.Table.from_pandas(pd.read_csv(url))
-    # we add a list type column to demontrate bigquery lists
+    # we add a list type column to demonstrate bigquery lists
     table = table.append_column(
         "tags",
         pa.array(
@@ -57,12 +57,12 @@ def resource(url: str):
     yield table

-# dlt biquery custom destination
+# dlt bigquery custom destination
 # we can use the dlt provided credentials class
 # to retrieve the gcp credentials from the secrets
-@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0)
+@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0, naming_convention="snake_case")
 def bigquery_insert(
-    items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
+    items, table=BIGQUERY_TABLE_ID, credentials: GcpServiceAccountCredentials = dlt.secrets.value
 ) -> None:
     client = bigquery.Client(
         credentials.project_id, credentials.to_native_credentials(), location="US"
     )
@@ -74,7 +74,7 @@
     )
     # since we have set the batch_size to 0, we get a filepath and can load the file directly
     with open(items, "rb") as f:
-        load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
+        load_job = client.load_table_from_file(f, table, job_config=job_config)
         load_job.result()  # Waits for the job to complete.

diff --git a/docs/website/blog/2024-05-23-contributed-first-pipeline.md b/docs/website/blog/2024-05-23-contributed-first-pipeline.md
new file mode 100644
index 0000000000..aae6e0f298
--- /dev/null
+++ b/docs/website/blog/2024-05-23-contributed-first-pipeline.md
@@ -0,0 +1,90 @@
+---
+slug: contributed-first-pipeline
+title: "How I contributed my first data pipeline to open source."
+image: https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png
+authors:
+  name: Aman Gupta
+  title: Junior Data Engineer
+  url: https://github.com/dat-a-man
+  image_url: https://dlt-static.s3.eu-central-1.amazonaws.com/images/aman.png
+tags: [data ingestion, python sdk, ETL, python data pipelines, Open Source, Developer Tools]
+---
+
+Hello, I'm Aman Gupta. Over the past eight years, I have navigated the structured world of civil engineering, but recently, I have found myself captivated by data engineering. Initially, I knew how to stack bricks and build structural pipelines. But this newfound interest, sparked by a workshop hosted by **dlt**, has helped me build data pipelines.
+
+:::info
+dlt (data loading tool) is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
+:::
+
+The `dlt` workshop took place in November 2022, co-hosted by Adrian Brudaru, my former mentor and co-founder of `dlt`.
+
+An opportunity arose when another client needed data migration from Freshdesk to BigQuery. I crafted a basic pipeline version, initially designed to support my use case. When I presented my basic pipeline to the dlt team, Alena Astrakhatseva, a team member, generously offered to review it and refine it into a community-verified source.
+
+![image](https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png)
+
+My first iteration was straightforward—loading data in [replace mode](https://dlthub.com/docs/general-usage/incremental-loading#the-3-write-dispositions). While adequate for initial purposes, a verified source demanded features like [pagination](https://dlthub.com/docs/general-usage/http/overview#explicitly-specifying-pagination-parameters) and [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading). To achieve this, I developed an API client tailored for the Freshdesk API, integrating rate limit handling and pagination:
+
+```py
+class FreshdeskClient:
+    """
+    Client for making authenticated requests to the Freshdesk API. It incorporates API requests with
+    rate limit and pagination.
+    """
+
+    def __init__(self, api_key: str, domain: str):
+        # Contains stuff like domain, credentials and base URL.
+        pass
+
+    def _request_with_rate_limit(self, url: str, **kwargs: Any) -> requests.Response:
+        # Handles rate limits in HTTP requests and ensures that the client doesn't exceed the limit set by the server.
+        pass
+
+    def paginated_response(
+        self,
+        endpoint: str,
+        per_page: int,
+        updated_at: Optional[str] = None,
+    ) -> Iterable[TDataItem]:
+        # Fetches a paginated response from a specified endpoint.
+        pass
+```
+
+To make the pipeline more effective, I developed dlt [resources](https://dlthub.com/docs/general-usage/resource) that could handle incremental data loading. This involved creating resources that used **`dlt`**'s incremental functionality to fetch only new or updated data:
+
+```py
+def incremental_resource(
+    endpoint: str,
+    updated_at: Optional[Any] = dlt.sources.incremental(
+        "updated_at", initial_value="2022-01-01T00:00:00Z"
+    ),
+) -> Generator[Dict[Any, Any], Any, None]:
+    """
+    Fetches and yields paginated data from a specified API endpoint.
+    Each page of data is fetched based on the `updated_at` timestamp
+    to ensure incremental loading.
+    """
+
+    # Retrieve the last updated timestamp to fetch only new or updated records.
+    updated_at = updated_at.last_value
+
+    # Use the FreshdeskClient instance to fetch paginated responses
+    yield from freshdesk.paginated_response(
+        endpoint=endpoint,
+        per_page=per_page,
+        updated_at=updated_at,
+    )
+```
+
+With the steps defined above, I was able to load the data from Freshdesk to BigQuery and use the pipeline in production. Here’s a summary of the steps I followed:
+
+1. Created a Freshdesk API token with sufficient privileges.
+1. Created an API client to make requests to the Freshdesk API with rate limit and pagination.
+1. Made incremental requests to this client based on the “updated_at” field in the response.
+1. Ran the pipeline using the Python script.
+
+
+While my journey from civil engineering to data engineering was initially intimidating, it has proved to be a profound learning experience. Writing a pipeline with **`dlt`** mirrors the simplicity of a GET request: you request data, yield it, and it flows from the source to its destination. Now, I help other clients integrate **`dlt`** to streamline their data workflows, which has been an invaluable part of my professional growth.
+
+In conclusion, diving into data engineering has expanded my technical skill set and provided a new lens through which I view challenges and solutions. A couple of years back, that lens was mostly trained on concrete and steel; now it has begun to notice the pipelines of the data world.
+
+So far, data engineering has proved challenging, satisfying, and a good career option for me. For those interested in the detailed workings of these pipelines, I encourage exploring dlt's [GitHub repository](https://github.com/dlt-hub/verified-sources) or diving into the [documentation](https://dlthub.com/docs/dlt-ecosystem/verified-sources/freshdesk).
\ No newline at end of file
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
index 1f79055d06..0022850987 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
@@ -203,7 +203,7 @@ For example, you can set the primary key, write disposition, and other default s
 ```py
 config = {
     "client": {
-        ...
+        # ...
     },
     "resource_defaults": {
         "primary_key": "id",
@@ -216,15 +216,17 @@ config = {
     },
     "resources": [
         "resource1",
-        "resource2": {
-            "name": "resource2_name",
-            "write_disposition": "append",
-            "endpoint": {
-                "params": {
-                    "param1": "value1",
+        {
+            "resource2": {
+                "name": "resource2_name",
+                "write_disposition": "append",
+                "endpoint": {
+                    "params": {
+                        "param1": "value1",
+                    },
                 },
-            },
-        },
+            }
+        }
     ],
 }
 ```
@@ -309,7 +311,7 @@ To specify the pagination configuration, use the `paginator` field in the [clien

 ```py
 {
-    ...
+    # ...
     "paginator": {
         "type": "json_links",
         "next_url_path": "paging.next",
@@ -321,7 +323,7 @@ Or using the paginator instance:

 ```py
 {
-    ...
+    # ...
     "paginator": JSONResponsePaginator(
         next_url_path="paging.next"
     ),
@@ -394,11 +396,11 @@ One of the most common method is token-based authentication. To authenticate wit

 ```py
 {
     "client": {
-        ...
+        # ...
         "auth": {
             "token": dlt.secrets["your_api_token"],
         },
-        ...
+        # ...
     },
 }
@@ -424,7 +426,7 @@ To specify the authentication configuration, use the `auth` field in the [client
             "type": "bearer",
             "token": dlt.secrets["your_api_token"],
         },
-        ...
+        # ...
     },
 }
@@ -438,7 +440,7 @@ config = {
     "client": {
         "auth": BearTokenAuth(dlt.secrets["your_api_token"]),
     },
-    ...
+    # ...
 }
@@ -455,7 +457,7 @@ In the GitHub example, the `issue_comments` resource depends on the `issues` res
             "name": "issues",
             "endpoint": {
                 "path": "issues",
-                ...
+                # ...
             },
         },
         {
@@ -495,10 +497,12 @@ The `issue_comments` resource will make requests to the following endpoints:
 The syntax for the `resolve` field in parameter configuration is:

 ```py
-"<parameter_name>": {
-    "type": "resolve",
-    "resource": "<parent_resource_name>",
-    "field": "<parent_resource_field_name>",
+{
+    "<parameter_name>": {
+        "type": "resolve",
+        "resource": "<parent_resource_name>",
+        "field": "<parent_resource_field_name>",
+    }
 }
 ```

@@ -530,21 +534,25 @@ When the API endpoint supports incremental loading, you can configure the source
 1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration):

    ```py
-   "<parameter_name>": {
-       "type": "incremental",
-       "cursor_path": "<path_to_cursor_field>",
-       "initial_value": "<initial_value>",
-   },
+   {
+       "<parameter_name>": {
+           "type": "incremental",
+           "cursor_path": "<path_to_cursor_field>",
+           "initial_value": "<initial_value>",
+       },
+   }
    ```

    For example, in the `issues` resource configuration in the GitHub example, we have:

    ```py
-   "since": {
-       "type": "incremental",
-       "cursor_path": "updated_at",
-       "initial_value": "2024-01-25T11:21:28Z",
-   },
+   {
+       "since": {
+           "type": "incremental",
+           "cursor_path": "updated_at",
+           "initial_value": "2024-01-25T11:21:28Z",
+       },
+   }
    ```

    This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests.
@@ -552,13 +560,15 @@ When the API endpoint supports incremental loading, you can configure the source
 2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration):

    ```py
-   "incremental": {
-       "start_param": "<start_parameter_name>",
-       "end_param": "<end_parameter_name>",
-       "cursor_path": "<path_to_cursor_field>",
-       "initial_value": "<initial_value>",
-       "end_value": "<end_value>",
-   },
+   {
+       "incremental": {
+           "start_param": "<start_parameter_name>",
+           "end_param": "<end_parameter_name>",
+           "cursor_path": "<path_to_cursor_field>",
+           "initial_value": "<initial_value>",
+           "end_value": "<end_value>",
+       }
+   }
    ```

    This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading.
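
To make the two incremental styles shown in the last hunks concrete, here is a minimal sketch of how they might be filled in for a hypothetical endpoint. The parameter names (`created_since`, `created_before`), the `created_at` cursor field, and the date values are illustrative assumptions, not part of the change above.

```py
# A sketch only: parameter and field names below are hypothetical.

# Style 1: an incremental entry inside "params", keyed by the query parameter
# that should receive the cursor value on each request.
params_style = {
    "created_since": {
        "type": "incremental",
        "cursor_path": "created_at",
        "initial_value": "2024-01-01T00:00:00Z",
    },
}

# Style 2: the "incremental" field of the endpoint configuration, which also
# allows an explicit end condition via "end_param" / "end_value".
endpoint_style = {
    "incremental": {
        "start_param": "created_since",
        "end_param": "created_before",
        "cursor_path": "created_at",
        "initial_value": "2024-01-01T00:00:00Z",
        "end_value": "2024-06-01T00:00:00Z",
    }
}
```

Either dictionary would sit inside the corresponding resource's endpoint configuration described earlier in the patched file.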