diff --git a/docs/website/docs/examples/custom_destination_bigquery/code/.dlt/example.secrets.toml b/docs/website/docs/examples/custom_destination_bigquery/code/.dlt/example.secrets.toml
index b1ecf70395..71f41f9878 100644
--- a/docs/website/docs/examples/custom_destination_bigquery/code/.dlt/example.secrets.toml
+++ b/docs/website/docs/examples/custom_destination_bigquery/code/.dlt/example.secrets.toml
@@ -1,5 +1,4 @@
 # @@@DLT_SNIPPET_START example
-# you can just paste services.json as credentials
 [destination.bigquery.credentials]
 client_email = ""
 private_key = ""
diff --git a/docs/website/docs/examples/custom_destination_bigquery/code/custom_destination_bigquery-snippets.py b/docs/website/docs/examples/custom_destination_bigquery/code/custom_destination_bigquery-snippets.py
index f17948772c..867697d514 100644
--- a/docs/website/docs/examples/custom_destination_bigquery/code/custom_destination_bigquery-snippets.py
+++ b/docs/website/docs/examples/custom_destination_bigquery/code/custom_destination_bigquery-snippets.py
@@ -1,11 +1,13 @@
 from tests.utils import skipifgithubfork
+from tests.pipeline.utils import assert_load_info
 
 
 @skipifgithubfork
-def custom_destination_biquery_snippets() -> None:
+def custom_destination_biquery_snippet() -> None:
     # @@@DLT_SNIPPET_START example
     import dlt
     import pandas as pd
+    import pyarrow as pa
     from google.cloud import bigquery
 
     from dlt.common.configuration.specs import GcpServiceAccountCredentials
@@ -23,8 +25,25 @@ def custom_destination_biquery_snippets() -> None:
     # dlt sources
     @dlt.resource(name="natural_disasters")
     def resource(url: str):
-        df = pd.read_csv(OWID_DISASTERS_URL)
-        yield df.to_dict(orient="records")
+        # load pyarrow table with pandas
+        table = pa.Table.from_pandas(pd.read_csv(url))
+        # we add a list type column to demonstrate bigquery lists
+        table = table.append_column(
+            "tags",
+            pa.array(
+                [["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
+                pa.list_(pa.string()),
+            ),
+        )
+        # we add a struct type column to demonstrate bigquery structs
+        table = table.append_column(
+            "meta",
+            pa.array(
+                [{"loaded_by": "dlt"}] * len(table),
+                pa.struct([("loaded_by", pa.string())]),
+            ),
+        )
+        yield table
 
     # dlt biquery custom destination
     # we can use the dlt provided credentials class
@@ -46,11 +65,7 @@ def bigquery_insert(
             load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
             load_job.result()  # Waits for the job to complete.
 
-    # we can add some tags to each data item
-    # to demonstrate lists in biqquery
-    meta_data = {"meta": {"tags": ["disasters", "earthquakes", "floods", "tsunamis"]}}
-    resource.add_map(lambda item: {**item, **meta_data})
-
+    __name__ = "__main__"  # @@@DLT_REMOVE
     if __name__ == "__main__":
         # run the pipeline and print load results
         pipeline = dlt.pipeline(
@@ -62,3 +77,6 @@ def bigquery_insert(
         load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))
 
         print(load_info)
+
+        assert_load_info(load_info)
+    # @@@DLT_SNIPPET_END example
diff --git a/docs/website/docs/examples/custom_destination_bigquery/index.md b/docs/website/docs/examples/custom_destination_bigquery/index.md
index b70648f6ff..0531da23b1 100644
--- a/docs/website/docs/examples/custom_destination_bigquery/index.md
+++ b/docs/website/docs/examples/custom_destination_bigquery/index.md
@@ -1,5 +1,5 @@
 ---
-title: Custom destination BigQuery Example
+title: Custom destination with BigQuery
 description: Learn how use the custom destination to load to bigquery and use credentials
 keywords: [destination, credentials, example, bigquery, custom destination]
 ---
@@ -18,11 +18,14 @@ In this example, you'll find a Python script that demonstrates how to load Googl
 
 We'll learn how to:
 - use [built-in credentials](../../general-usage/credentials/config_specs#gcp-credentials)
-- use the [custom destination](../../dlt-ecosystem/destinations/destination.md).
+- use the [custom destination](../../dlt-ecosystem/destinations/destination.md)
+- use pyarrow tables to create complex column types (lists and structs) on bigquery
+- use bigquery `autodetect=True` for schema inference from parquet files
 
 ### Your bigquery credentials in secrets.toml
 
 ```toml
+# you can just paste services.json as credentials
 [destination.bigquery.credentials]
 client_email = ""
 private_key = ""
@@ -41,6 +44,7 @@ client_secret = ""
 ```py
 import dlt
 import pandas as pd
+import pyarrow as pa
 from google.cloud import bigquery
 
 from dlt.common.configuration.specs import GcpServiceAccountCredentials
@@ -58,8 +62,25 @@ BIGQUERY_TABLE_ID = "chat-analytics-rasa-ci.ci_streaming_insert.natural-disaster
 # dlt sources
 @dlt.resource(name="natural_disasters")
 def resource(url: str):
-    df = pd.read_csv(OWID_DISASTERS_URL)
-    yield df.to_dict(orient="records")
+    # load pyarrow table with pandas
+    table = pa.Table.from_pandas(pd.read_csv(url))
+    # we add a list type column to demonstrate bigquery lists
+    table = table.append_column(
+        "tags",
+        pa.array(
+            [["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
+            pa.list_(pa.string()),
+        ),
+    )
+    # we add a struct type column to demonstrate bigquery structs
+    table = table.append_column(
+        "meta",
+        pa.array(
+            [{"loaded_by": "dlt"}] * len(table),
+            pa.struct([("loaded_by", pa.string())]),
+        ),
+    )
+    yield table
 
 # dlt biquery custom destination
 # we can use the dlt provided credentials class
@@ -69,34 +90,30 @@ def bigquery_insert(
     items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
 ) -> None:
     client = bigquery.Client(
-        credentials.project_id,
-        credentials.to_native_credentials(),
-        location="US"
+        credentials.project_id, credentials.to_native_credentials(), location="US"
     )
     job_config = bigquery.LoadJobConfig(
-        autodetect=True, source_format=bigquery.SourceFormat.PARQUET,
-        schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
+        autodetect=True,
+        source_format=bigquery.SourceFormat.PARQUET,
+        schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
     )
     # since we have set the batch_size to 0, we get a filepath and can load the file directly
    with open(items, "rb") as f:
         load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
         load_job.result()  # Waits for the job to complete.
 
-# we can add some tags to each data item
-# to demonstrate lists in biqquery
-meta_data = {"meta": {"tags": ["disasters", "earthquakes", "floods", "tsunamis"]}}
-resource.add_map(lambda item: {**item, **meta_data})
-
 if __name__ == "__main__":
     # run the pipeline and print load results
     pipeline = dlt.pipeline(
         pipeline_name="csv_to_bigquery_insert",
         destination=bigquery_insert,
         dataset_name="mydata",
-        full_refresh=True
+        full_refresh=True,
     )
 
     load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))
 
     print(load_info)
+
+    assert_load_info(load_info)
 ```
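
Note on the `batch_size` comment in the hunks above: with dlt's custom destination, setting `batch_size=0` makes dlt pass the destination function the path of the file it has written rather than a batch of records, which is why `bigquery_insert` can open `items` and stream the whole parquet file into a BigQuery load job. The decorator configuration itself sits outside the changed hunks, so the following is only a minimal sketch of what the comment implies; the decorator arguments are assumptions, not taken from this diff.

```py
import dlt
from dlt.common.configuration.specs import GcpServiceAccountCredentials


# assumed settings: batch_size=0 so `items` arrives as a file path,
# loader_file_format="parquet" so BigQuery can autodetect the schema from the file
@dlt.destination(batch_size=0, loader_file_format="parquet")
def bigquery_insert(
    items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
    # `items` is the local path of the parquet file dlt produced for this table
    with open(items, "rb") as f:
        ...  # hand the file to a BigQuery load job, as shown in index.md above
```

Combined with `autodetect=True` and `SourceFormat.PARQUET` on the load job, BigQuery infers the schema, including the list and struct columns built with pyarrow, directly from the parquet file.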