From 61ba65d01b2f4a0af68c8f63a8af59958fec8499 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Tue, 17 Sep 2024 16:01:19 +0200 Subject: [PATCH] Docs: Fix imports, cosmetics for core sources (#1833) --- .../verified-sources/rest_api/advanced.md | 3 + .../verified-sources/sql_database/advanced.md | 100 ++++++++++-------- .../sql_database/configuration.md | 80 ++++++++------ .../verified-sources/sql_database/usage.md | 20 ++-- docs/website/docs/tutorial/rest-api.md | 8 +- docs/website/docs/tutorial/sql-database.md | 12 +++ 6 files changed, 140 insertions(+), 83 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/advanced.md index fa663b9ca5..27d2cc0b6e 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/advanced.md @@ -51,6 +51,9 @@ In this example, the source will ignore responses with a status code of 404, res #### Example B ```py +from requests.models import Response +from dlt.common import json + def set_encoding(response, *args, **kwargs): # sets the encoding in case it's not correctly detected response.encoding = 'windows-1252' diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md index 7ff08f8095..4db09b521e 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md @@ -27,48 +27,61 @@ certain range. #### Examples -**1. Incremental loading with the resource `sql_table`** - Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set `last_modified` timestamp as the cursor as follows: - ```py - from sql_database import sql_table - from datetime import datetime - - # Example: Incrementally loading a table based on a timestamp column - table = sql_table( - table='family', - incremental=dlt.sources.incremental( - 'last_modified', # Cursor column name - initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value - ) - ) - - info = pipeline.extract(table, write_disposition="merge") - print(info) - ``` - Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024). - In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state). - -**2. Incremental loading with the source `sql_database`** - To achieve the same using the `sql_database` source, you would specify your cursor as follows: +1. **Incremental loading with the resource `sql_table`**. - ```py - source = sql_database().with_resources("family") - #using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024 - source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0))) - #running the pipeline - info = pipeline.run(source, write_disposition="merge") - print(info) - ``` - - :::info - * When using "merge" write disposition, the source table needs a primary key, which `dlt` automatically sets up. 
-  * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
-  :::
+
+   Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set `last_modified` timestamp as the cursor as follows:
+
+   ```py
+   import dlt
+   from dlt.sources.sql_database import sql_table
+   from dlt.common.pendulum import pendulum
+
+   # Example: Incrementally loading a table based on a timestamp column
+   table = sql_table(
+       table='family',
+       incremental=dlt.sources.incremental(
+           'last_modified',  # Cursor column name
+           initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0)  # Initial cursor value
+       )
+   )
+
+   pipeline = dlt.pipeline(destination="duckdb")
+   info = pipeline.extract(table, write_disposition="merge")
+   print(info)
+   ```
+
+   Behind the scenes, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
+   In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state).
+
+2. **Incremental loading with the source `sql_database`**.
+
+   To achieve the same using the `sql_database` source, you would specify your cursor as follows:
+
+   ```py
+   import dlt
+   from dlt.sources.sql_database import sql_database
+   from dlt.common.pendulum import pendulum
+
+   source = sql_database().with_resources("family")
+   # using the "last_modified" field as an incremental field with an initial value of midnight, January 1, 2024
+   source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)))
+
+   # running the pipeline
+   pipeline = dlt.pipeline(destination="duckdb")
+   info = pipeline.run(source, write_disposition="merge")
+   print(info)
+   ```
+
+   :::info
+   * When using "merge" write disposition, the source table needs a primary key, which `dlt` automatically sets up.
+   * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
+   :::
 
 ## Parallelized extraction
 You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows:
 ```py
+from dlt.sources.sql_database import sql_database, sql_table
+
 database = sql_database().parallelize()
 table = sql_table().parallelize()
 ```
@@ -83,7 +96,7 @@ The `reflection_level` argument controls how much information is reflected:
 
 - `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.**
 - `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types.
