From 0e949b9ebd17126f12ea9564cf0576392cfada86 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 17 May 2024 22:51:06 +0200
Subject: [PATCH] adds information on table backends
---
 .../docs/dlt-ecosystem/destinations/mssql.md |   5 +-
 .../verified-sources/sql_database.md         | 352 +++++++++++-------
 2 files changed, 211 insertions(+), 146 deletions(-)

diff --git a/docs/website/docs/dlt-ecosystem/destinations/mssql.md b/docs/website/docs/dlt-ecosystem/destinations/mssql.md

You can place any ODBC-specific settings into the query string or the **destination.mssql.credentials.query** TOML table as in the example above.

**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string:

```toml
destination.mssql.credentials="mssql://loader.database.windows.net/dlt_data?trusted_connection=yes"
```

diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md

SQL databases are database management systems (DBMS) that store data in a structured format, commonly used for efficient and reliable data retrieval.

Our SQL Database verified source loads data to your specified destination using SQLAlchemy, pyarrow, pandas, or ConnectorX.

:::tip
View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py).
:::

Sources and resources that can be loaded using this verified source are:

| Name         | Description                                                              |
| ------------ | ------------------------------------------------------------------------ |
| sql_database | Reflects the tables and views in an SQL database and retrieves the data  |
| sql_table    | Retrieves data from a particular SQL database table                      |

### Supported databases

Note that there are many unofficial dialects, such as [DuckDB](https://duckdb.org/).
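For example, switching between supported engines usually only requires changing the dialect and driver in the SQLAlchemy connection URL. A minimal sketch; the hosts, user names, and database names below are hypothetical placeholders:

```py
from sql_database import sql_database

# The same source works across engines; only the SQLAlchemy URL changes.
mysql_source = sql_database("mysql+pymysql://user:password@localhost:3306/my_db")
postgres_source = sql_database("postgresql+psycopg2://user:password@localhost:5432/my_db")
```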
## Setup Guide

1. ### Initialize the verified source

To get started with your data pipeline, follow these steps:

For more information, read the guide on [how to add a verified source](../../walkthroughs/add-a-verified-source).

2. ### Add credentials

1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive information securely, like access tokens. Keep this file safe.

1. Alternatively, you can also provide credentials in "secrets.toml" as:

   ```toml
   [sources.sql_database]
   credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
   ```
   > See
   > [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py)
   > for details.

For more information, read the [General Usage: Credentials.](../../general-usage/credentials)
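You can also build the credentials directly in the pipeline script and pass them to the source or resource functions, for example:

```py
from dlt.sources.credentials import ConnectionStringCredentials

# pass this object as the `credentials` argument of sql_database() or sql_table()
credentials = ConnectionStringCredentials(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
)
```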
#### Credentials format

`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using
[database URLs](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls), which follow the pattern
`{drivername}://{username}:{password}@{host}:{port}/{database}`. For example:

`"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"`

will connect to the `mysql` database named `Rfam` using the `pymysql` dialect. The database host is `mysql-rfam-public.ebi.ac.uk`, the port is `4497`,
the user name is `rfamro`, and the password is `PWD`.

3. ### Run the pipeline

1. Install the necessary dependencies by running the following command:

## Source and resource functions

Import the `sql_database` and `sql_table` functions as follows:

```py
from sql_database import sql_database, sql_table
```

and read the docstrings to learn about the available options.

:::tip
We intend our sources to be fully hackable. Feel free to change the code of the source to customize it to your needs.
:::

## Pick the right backend to load table data

Table backends convert streams of rows from database tables into batches in various formats. The default backend, **sqlalchemy**, follows the standard `dlt` behavior of
extracting and normalizing Python dictionaries. We recommend it for smaller tables, initial development work, and when minimal dependencies or a pure Python environment are required. This backend is also the slowest.
Database tables are structured data, and the other backends speed up dealing with such data significantly. The **pyarrow** backend converts rows into `arrow` tables, has
good performance, preserves exact database types, and is recommended for large tables.

### **sqlalchemy** backend

**sqlalchemy** (the default) yields table data as a list of Python dictionaries. This data goes through the regular extract
and normalize steps and does not require additional dependencies to be installed. It is the most robust option (it works with any destination and correctly represents data types) but also the slowest. You can use `detect_precision_hints` to pass exact database types to the `dlt` schema.
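For example, a minimal pipeline on the default backend could look like the sketch below; the pipeline and dataset names are ours, and `backend="sqlalchemy"` is spelled out only for clarity since it is the default:

```py
import dlt
from sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="rfam_sqlalchemy", destination="duckdb", dataset_name="rfam_data"
)

# rows are yielded as Python dictionaries and go through the regular normalize step
source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    backend="sqlalchemy",
    detect_precision_hints=True,  # pass exact database types to the dlt schema
).with_resources("family", "genome")

print(pipeline.run(source))
```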
### **pyarrow** backend

**pyarrow** yields data as Arrow tables. It uses **SQLAlchemy** to read rows in batches, but then immediately converts them into an `ndarray`, transposes it, and uses it to set the columns of an Arrow table. This backend always fully
reflects the database table and preserves the original types, i.e. **decimal** / **numeric** columns are extracted without loss of precision. If the destination loads parquet files, this backend skips the `dlt` normalizer and you can gain a 20x - 30x speed increase.

Note that if **pandas** is installed, we use it to convert SQLAlchemy tuples into an **ndarray**, as it seems to be 20-30% faster than using **numpy** directly.

```py
import dlt
import sqlalchemy as sa

from sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow"
)

def _double_as_decimal_adapter(table: sa.Table) -> None:
    """Return doubles as doubles, not decimals; the MySQL dialect maps DOUBLE to Decimal by default."""
    for column in table.columns.values():
        if isinstance(column.type, sa.Double):
            column.type.asdecimal = False

sql_alchemy_source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?binary_prefix=true",
    backend="pyarrow",
    table_adapter_callback=_double_as_decimal_adapter
).with_resources("family", "genome")

info = pipeline.run(sql_alchemy_source)
print(info)
```
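As an illustration of the parquet fast path mentioned above, a sketch that writes parquet files to a local filesystem destination; the bucket path, pipeline, and dataset names are our assumptions:

```py
import os
import dlt
from dlt.destinations import filesystem

from sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="rfam_parquet",
    destination=filesystem(os.path.abspath("./_storage/rfam")),
    dataset_name="rfam_data",
)

# arrow tables produced by the backend are written to parquet, skipping the dlt normalizer
source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    backend="pyarrow",
).with_resources("family", "genome")

print(pipeline.run(source, loader_file_format="parquet"))
```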
### **pandas** backend

The **pandas** backend yields data as data frames using the `pandas.io.sql` module. `dlt` uses **pyarrow** dtypes by default, as they generate more stable typing.

With the default settings, several database types will be coerced to dtypes in the yielded data frame:
* **decimal** columns are mapped to doubles, so it is possible to lose precision.
* **date** and **time** columns are mapped to strings.
* all types are nullable.

Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse
type differences. Most destinations will be able to parse date/time strings and convert doubles into decimals (note that you will still lose precision on decimals with the default settings). **However, we strongly suggest
not using the pandas backend if your source tables contain date, time, or decimal columns.**

Example: Use `backend_kwargs` to pass [backend-specific settings](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html), e.g. `coerce_float`. Internally, `dlt` uses `pandas.io.sql._wrap_result` to generate pandas frames.

```py
import dlt
import sqlalchemy as sa

from sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2"
)

def _double_as_decimal_adapter(table: sa.Table) -> None:
    """Emits decimals instead of floats."""
    for column in table.columns.values():
        if isinstance(column.type, sa.Float):
            column.type.asdecimal = True

sql_alchemy_source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?binary_prefix=true",
    backend="pandas",
    table_adapter_callback=_double_as_decimal_adapter,
    chunk_size=100000,
    # coerce_float=False keeps decimal values as decimals instead of converting them to floats
    backend_kwargs={"coerce_float": False, "dtype_backend": "numpy_nullable"},
).with_resources("family", "genome")

info = pipeline.run(sql_alchemy_source)
print(info)
```

### **connectorx** backend

The [connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in Rust. This is claimed to be significantly faster than any other method (we confirmed this only on Postgres; see the notes below). With the default settings it will emit **pyarrow** tables, but you can configure this via **backend_kwargs**.

There are certain limitations when using this backend:
* it ignores `chunk_size`; **connectorx** cannot yield data in batches.
* in many cases it requires a connection string that differs from the **sqlalchemy** connection string. Use the `conn` argument in **backend_kwargs** to set it up.
* it converts **decimals** to **doubles**, so you will lose precision.
* nullability of the columns is ignored (always true).
* it uses different database type mappings for each database type ([check here for more details](https://sfu-db.github.io/connector-x/databases.html)).
* JSON fields (at least those coming from Postgres) are double wrapped in strings. Here's a transform to be added with `add_map` that will unwrap it:

```py
from sources.sql_database.helpers import unwrap_json_connector_x
```
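A sketch of how that transform could be attached with `add_map`; we assume here that the helper takes the JSON column name and returns a row transform (check its docstring in `helpers.py`), and the table and column names below are hypothetical:

```py
from sources.sql_database.helpers import unwrap_json_connector_x
from sql_database import sql_table

chat_message = sql_table(
    credentials="postgresql://loader:loader@localhost:5432/dlt_data",
    table="chat_message",
    backend="connectorx",
).add_map(unwrap_json_connector_x("data"))  # "data" is an assumed JSON column
```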
Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Note that you will still lose precision on decimals with the default settings.

```py
"""Uses the unsw_flow dataset (~2 mln rows, 25+ columns) to test connectorx speed."""
import os

import dlt
from dlt.destinations import filesystem

from sql_database import sql_table

unsw_table = sql_table(
    "postgresql://loader:loader@localhost:5432/dlt_data",
    "unsw_flow_7",
    "speed_test",
    # this is ignored by connectorx
    chunk_size=100000,
    backend="connectorx",
    # keep source data types
    detect_precision_hints=True,
    # just to demonstrate how to set up a separate connection string for connectorx
    backend_kwargs={"conn": "postgresql://loader:loader@localhost:5432/dlt_data"}
)

pipeline = dlt.pipeline(
    pipeline_name="unsw_download",
    destination=filesystem(os.path.abspath("../_storage/unsw")),
    progress="log",
    full_refresh=True,
)

info = pipeline.run(
    unsw_table,
    dataset_name="speed_test",
    table_name="unsw_flow",
    loader_file_format="parquet",
)
print(info)
```

With the dataset above and a local Postgres instance, connectorx is 2x faster than the pyarrow backend.

### Notes on source databases

#### Oracle
1. When using the **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or the (older) **cx_oracle** client.
2. Mind that **sqlalchemy** translates Oracle identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon.
3. Connectorx is, for some reason, slower for Oracle than the `pyarrow` backend.

#### DB2
1. Mind that **sqlalchemy** translates DB2 identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon.
2. The DB2 `DOUBLE` type is mapped to the `Numeric` SqlAlchemy type with default precision, yet `float` Python values are returned. That requires `dlt` to perform additional casts. The cost of the cast, however, is minuscule compared to the cost of reading rows from the database.

#### MySQL
1. The SqlAlchemy dialect converts doubles to decimals; we disable that behavior via a table adapter in our demo pipeline.

#### Postgres / MSSQL
No issues found. Postgres is the only database where we observed a 2x speedup with connectorx. On other database systems it performs the same as the `pyarrow` backend or slower.

## Incremental Loading

Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play.
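For example, a single table could be loaded incrementally with a cursor column as in the sketch below; we assume the `family` table has an `updated` timestamp column, so adjust the column name to your schema:

```py
import dlt
from sql_database import sql_table

family = sql_table(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table="family",
    # only rows with a cursor value above the last stored maximum are selected
    incremental=dlt.sources.incremental("updated"),
)

pipeline = dlt.pipeline(
    pipeline_name="rfam_incremental", destination="duckdb", dataset_name="rfam_data"
)
# merge needs a primary key, which dlt picks up from the reflected table
print(pipeline.run(family, write_disposition="merge"))
```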
:::info
  * For the merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
  * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting the write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
:::
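For instance, hints could be adjusted per resource after the source is created; the primary key and cursor column names below are assumptions about the schema:

```py
import dlt
from sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
).with_resources("family", "clan")

# load "family" incrementally with merge, replace "clan" on every run
source.resources["family"].apply_hints(
    write_disposition="merge",
    primary_key="rfam_acc",  # assumed primary key column
    incremental=dlt.sources.incremental("updated"),  # assumed cursor column
)
source.resources["clan"].apply_hints(write_disposition="replace")

pipeline = dlt.pipeline(
    pipeline_name="rfam_hints", destination="duckdb", dataset_name="rfam_data"
)
print(pipeline.run(source))
```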
## Run on Airflow
When running on Airflow:
1. Use the `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from the `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion.
2. Reflect tables at runtime with the `defer_table_reflect` argument.
3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading).

## Parallel extraction
You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low.
```py
database = sql_database().parallelize()
table = sql_table().parallelize()
```

## Troubleshooting

### Connect to mysql with SSL
Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections).

1. To enforce SSL on the client without a client certificate, you may pass the following DSN:

   ```toml
   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca="
   ```

1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks:

   ```toml
   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false"
   ```

1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below:

   ```toml
   sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem"
   ```

### SQL Server connection options

**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string.

```toml
destination.mssql.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
```

**To connect to a local sql server instance running without SSL**, pass the `encrypt=no` parameter:
```toml
destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server"
```

**To allow a self-signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`:
```toml
destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server"
```

**To use long strings (>8k) and avoid collation errors**:
```toml
destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server"
```

### Expected WHERE clause for incremental loading is not generated

If you encounter issues where the expected WHERE clause for incremental loading is not generated, ensure your configuration aligns with the `sql_table` resource rather than applying hints after resource creation. This ensures the loader generates the correct query for incremental loading.

## Customizations
### Transform the data in Python before it is loaded

You have direct access to all resources (that represent tables) and you can modify hints, add Python transforms, parallelize execution, etc., as for any other
resource. Below we show an example of how to pseudonymize data before it is loaded, using deterministic hashing.

1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:
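   A sketch of this first step; the names are placeholders and any destination you have configured will work:

   ```py
   import dlt

   pipeline = dlt.pipeline(
       pipeline_name="rfam",        # a custom pipeline name
       destination="duckdb",        # or any other configured destination
       dataset_name="rfam_data",    # dataset (schema) name at the destination
   )
   ```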