Commit

update custom destination example
sh-rp committed Mar 19, 2024
1 parent 7e02f82 commit 040ed32
Showing 3 changed files with 58 additions and 24 deletions.
docs/website/docs/examples/custom_destination_bigquery/code/.dlt/example.secrets.toml
@@ -1,5 +1,4 @@
# @@@DLT_SNIPPET_START example
# you can just paste services.json as credentials
[destination.bigquery.credentials]
client_email = ""
private_key = ""
docs/website/docs/examples/custom_destination_bigquery/code/custom_destination_bigquery-snippets.py
@@ -1,11 +1,13 @@
from tests.utils import skipifgithubfork
from tests.pipeline.utils import assert_load_info


@skipifgithubfork
def custom_destination_biquery_snippets() -> None:
def custom_destination_biquery_snippet() -> None:
# @@@DLT_SNIPPET_START example
import dlt
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery

from dlt.common.configuration.specs import GcpServiceAccountCredentials
@@ -23,8 +25,25 @@ def custom_destination_biquery_snippets() -> None:
# dlt sources
@dlt.resource(name="natural_disasters")
def resource(url: str):
df = pd.read_csv(OWID_DISASTERS_URL)
yield df.to_dict(orient="records")
# load pyarrow table with pandas
table = pa.Table.from_pandas(pd.read_csv(url))
# we add a list type column to demonstrate bigquery lists
table = table.append_column(
"tags",
pa.array(
[["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
pa.list_(pa.string()),
),
)
# we add a struct type column to demonstrate bigquery structs
table = table.append_column(
"meta",
pa.array(
[{"loaded_by": "dlt"}] * len(table),
pa.struct([("loaded_by", pa.string())]),
),
)
yield table

# dlt bigquery custom destination
# we can use the dlt provided credentials class
@@ -46,11 +65,7 @@ def bigquery_insert(
load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
load_job.result() # Waits for the job to complete.

# we can add some tags to each data item
# to demonstrate lists in bigquery
meta_data = {"meta": {"tags": ["disasters", "earthquakes", "floods", "tsunamis"]}}
resource.add_map(lambda item: {**item, **meta_data})

__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# run the pipeline and print load results
pipeline = dlt.pipeline(
Expand All @@ -62,3 +77,6 @@ def bigquery_insert(
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))

print(load_info)

assert_load_info(load_info)
# @@@DLT_SNIPPET_END example
47 changes: 32 additions & 15 deletions docs/website/docs/examples/custom_destination_bigquery/index.md
@@ -1,5 +1,5 @@
---
title: Custom destination BigQuery Example
title: Custom destination with BigQuery
description: Learn how to use the custom destination to load to bigquery and use credentials
keywords: [destination, credentials, example, bigquery, custom destination]
---
@@ -18,11 +18,14 @@ In this example, you'll find a Python script that demonstrates how to load Googl

We'll learn how to:
- use [built-in credentials](../../general-usage/credentials/config_specs#gcp-credentials)
- use the [custom destination](../../dlt-ecosystem/destinations/destination.md).
- use the [custom destination](../../dlt-ecosystem/destinations/destination.md) (see the sketch after this list)
- use pyarrow tables to create complex column types on bigquery
- use bigquery autodetect=True for schema inference from parquet files
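
The custom destination mentioned above is declared with dlt's `@dlt.destination` decorator; the decorator line itself sits in a collapsed part of the diff below, so the following is only a minimal sketch of how such a declaration typically looks. The `batch_size=0` and `loader_file_format="parquet"` arguments are assumptions inferred from the comments in the example (a batch size of zero makes dlt pass a file path rather than individual rows), not values copied from the commit.

```py
import dlt
from dlt.common.configuration.specs import GcpServiceAccountCredentials

# assumed decorator arguments: batch_size=0 hands the function the path of a
# parquet file instead of row batches; loader_file_format="parquet" matches the
# PARQUET source format used by the BigQuery load job in the example below
@dlt.destination(batch_size=0, loader_file_format="parquet")
def bigquery_insert(
    items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
    # `items` is the path of the parquet file produced by dlt; load it into
    # BigQuery here (see the full example below)
    ...
```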

### Your bigquery credentials in secrets.toml
<!--@@@DLT_SNIPPET_START code/.dlt/example.secrets.toml::example-->
```toml
# you can just paste services.json as credentials
[destination.bigquery.credentials]
client_email = ""
private_key = ""
@@ -41,6 +44,7 @@ client_secret = ""
```py
import dlt
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery

from dlt.common.configuration.specs import GcpServiceAccountCredentials
@@ -58,8 +62,25 @@ BIGQUERY_TABLE_ID = "chat-analytics-rasa-ci.ci_streaming_insert.natural-disaster
# dlt sources
@dlt.resource(name="natural_disasters")
def resource(url: str):
df = pd.read_csv(OWID_DISASTERS_URL)
yield df.to_dict(orient="records")
# load pyarrow table with pandas
table = pa.Table.from_pandas(pd.read_csv(url))
# we add a list type column to demonstrate bigquery lists
table = table.append_column(
"tags",
pa.array(
[["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
pa.list_(pa.string()),
),
)
# we add a struct type column to demonstrate bigquery structs
table = table.append_column(
"meta",
pa.array(
[{"loaded_by": "dlt"}] * len(table),
pa.struct([("loaded_by", pa.string())]),
),
)
yield table

# dlt bigquery custom destination
# we can use the dlt provided credentials class
@@ -69,34 +90,30 @@ def bigquery_insert(
items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
client = bigquery.Client(
credentials.project_id,
credentials.to_native_credentials(),
location="US"
credentials.project_id, credentials.to_native_credentials(), location="US"
)
job_config = bigquery.LoadJobConfig(
autodetect=True, source_format=bigquery.SourceFormat.PARQUET,
schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
autodetect=True,
source_format=bigquery.SourceFormat.PARQUET,
schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
)
# since we have set the batch_size to 0, we get a filepath and can load the file directly
with open(items, "rb") as f:
load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
load_job.result() # Waits for the job to complete.

# we can add some tags to each data item
# to demonstrate lists in bigquery
meta_data = {"meta": {"tags": ["disasters", "earthquakes", "floods", "tsunamis"]}}
resource.add_map(lambda item: {**item, **meta_data})

if __name__ == "__main__":
# run the pipeline and print load results
pipeline = dlt.pipeline(
pipeline_name="csv_to_bigquery_insert",
destination=bigquery_insert,
dataset_name="mydata",
full_refresh=True
full_refresh=True,
)
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))

print(load_info)

assert_load_info(load_info)
```
<!--@@@DLT_SNIPPET_END code/custom_destination_bigquery-snippets.py::example-->
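
To spot-check what actually landed in BigQuery after a run, a plain query against the destination table is enough. A minimal sketch, assuming application default credentials for the same project and a placeholder table id standing in for the `BIGQUERY_TABLE_ID` constant defined in the example:

```py
from google.cloud import bigquery

# placeholder id; substitute the BIGQUERY_TABLE_ID used in the example above
BIGQUERY_TABLE_ID = "your-project.your_dataset.natural_disasters"

# assumes application default credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS)
client = bigquery.Client()
rows = client.query(f"SELECT COUNT(*) AS row_count FROM `{BIGQUERY_TABLE_ID}`").result()
print(next(iter(rows)).row_count)
```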