-If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `json` (JSON in most destinations). +If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `json` (JSON in most destinations). :::tip If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are: 1. The destination complains that it cannot cast one type to another for a certain column. For example `connector-x` returns TIME in nanoseconds @@ -104,8 +117,9 @@ In the following example, when loading timestamps from Snowflake, you ensure tha ```py import dlt -from snowflake.sqlalchemy import TIMESTAMP_NTZ import sqlalchemy as sa +from dlt.sources.sql_database import sql_database, sql_table +from snowflake.sqlalchemy import TIMESTAMP_NTZ def type_adapter_callback(sql_type): if isinstance(sql_type, TIMESTAMP_NTZ): # Snowflake does not inherit from sa.DateTime @@ -142,9 +156,9 @@ The examples below show how you can set arguments in any of the `.toml` files (` [sources.sql_database.chat_message.incremental] cursor_path="updated_at" ``` - This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table. + This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table. -3. Handling separate configurations for database and individual tables +3. Handling separate configurations for database and individual tables When using the `sql_database()` source, you can separately configure the parameters for the database and for the individual tables. ```toml [sources.sql_database] @@ -155,7 +169,7 @@ The examples below show how you can set arguments in any of the `.toml` files (` [sources.sql_database.chat_message.incremental] cursor_path="updated_at" - ``` + ``` The resulting source created below will extract data using **pandas** backend with **chunk_size** 1000. The table **chat_message** will load data incrementally using **updated_at** column. All the other tables will not use incremental loading, and will instead load the full data. @@ -163,8 +177,8 @@ The examples below show how you can set arguments in any of the `.toml` files (` database = sql_database() ``` -You'll be able to configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). - +You'll be able to configure all the arguments this way (except adapter callback function). 
[Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). + It is also possible to set these arguments as environment variables [using the proper naming convention](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables): ```sh SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md index 88ea268378..186bb1cc18 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md @@ -12,7 +12,7 @@ import Header from '../_source-info-header.md'; ## Configuring the SQL Database source -`dlt` sources are python scripts made up of source and resource functions that can be easily customized. The SQL Database verified source has the following built-in source and resource: +`dlt` sources are python scripts made up of source and resource functions that can be easily customized. The SQL Database verified source has the following built-in source and resource: 1. `sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database 2. `sql_table`: a `dlt` resource that loads a single table from the SQL database @@ -20,16 +20,18 @@ Read more about sources and resources here: [General usage: source](../../../gen ### Example usage: -1. **Load all the tables from a database** - Calling `sql_database()` loads all tables from the database. +1. **Load all the tables from a database** + Calling `sql_database()` loads all tables from the database. ```py - def load_entire_database() -> None: + import dlt + from dlt.sources.sql_database import sql_database + def load_entire_database() -> None: # Define the pipeline pipeline = dlt.pipeline( - pipeline_name="rfam", - destination='synapse', + pipeline_name="rfam", + destination='synapse', dataset_name="rfam_data" ) @@ -41,22 +43,24 @@ Read more about sources and resources here: [General usage: source](../../../gen # Print load info print(info) - ``` + ``` -2. **Load select tables from a database** - Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. +2. **Load select tables from a database** + Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. ```py - def load_select_tables_from_database() -> None: + import dlt + from dlt.sources.sql_database import sql_database + def load_select_tables_from_database() -> None: # Define the pipeline pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="postgres", + pipeline_name="rfam", + destination="postgres", dataset_name="rfam_data" ) - # Fetch tables "family" and "clan" + # Fetch tables "family" and "clan" source = sql_database().with_resources("family", "clan") # Run the pipeline @@ -65,22 +69,24 @@ Read more about sources and resources here: [General usage: source](../../../gen # Print load info print(info) - ``` + ``` -3. **Load a standalone table** +3. 
**Load a standalone table** Calling `sql_table(table="family")` fetches only the table `"family"` ```py - def load_select_tables_from_database() -> None: + import dlt + from dlt.sources.sql_database import sql_table + def load_select_tables_from_database() -> None: # Define the pipeline pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="duckdb", + pipeline_name="rfam", + destination="duckdb", dataset_name="rfam_data" ) - # Fetch the table "family" + # Fetch the table "family" table = sql_table(table="family") # Run the pipeline @@ -92,8 +98,8 @@ Read more about sources and resources here: [General usage: source](../../../gen ``` :::tip -We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs. -::: +We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs. +::: ## Configuring the connection @@ -106,12 +112,12 @@ We intend our sources to be fully hackable. Feel free to change the source code "dialect+database_type://username:password@server:port/database_name" ``` -For example, to connect to a MySQL database using the `pymysql` dialect you can use the following connection string: +For example, to connect to a MySQL database using the `pymysql` dialect you can use the following connection string: ```py "mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam" ``` -Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows: +Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows: ```py "mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server" @@ -124,30 +130,32 @@ There are several options for adding your connection credentials into your `dlt` #### 1. Setting them in `secrets.toml` or as environment variables (Recommended) -You can set up credentials using [any method](https://dlthub.com/docs/devel/general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](https://dlthub.com/docs/devel/general-usage/credentials/setup). +You can set up credentials using [any method](https://dlthub.com/docs/devel/general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](https://dlthub.com/docs/devel/general-usage/credentials/setup). -#### 2. Passing them directly in the script +#### 2. Passing them directly in the script It is also possible to explicitly pass credentials inside the source. 
Example: + ```py from dlt.sources.credentials import ConnectionStringCredentials -from sql_database import sql_table +from dlt.sources.sql_database import sql_database credentials = ConnectionStringCredentials( "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" ) -source = sql_table(credentials).with_resource("family") +source = sql_database(credentials).with_resource("family") ``` -:::note -It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code. +:::note +It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code. ::: ### Other connection options -#### Using SqlAlchemy Engine as credentials +#### Using SqlAlchemy Engine as credentials You are able to pass an instance of SqlAlchemy Engine instead of credentials: ```py +from dlt.sources.sql_database import sql_table from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") @@ -175,7 +183,10 @@ reflects the database table and preserves original types (i.e. **decimal** / **n Note that if `pandas` is installed, we'll use it to convert `SQLAlchemy` tuples into `ndarray` as it seems to be 20-30% faster than using `numpy` directly. ```py +import dlt import sqlalchemy as sa +from dlt.sources.sql_database import sql_database + pipeline = dlt.pipeline( pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow" ) @@ -210,10 +221,13 @@ With the default settings, several data types will be coerced to dtypes in the y not to use the** `pandas` **backend if your source tables contain date, time, or decimal columns** ::: -Internally dlt uses `pandas.io.sql._wrap_result` to generate `pandas` frames. To adjust [pandas-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`: +Internally dlt uses `pandas.io.sql._wrap_result` to generate `pandas` frames. To adjust [pandas-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`: ```py +import dlt import sqlalchemy as sa +from dlt.sources.sql_database import sql_database + pipeline = dlt.pipeline( pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2" ) @@ -249,7 +263,7 @@ There are certain limitations when using this backend: * JSON fields (at least those coming from postgres) are double wrapped in strings. To unwrap this, you can pass the in-built transformation function `unwrap_json_connector_x` (for example, with `add_map`): ```py - from sources.sql_database.helpers import unwrap_json_connector_x + from dlt.sources.sql_database.helpers import unwrap_json_connector_x ``` :::note @@ -259,7 +273,9 @@ There are certain limitations when using this backend: ```py """This example is taken from the benchmarking tests for ConnectorX performed on the UNSW_Flow dataset (~2mln rows, 25+ columns). 
Full code here: https://github.com/dlt-hub/sql_database_benchmarking"""
 import os
+import dlt
 from dlt.destinations import filesystem
+from dlt.sources.sql_database import sql_table
 
 unsw_table = sql_table(
     "postgresql://loader:loader@localhost:5432/dlt_data",
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md
index ee70e92ea0..392fb64da6 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md
@@ -12,11 +12,13 @@ import Header from '../_source-info-header.md';
 
 ## Applying column-wise filtering on the data being ingested
 
-By default, the existing source and resource functions, `sql_database` and `sql_table`, ingest all of the records from the source table. But by using `query_adapter_callback`, it is possible to pass a `WHERE` clause inside the underlying `SELECT` statement using the [SQLAlchemy syntax](https://docs.sqlalchemy.org/en/14/core/selectable.html#). Thich enables filtering the data based on specific columns before extract.
+By default, the existing source and resource functions, `sql_database` and `sql_table`, ingest all of the records from the source table. But by using `query_adapter_callback`, it is possible to pass a `WHERE` clause inside the underlying `SELECT` statement using the [SQLAlchemy syntax](https://docs.sqlalchemy.org/en/14/core/selectable.html#). This enables filtering the data based on specific columns before extraction.
 
 The example below uses `query_adapter_callback` to filter on the column `customer_id` for the table `orders`:
 
 ```py
+from dlt.sources.sql_database import sql_database
+
 def query_adapter_callback(query, table):
     if table.name == "orders":
         # Only select rows where the column customer_id has value 1
@@ -30,19 +32,21 @@ source = sql_database(
 ```
 
 ## Transforming the data before load
-You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resource())`), each of which represents a single SQL table. These objects are generators that yield
-individual rows of the table which can be modified by using custom python functions. These functions can be applied to the resource using `add_map`.
+You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resources()`), each of which represents a single SQL table. These objects are generators that yield
+individual rows of the table which can be modified by using custom Python functions. These functions can be applied to the resource using `add_map`.
 
 :::note
 The PyArrow backend does not yield individual rows rather loads chunks of data as `ndarray`. In this case, the transformation function that goes into `add_map` should be configured to expect an `ndarray` input.
 :::
-  
+
 Examples:
 1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination.
(See [here](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns) for more information on pseudonymizing data with `dlt`) ```py + import dlt import hashlib + from dlt.sources.sql_database import sql_database def pseudonymize_name(doc): ''' @@ -65,7 +69,7 @@ Examples: # using sql_database source to load family table and pseudonymize the column "rfam_acc" source = sql_database().with_resources("family") # modify this source instance's resource - source = source.family.add_map(pseudonymize_name) + source.family.add_map(pseudonymize_name) # Run the pipeline. For a large db this may take a while info = pipeline.run(source, write_disposition="replace") print(info) @@ -74,6 +78,9 @@ Examples: 2. Excluding unnecessary columns before load ```py + import dlt + from dlt.sources.sql_database import sql_database + def remove_columns(doc): del doc["rfam_id"] return doc @@ -84,7 +91,7 @@ Examples: # using sql_database source to load family table and remove the column "rfam_id" source = sql_database().with_resources("family") # modify this source instance's resource - source = source.family.add_map(remove_columns) + source.family.add_map(remove_columns) # Run the pipeline. For a large db this may take a while info = pipeline.run(source, write_disposition="replace") print(info) @@ -99,4 +106,3 @@ When running on Airflow: 1. Use the `dlt` [Airflow Helper](../../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from the `sql_database` source. (If you want to run table extraction in parallel, then you can do this by setting `decompose = "parallel-isolated"` when doing the source->DAG conversion. See [here](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer#2-modify-dag-file) for code example.) 2. Reflect tables at runtime with `defer_table_reflect` argument. 3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). - diff --git a/docs/website/docs/tutorial/rest-api.md b/docs/website/docs/tutorial/rest-api.md index 3e214e0b55..0ae50695b4 100644 --- a/docs/website/docs/tutorial/rest-api.md +++ b/docs/website/docs/tutorial/rest-api.md @@ -109,6 +109,9 @@ You can explore the loaded data, run queries and see some pipeline execution det Now that your environment and the project are set up, let's take a closer look at the configuration of the REST API source. Open the `rest_api_pipeline.py` file in your code editor and locate the following code snippet: ```py +import dlt +from dlt.sources.rest_api import rest_api_source + def load_pokemon() -> None: pipeline = dlt.pipeline( pipeline_name="rest_api_pokemon", @@ -263,6 +266,9 @@ When working with some APIs, you may need to load data incrementally to avoid fe To illustrate incremental loading, let's consider the GitHub API. In the `rest_api_pipeline.py` file, you can find an example of how to load data from the GitHub API incrementally. 
Let's take a look at the configuration: ```py +import dlt +from dlt.sources.rest_api import rest_api_source + pipeline = dlt.pipeline( pipeline_name="rest_api_github", destination="duckdb", @@ -302,7 +308,7 @@ github_source = rest_api_source({ ], }) -load_info = pipeline.run(github_source()) +load_info = pipeline.run(github_source) print(load_info) ``` diff --git a/docs/website/docs/tutorial/sql-database.md b/docs/website/docs/tutorial/sql-database.md index 1a7702b637..64444fac32 100644 --- a/docs/website/docs/tutorial/sql-database.md +++ b/docs/website/docs/tutorial/sql-database.md @@ -64,6 +64,9 @@ Running the script as it is will execute the function `load_standalone_table_res The following function will load the tables `family` and `genome`. ```py +import dlt +from dlt.sources.sql_database import sql_database + def load_tables_family_and_genome(): # create a dlt source that will load tables "family" and "genome" @@ -172,6 +175,9 @@ all the tables have the data duplicated. This happens as dlt, by default, append To prevent the data from being duplicated in each row, set `write_disposition` to `replace`: ```py +import dlt +from dlt.sources.sql_database import sql_database + def load_tables_family_and_genome(): source = sql_database().with_resources("family", "genome") @@ -200,6 +206,9 @@ When you want to update the existing data as new data is loaded, you can use the In the previous example, we set `write_disposition="replace"` inside `pipeline.run()` which caused all the tables to be loaded with `replace`. However, it's also possible to define the `write_disposition` strategy separately for each tables using the `apply_hints` method. In the example below, we use `apply_hints` on each table to specify different primary keys for merge: ```py +import dlt +from dlt.sources.sql_database import sql_database + def load_tables_family_and_genome(): source = sql_database().with_resources("family", "genome") @@ -229,6 +238,9 @@ Often you don't want to load the whole data in each load, but rather only the ne In the example below, we configure the table `"family"` to load incrementally based on the column `"updated"`: ```py +import dlt +from dlt.sources.sql_database import sql_database + def load_tables_family_and_genome(): source = sql_database().with_resources("family", "genome")