From cdbb05250c1c236800345d5f8d2bea0bf4a736cc Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Mon, 11 Sep 2023 07:09:25 +0000 Subject: [PATCH 1/3] Update SQL Database docs. --- .../verified-sources/sql_database.md | 428 +++++++++++++----- 1 file changed, 311 insertions(+), 117 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index c49b379fcd..351d74e518 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -1,179 +1,373 @@ # SQL Database -:::info -Need help deploying these sources, or figuring out how to run them in your data stack? +:::info Need help deploying these sources, or figuring out how to run them in your data stack? -[Join our slack community](https://dlthub-community.slack.com/join/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) or [book a call](https://calendar.app.google/kiLhuMsWKpZUpfho6) with our support engineer Adrian. +[Join our Slack community](https://dlthub-community.slack.com/join/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +or [book a call](https://calendar.app.google/kiLhuMsWKpZUpfho6) with our support engineer Adrian. ::: +SQL databases are management systems (DBMS) that store data in a structured format, commonly used +for efficient and reliable data retrieval. -SQL databases, or Structured Query Language databases, are a type of database management system (DBMS) that stores and manages data in a structured format. SQL databases are widely used for storing and retrieving structured data efficiently and reliably. +This SQL database `dlt` verified source and +[pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py) +loads data using SqlAlchemy to the destination of your choice. -The SQL Database `dlt` verified source loads data from your SQL Database to a destination of your choosing. It offers flexibility in terms of loading either the entire database or specific tables to the destination. You have the option to perform a single load using the "replace" mode, or load data incrementally using the "merge" or "append" modes. +Sources and resources that can be loaded using this verified source are: -Internally we use `SqlAlchemy` to query the data. You may need to pip install the right dialect for your database. `dlt` will inform you on the missing dialect on the first run of the pipeline. +| Name | Description | +| ------------ | ----------------------------------------- | +| sql_database | Retrieves data from an SQL database | +| sql_table | Retrieves data from an SQL database table | -`dlt` understands the SqlAlchemy connection strings. Special options that are passed via query strings are supported. We give a few examples below. +## Setup Guide -## Initialize the SQL Database verified source and the pipeline example +### Grab credentials -To get started with this verified source, follow these steps: +This verified source utilizes SQLAlchemy for database connectivity. Let us consider this public +database example: -1. Open up your terminal or command prompt and navigate to the directory where you'd like to create your project. -2. Enter the following command: +`connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` -```properties -dlt init sql_database duckdb +> This public database doesn't require a password. 
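If you'd like to sanity-check this connection before configuring the pipeline, a quick SQLAlchemy round-trip is enough. This is only a sketch and assumes the `pymysql` driver is installed (`pip install pymysql`):

```python
from sqlalchemy import create_engine, inspect

# The public Rfam database used throughout this guide
connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"

engine = create_engine(connection_url)

# Print a few table names to confirm the server is reachable
print(inspect(engine).get_table_names()[:5])
```

`dlt` accepts the same SQLAlchemy connection string directly, so once this works you can reuse the URL unchanged in your configuration.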
+ +Connection URL can be broken down into: + +```python +connection_url = "connection_string = f"{drivername}://{username}:{password}@{host}:{port}/{database}" ``` -This command will initialize your verified source with SQL Database and creates pipeline example with duckdb as the destination. If you'd like to use a different destination, simply replace **duckdb** with the name of your preferred destination. You can find supported destinations and their configuration options in our [documentation](../destinations/) +`drivername`: Indicates both the database system and driver used. -3. After running this command, a new directory will be created with the necessary files and configuration settings to get started. +- E.g., "mysql+pymysql" uses MySQL with the pymysql driver. Alternatives might include mysqldb and + mysqlclient. -``` -sql_database -├── .dlt -│ ├── config.toml -│ └── secrets.toml -├── sql_database -│ └── __init__.py -│ └── helpers.py -│ └── settings.py -├── .gitignore -├── requirements.txt -└── sql_database_pipeline.py -``` +`username`: Used for database authentication. -## Add credentials +- E.g., "rfamro" as a possible read-only user. -1. Inside the **`.dlt`** folder, you'll find a file called **`secrets.toml`**, which is where you can securely store credentials and other sensitive information. It's important to handle this file with care and keep it safe. -2. To proceed with this demo, we will establish credentials using the provided connection URL given below. The connection URL is associated with a public database and is: `mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam` +`password`: The password for the given username. -Here's what the `secrets.toml` looks like +`host`: The server's address or domain where the database is hosted. -```toml -# make sure this is at the top of toml file, not in any section like [sources] -sources.sql_database.credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" -``` +- E.g., A public database at "mysql-rfam-public.ebi.ac.uk" hosted by EBI. -You can set up the credentials by passing host, password etc. separately -```toml -[sources.sql_database.credentials] -drivername = "mysql+pymysql" # driver name for the database -database = "Rfam" # database name -username = "rfamro" # username associated with the database -host = "mysql-rfam-public.ebi.ac.uk" # host address -port = "4497" # port required for connection -``` +`port`: The port for the database connection. E.g., "4497", in the above connection URL. -4. Finally, follow the instructions in **[Destinations](../destinations/)** to add credentials for your chosen destination. This will ensure that your data is properly routed to its final destination. +`database`: The specific database on the server. -## Run the pipeline example +- E.g., Connecting to the "Rfam" database. -1. Install the necessary dependencies by running the following command: +### Provide special options in connection string -```properties -pip install -r requirements.txt -``` +Here we use `mysql` and `pymysql` dialect to set up SSL connection to a server. All information +taken from the +[SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections) -2. Now the verified source can be run by using the command: -```properties -python3 sql_database_pipeline.py -``` +1. To force SSL on the client without a client certificate you may pass the following DSN: -3. 
To make sure that everything is loaded as expected, use the command: + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=" + ``` -```properties -dlt pipeline show -``` +1. You can also pass server public certificate as a file. For servers with a public certificate + (potentially bundled with your pipeline) and disabling host name checks: -(For example, the pipeline_name for the above pipeline example is `rfam`, you may also use any custom name instead) + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false" + ``` -## Customizations -This source let's you build your own pipeline where you pick the whole database or selected tables to be synchronized with the destination. The source and resource functions allow to set up write disposition (append or replace data) or incremental/merge loads. +1. For servers requiring a client certificate, provide the client's private key (a secret value). In + Airflow, this is usually saved as a variable and exported to a file before use. Server cert is + omitted in the example below: -The demo script that you get with `dlt init` contains several well commented examples that we explain further below. + ```toml + sources.sql_database.credentials="mysql+pymysql://root:8P5gyDPNo9zo582rQG6a@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem" + ``` -### Include and configure the tables to be synchronized +### Initialize the verified source -**Source sql_database**: -```python -from sql_database import sql_database -``` +To get started with your data pipeline, follow these steps: -This `sql_database` source uses SQLAlchemy to reflect the whole source database and create dlt resource for each table. You can also provide and explicit list of tables to be reflected. +1. Enter the following command: + ```bash + dlt init sql_database duckdb + ``` -**Resource sql_table:** + [This command](../../reference/command-line-interface) will initialize + [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py) + with SQL database as the [source](../../general-usage/source) and + [duckdb](../destinations/duckdb.md) as the [destination](../destinations). -```python -from sql_database import sql_table -``` -`sql_table` resource will reflect a single table with provided name. +1. If you'd like to use a different destination, simply replace `duckdb` with the name of your + preferred [destination](../destinations). -### Usage examples +1. After running this command, a new directory will be created with the necessary files and + configuration settings to get started. -1. Declare the pipeline by specifying the pipeline name, destination, and dataset. To read more about pipeline configuration, please refer to our documentation [here](https://dlthub.com/docs/general-usage/pipeline). +For more information, read the +[Walkthrough: Add a verified source.](../../walkthroughs/add-a-verified-source) -```python -pipeline = dlt.pipeline( - pipeline_name="rfam", destination='duckdb', dataset_name="rfam" -) -``` +### Add credentials + +1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive + information securely, like access tokens. Keep this file safe. + + Here's what the `secrets.toml` looks like + + You can pass the credentials + + ```toml + [sources.sql_database.credentials] + drivername = "please set me up!" 
# driver name for the database + database = "please set me up!" # database name + username = "please set me up!" # username associated with the database + host = "please set me up!" # host address + port = "please set me up!" # port required for connection + ``` + +1. Alternatively, you can also provide credentials in "secrets.toml" as: + + ```toml + sources.sql_database.credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + ``` + +1. Alternatively, you can also pass credentials in the pipeline script like this: -2. To load the entire database, you can use the sql_database source function as follows: + ```python + credentials = ConnectionStringCredentials( + "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + ) + ``` + + > See + > [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py) + > for details. + +1. Finally, follow the instructions in [Destinations](../destinations/) to add credentials for your + chosen destination. This will ensure that your data is properly routed to its final destination. + +## Run the pipeline example + +1. Install the necessary dependencies by running the following command: + + ```bash + pip install -r requirements.txt + ``` + +1. Now the verified source can be run by using the command: + + ```bash + python3 sql_database_pipeline.py + ``` + +1. To make sure that everything is loaded as expected, use the command: + + ```bash + dlt pipeline show + ``` + + For example, the pipeline_name for the above pipeline example is `rfam`, you may also use any + custom name instead) + +## Sources and resources + +`dlt` works on the principle of [sources](../../general-usage/source) and +[resources](../../general-usage/resource). + +### Source `sql_database`: + +This function loads data from an SQL database via SQLAlchemy and auto-creates resources for each +table or from a specified list. ```python -load_source = sql_database() -load_info = pipeline.run(load_source, write_disposition="replace") -print(info) +@dlt.source +def sql_database( + credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, + schema: Optional[str] = dlt.config.value, + metadata: Optional[MetaData] = None, + table_names: Optional[List[str]] = dlt.config.value, +) -> Iterable[DltResource]: ``` -3. To load data from the "family" table in incremental mode, utilizing the "updated" field, you can employ the `sql_table` resource. In the provided code, the last value of the "updated" field is stored in the DLT state, initially set to January 1, 2022, at midnight (00:00:00). During subsequent runs, only the new data created after the last recorded "updated" value will be loaded. This ensures efficient and targeted data retrieval, optimizing the processing of your pipeline. +`credentials`: Database details or an 'sqlalchemy.Engine' instance. + +`schema`: Database schema name (default if unspecified). + +`metadata`: Optional, sqlalchemy.MetaData takes precedence over schema. + +`table_names`: List of tables to load. Defaults to all if not provided. + +### Resource `sql_table` + +This function loads data from specific database tables. 
```python -family = sql_table( - table="family", - incremental=dlt.sources.incremental( - "updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0) - ), +@dlt.common.configuration.with_config( + sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration ) -#running the pipeline -load_info = pipeline.extract(family, write_disposition="merge") -print(info) +def sql_table( + credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, + table: str = dlt.config.value, + schema: Optional[str] = dlt.config.value, + metadata: Optional[MetaData] = None, + incremental: Optional[dlt.sources.incremental[Any]] = None, +) -> DltResource: ``` -In the provided code, the "_merge_" mode write deposition is utilized. This mode ensures that only unique rows are added to the destination. Had the "_append_" mode been used instead, all rows, regardless of their uniqueness, would have been added to the destination after the last "updated" field. By employing the merge mode, data integrity is maintained, and only distinct records are included. +`credentials`: Database info or an Engine instance. -> 💡 Please note that to use merge write disposition a primary key must exist in the source table. `dlt` finds and sets up primary keys automatically. +`table`: Table to load, set in code or default from "config.toml". +`schema`: Optional, name of table schema. -4. You can also load data from the “family” table in incremental mode, using the "sql_database" source method: +`metadata`: Optional, sqlalchemy.MetaData takes precedence over schema. -```python -load_data = sql_database().with_resources("family") -#using the "updated" field as an incremental field using initial value of January 1, 2022, at midnight -load_data.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)) +`incremental`: Optional, enables incremental loading. -#running the pipeline -load_info = pipeline.run(load_data, write_disposition="merge") -print(load_info) -``` +`write_disposition`: Can be "merge", "replace", or "append". -In the example above first we reflect the whole source database and then select only the **family** table to be loaded (`with_resources()`). Then we use `apply_hints` method to set up incremental load on **updated** column. +### Create your own pipeline -> 💡 `apply_hints` is a powerful method that allows to modify the schema of the resource after it was created: including the write disposition and primary keys. You are free to select many different tables and use `apply_hints` several times to have pipelines where some resources are merged, appended or replaced. +If you wish to create your own pipelines, you can leverage source and resource methods from this +verified source. -5. It's important to keep the pipeline name and destination dataset name unchanged. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) of the last pipeline run, which includes the end date needed for loading data incrementally. Modifying these names can lead to [“full_refresh”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh) which will disrupt the tracking of relevant metadata(state) for [incremental data loading](https://dlthub.com/docs/general-usage/incremental-loading). +1. 
Configure the pipeline by specifying the pipeline name, destination, and dataset as follows: -### Provide special options in connection string -Here we use `mysql` and `pymysql` dialect to set up ssl connection to a server. All information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections) + ```python + pipeline = dlt.pipeline( + pipeline_name="rfam", # Use a custom name if desired + destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post) + dataset_name="rfam_data" # Use a custom name if desired + ) + ``` -1. If your server accepts clients without client certificate but exclusively over ssl you may try to force ssl on the client by passing the following DSN to `dlt`: `mysql+pymysql://root:@:3306/mysql?ssl_ca=` -2. You can also pass server public certificate as a file. Possibly bundled with your pipeline as this is not any secret. Below we also disable the host name check on the client: most certificates are self-issued and host name typically does not match: `mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false` -3. If your server requires client certificate, you must pass the private key of the client which is a secret value. In Airflow we typically paste it into a variable and then dump it to file before use. We do not provide server cert below. -```toml -sources.sql_database.credentials="'mysql+pymysql://root:8P5gyDPNo9zo582rQG6a@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem') -``` +1. You can pass credentials using any of the methods discussed above. + +1. To load the entire database, use the `sql_database` source as: + + ```python + source = sql_database() + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` + + > Use one method from the methods [described above](#add-credentials) to pass credentials. + +1. To load just the "family" table using the `sql_database` source: -That's it! Enjoy running your SQL Database DLT pipeline! + ```python + source = sql_database().with_resources("family") + #running the pipeline + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` + +1. To pseudonymize columns and hide personally identifiable information (PII), refer to the + [documentation](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns). + For example, to pseudonymize the "rfam_acc" column in the "family" table: + + ```python + import hashlib + + def pseudonymize_name(doc): + ''' + Pseudonmyisation is a deterministic type of PII-obscuring + Its role is to allow identifying users by their hash, + without revealing the underlying info. + ''' + # add a constant salt to generate + salt = 'WI@N57%zZrmk#88c' + salted_string = doc['rfam_acc'] + salt + sh = hashlib.sha256() + sh.update(salted_string.encode()) + hashed_string = sh.digest().hex() + doc['rfam_acc'] = hashed_string + return doc + + def load_table_with_pseudonymized_columns(): + + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and pseudonymize the column "rfam_acc" + source = sql_database().with_resources("family") + + # modify this source instance's resource + source = source.family.add_map(pseudonymize_name) + + # Run the pipeline. For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` + +1. To exclude the columns, for eg. 
"rfam_id" column from the "family" table before loading: + + ```python + def remove_columns(doc): + del doc["rfam_id"] + return doc + + def load_table_with_deleted_columns(): + + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and remove the column "rfam_id" + source = sql_database().with_resources("family") + + # modify this source instance's resource + source = source.family.add_map(remove_columns) + + # Run the pipeline. For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` + +1. To incrementally load the "family" table using the sql_database source method: + + ```python + source = sql_database().with_resources("family") + #using the "updated" field as an incremental field using initial value of January 1, 2022, at midnight + source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)) + #running the pipeline + info = pipeline.run(source, write_disposition="merge") + print(info) + ``` + + > In this example, we load the "family" table and set the "updated" column for incremental + > loading. In first run it loads all the data from January 1, 2022, at midnight (00:00:00) and + > then loads incrementally in subsequent runs using "updated" field. + +1. To incrementally load the "family" table using the 'sql_table' resource. + + ```python + family = sql_table( + table="family", + incremental=dlt.sources.incremental( + "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0) + ), + ) + # Running the pipeline + info = pipeline.extract(family, write_disposition="merge") + print(info) + ``` + + > Loads all data from "family" table from January 1, 2022, at midnight (00:00:00) and then loads + > incrementally in subsequent runs using "updated" field. + + > 💡 Please note that to use merge write disposition a primary key must exist in the source table. + > `dlt` finds and sets up primary keys automatically. + + > 💡 `apply_hints` is a powerful method that allows to modify the schema of the resource after it + > was created: including the write disposition and primary keys. You are free to select many + > different tables and use `apply_hints` several times to have pipelines where some resources are + > merged, appended or replaced. + +1. Remember, to maintain the same pipeline name and destination dataset name. The pipeline name + retrieves the [state](https://dlthub.com/docs/general-usage/state) from the last run, essential + for incremental data loading. Changing these names might trigger a + [“full_refresh”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh), + disrupting metadata tracking for + [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading). From 3e4796251a0f9d108db33fa8620b05fd7e9f1bad Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Mon, 11 Sep 2023 07:14:25 +0000 Subject: [PATCH 2/3] Removing extra spaces in code --- .../docs/dlt-ecosystem/verified-sources/sql_database.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 351d74e518..877a4e1050 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -293,10 +293,8 @@ verified source. 
) # using sql_database source to load family table and pseudonymize the column "rfam_acc" source = sql_database().with_resources("family") - # modify this source instance's resource source = source.family.add_map(pseudonymize_name) - # Run the pipeline. For a large db this may take a while info = pipeline.run(source, write_disposition="replace") print(info) @@ -316,10 +314,8 @@ verified source. ) # using sql_database source to load family table and remove the column "rfam_id" source = sql_database().with_resources("family") - # modify this source instance's resource source = source.family.add_map(remove_columns) - # Run the pipeline. For a large db this may take a while info = pipeline.run(source, write_disposition="replace") print(info) From 1013afb9cfb7bf98235fc615ee21c484ba14a952 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Thu, 14 Sep 2023 17:04:42 +0200 Subject: [PATCH 3/3] fix --- .../verified-sources/sql_database.md | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 877a4e1050..1579b80de7 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -114,17 +114,15 @@ For more information, read the 1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive information securely, like access tokens. Keep this file safe. - Here's what the `secrets.toml` looks like - - You can pass the credentials + Here's what the `secrets.toml` looks like: ```toml [sources.sql_database.credentials] - drivername = "please set me up!" # driver name for the database - database = "please set me up!" # database name - username = "please set me up!" # username associated with the database - host = "please set me up!" # host address - port = "please set me up!" # port required for connection + drivername = "mysql+pymysql" # driver name for the database + database = "Rfam" # database name + username = "rfamro" # username associated with the database + host = "mysql-rfam-public.ebi.ac.uk" # host address + port = "4497" # port required for connection ``` 1. Alternatively, you can also provide credentials in "secrets.toml" as: @@ -191,7 +189,7 @@ def sql_database( ) -> Iterable[DltResource]: ``` -`credentials`: Database details or an 'sqlalchemy.Engine' instance. +`credentials`: Database details or a 'sqlalchemy.Engine' instance. `schema`: Database schema name (default if unspecified). @@ -286,39 +284,35 @@ verified source. doc['rfam_acc'] = hashed_string return doc - def load_table_with_pseudonymized_columns(): - - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and pseudonymize the column "rfam_acc" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(pseudonymize_name) - # Run the pipeline. For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and pseudonymize the column "rfam_acc" + source = sql_database().with_resources("family") + # modify this source instance's resource + source = source.family.add_map(pseudonymize_name) + # Run the pipeline. 
For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) ``` -1. To exclude the columns, for eg. "rfam_id" column from the "family" table before loading: +1. To exclude the columns, for e.g. "rfam_id" column from the "family" table before loading: ```python def remove_columns(doc): del doc["rfam_id"] return doc - def load_table_with_deleted_columns(): - - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and remove the column "rfam_id" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(remove_columns) - # Run the pipeline. For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and remove the column "rfam_id" + source = sql_database().with_resources("family") + # modify this source instance's resource + source = source.family.add_map(remove_columns) + # Run the pipeline. For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) ``` 1. To incrementally load the "family" table using the sql_database source method: @@ -333,7 +327,7 @@ verified source. ``` > In this example, we load the "family" table and set the "updated" column for incremental - > loading. In first run it loads all the data from January 1, 2022, at midnight (00:00:00) and + > loading. In the first run, it loads all the data from January 1, 2022, at midnight (00:00:00) and > then loads incrementally in subsequent runs using "updated" field. 1. To incrementally load the "family" table using the 'sql_table' resource.
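Putting these pieces together, below is a minimal end-to-end sketch of the incremental merge load described above. It assumes the files created by `dlt init sql_database duckdb` are present in your working directory and that credentials are configured in `.dlt/secrets.toml`; note that it uses `pipeline.run`, so the data is normalized and loaded rather than only extracted:

```python
import dlt
import pendulum

from sql_database import sql_table

# Reflect only the "family" table and load it incrementally on the "updated" column
family = sql_table(
    table="family",
    incremental=dlt.sources.incremental(
        "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0)
    ),
)

pipeline = dlt.pipeline(
    pipeline_name="rfam",      # keep this name stable so incremental state is preserved
    destination="duckdb",
    dataset_name="rfam_data",
)

# "merge" deduplicates on the primary key that dlt detects in the source table
info = pipeline.run(family, write_disposition="merge")
print(info)
```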