diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py index ea60b9b00d..125938ace5 100644 --- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py +++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py @@ -5,13 +5,13 @@ keywords: [destination, credentials, example, bigquery, custom destination] --- -In this example, you'll find a Python script that demonstrates how to load to bigquey with the custom destination. +In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination. We'll learn how to: -- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials) -- use the [custom destination](../dlt-ecosystem/destinations/destination.md) -- Use pyarrow tables to create complex column types on bigquery -- Use bigquery `autodetect=True` for schema inference from parquet files +- Use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials) +- Use the [custom destination](../dlt-ecosystem/destinations/destination.md) +- Use pyarrow tables to create complex column types on BigQuery +- Use BigQuery `autodetect=True` for schema inference from parquet files """ @@ -38,7 +38,7 @@ def resource(url: str): # load pyarrow table with pandas table = pa.Table.from_pandas(pd.read_csv(url)) - # we add a list type column to demontrate bigquery lists + # we add a list type column to demonstrate bigquery lists table = table.append_column( "tags", pa.array( @@ -57,12 +57,12 @@ def resource(url: str): yield table -# dlt biquery custom destination +# dlt bigquery custom destination # we can use the dlt provided credentials class # to retrieve the gcp credentials from the secrets -@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0) +@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0, naming_convention="snake_case") def bigquery_insert( - items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value + items, table=BIGQUERY_TABLE_ID, credentials: GcpServiceAccountCredentials = dlt.secrets.value ) -> None: client = bigquery.Client( credentials.project_id, credentials.to_native_credentials(), location="US" @@ -74,7 +74,7 @@ def bigquery_insert( ) # since we have set the batch_size to 0, we get a filepath and can load the file directly with open(items, "rb") as f: - load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config) + load_job = client.load_table_from_file(f, table, job_config=job_config) load_job.result() # Waits for the job to complete. diff --git a/docs/examples/transformers/.dlt/config.toml b/docs/examples/transformers/.dlt/config.toml index 251808e8ef..87433b52da 100644 --- a/docs/examples/transformers/.dlt/config.toml +++ b/docs/examples/transformers/.dlt/config.toml @@ -3,7 +3,7 @@ log_level="WARNING" [extract] # use 2 workers to extract sources in parallel -worker=2 +workers=2 # allow 10 async items to be processed in parallel max_parallel_items=10 diff --git a/docs/website/blog/2024-05-23-contributed-first-pipeline.md b/docs/website/blog/2024-05-23-contributed-first-pipeline.md new file mode 100644 index 0000000000..aae6e0f298 --- /dev/null +++ b/docs/website/blog/2024-05-23-contributed-first-pipeline.md @@ -0,0 +1,90 @@ +--- +slug: contributed-first-pipeline +title: "How I contributed my first data pipeline to the open source." 
+image: https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png +authors: + name: Aman Gupta + title: Junior Data Engineer + url: https://github.com/dat-a-man + image_url: https://dlt-static.s3.eu-central-1.amazonaws.com/images/aman.png +tags: [data ingestion, python sdk, ETL, python data pipelines, Open Source, Developer Tools] +--- + +Hello, I'm Aman Gupta. Over the past eight years, I have navigated the structured world of civil engineering, but recently, I have found myself captivated by data engineering. Initially, I knew how to stack bricks and build structural pipelines. But this newfound interest has helped me build data pipelines, and most of all, it was sparked by a workshop hosted by **dlt.** + +:::info +dlt (data loading tool) is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets. +::: + +The `dlt` workshop took place in November 2022, co-hosted by Adrian Brudaru, my former mentor and co-founder of `dlt`. + +An opportunity arose when another client needed data migration from FreshDesk to BigQuery. I crafted a basic pipeline version, initially designed to support my use case. Upon presenting my basic pipeline to the dlt team, Alena Astrakhatseva, a team member, generously offered to review it and refine it into a community-verified source. + +![image](https://storage.googleapis.com/dlt-blog-images/blog_my_first_data_pipeline.png) + +My first iteration was straightforward—loading data in [replace mode](https://dlthub.com/docs/general-usage/incremental-loading#the-3-write-dispositions). While adequate for initial purposes, a verified source demanded features like [pagination](https://dlthub.com/docs/general-usage/http/overview#explicitly-specifying-pagination-parameters) and [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading). To achieve this, I developed an API client tailored for the Freshdesk API, integrating rate limit handling and pagination: + +```py +class FreshdeskClient: + """ + Client for making authenticated requests to the Freshdesk API. It incorporates API requests with + rate limit and pagination. + """ + + def __init__(self, api_key: str, domain: str): + # Contains stuff like domain, credentials and base URL. + pass + + def _request_with_rate_limit(self, url: str, **kwargs: Any) -> requests.Response: + # Handles rate limits in HTTP requests and ensures that the client doesn't exceed the limit set by the server. + pass + + def paginated_response( + self, + endpoint: str, + per_page: int, + updated_at: Optional[str] = None, + ) -> Iterable[TDataItem]: + # Fetches a paginated response from a specified endpoint. + pass +``` + +To further make the pipeline effective, I developed dlt [resources](https://dlthub.com/docs/general-usage/resource) that could handle incremental data loading. This involved creating resources that used **`dlt`**'s incremental functionality to fetch only new or updated data: + +```py +def incremental_resource( + endpoint: str, + updated_at: Optional[Any] = dlt.sources.incremental( + "updated_at", initial_value="2022-01-01T00:00:00Z" + ), +) -> Generator[Dict[Any, Any], Any, None]: + """ + Fetches and yields paginated data from a specified API endpoint. + Each page of data is fetched based on the `updated_at` timestamp + to ensure incremental loading. + """ + + # Retrieve the last updated timestamp to fetch only new or updated records. 
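+    # `dlt.sources.incremental` persists the highest `updated_at` value seen in previous runs;
+    # `last_value` returns it, falling back to `initial_value` on the very first run.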
+    updated_at = updated_at.last_value
+
+    # Use the FreshdeskClient instance to fetch paginated responses
+    yield from freshdesk.paginated_response(
+        endpoint=endpoint,
+        per_page=per_page,
+        updated_at=updated_at,
+    )
+```
+
+With the steps defined above, I was able to load the data from Freshdesk to BigQuery and use the pipeline in production. Here’s a summary of the steps I followed:
+
+1. Created a Freshdesk API token with sufficient privileges.
+1. Created an API client to make requests to the Freshdesk API with rate limit handling and pagination.
+1. Made incremental requests to this client based on the “updated_at” field in the response.
+1. Ran the pipeline using the Python script.
+
+
+While my journey from civil engineering to data engineering was initially intimidating, it has proved to be a profound learning experience. Writing a pipeline with **`dlt`** mirrors the simplicity of a GET request: you request data, yield it, and it flows from the source to its destination. Now, I help other clients integrate **`dlt`** to streamline their data workflows, which has been an invaluable part of my professional growth.
+
+In conclusion, diving into data engineering has expanded my technical skill set and given me a new lens through which I view challenges and solutions. A couple of years ago, that lens was focused mainly on concrete and steel; today it also takes in the pipelines of the data world.
+
+Data engineering has so far proved to be both challenging and satisfying, and a promising career path for me. For those interested in the detailed workings of these pipelines, I encourage exploring dlt's [GitHub repository](https://github.com/dlt-hub/verified-sources) or diving into the [documentation](https://dlthub.com/docs/dlt-ecosystem/verified-sources/freshdesk).
\ No newline at end of file
diff --git a/docs/website/docs/dlt-ecosystem/destinations/mssql.md b/docs/website/docs/dlt-ecosystem/destinations/mssql.md
index 3e5b209aaa..6554d24bf7 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/mssql.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/mssql.md
@@ -70,12 +70,11 @@ destination.mssql.credentials="mssql://loader:@loader.database.windows
 
 You can place any ODBC-specific settings into the query string or **destination.mssql.credentials.query** TOML table as in the example above.
 
-**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string. This method is useful when SQL logins aren't available, and you use Windows credentials.
+**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string.
 
 ```toml
-destination.mssql.credentials="mssql://username:password@loader.database.windows.net/dlt_data?trusted_connection=yes"
+destination.mssql.credentials="mssql://loader.database.windows.net/dlt_data?trusted_connection=yes"
 ```
-> The username and password must be filled out with the appropriate login credentials or left untouched. Leaving these empty is not recommended.
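+
+Since any ODBC-specific settings can be placed in the query string (see above), the ODBC driver can be pinned in the same way. A sketch, assuming ODBC Driver 17 is installed on the machine running the pipeline:
+
+```toml
+destination.mssql.credentials="mssql://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
+```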
**To connect to a local sql server instance running without SSL** pass `encrypt=no` parameter: ```toml diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index 1f79055d06..0022850987 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -203,7 +203,7 @@ For example, you can set the primary key, write disposition, and other default s ```py config = { "client": { - ... + # ... }, "resource_defaults": { "primary_key": "id", @@ -216,15 +216,17 @@ config = { }, "resources": [ "resource1", - "resource2": { - "name": "resource2_name", - "write_disposition": "append", - "endpoint": { - "params": { - "param1": "value1", + { + "resource2": { + "name": "resource2_name", + "write_disposition": "append", + "endpoint": { + "params": { + "param1": "value1", + }, }, - }, - }, + } + } ], } ``` @@ -309,7 +311,7 @@ To specify the pagination configuration, use the `paginator` field in the [clien ```py { - ... + # ... "paginator": { "type": "json_links", "next_url_path": "paging.next", @@ -321,7 +323,7 @@ Or using the paginator instance: ```py { - ... + # ... "paginator": JSONResponsePaginator( next_url_path="paging.next" ), @@ -394,11 +396,11 @@ One of the most common method is token-based authentication. To authenticate wit ```py { "client": { - ... + # ... "auth": { "token": dlt.secrets["your_api_token"], }, - ... + # ... }, } ``` @@ -424,7 +426,7 @@ To specify the authentication configuration, use the `auth` field in the [client "type": "bearer", "token": dlt.secrets["your_api_token"], }, - ... + # ... }, } ``` @@ -438,7 +440,7 @@ config = { "client": { "auth": BearTokenAuth(dlt.secrets["your_api_token"]), }, - ... + # ... } ``` @@ -455,7 +457,7 @@ In the GitHub example, the `issue_comments` resource depends on the `issues` res "name": "issues", "endpoint": { "path": "issues", - ... + # ... }, }, { @@ -495,10 +497,12 @@ The `issue_comments` resource will make requests to the following endpoints: The syntax for the `resolve` field in parameter configuration is: ```py -"": { - "type": "resolve", - "resource": "", - "field": "", +{ + "": { + "type": "resolve", + "resource": "", + "field": "", + } } ``` @@ -530,21 +534,25 @@ When the API endpoint supports incremental loading, you can configure the source 1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration): ```py - "": { - "type": "incremental", - "cursor_path": "", - "initial_value": "", - }, + { + "": { + "type": "incremental", + "cursor_path": "", + "initial_value": "", + }, + } ``` For example, in the `issues` resource configuration in the GitHub example, we have: ```py - "since": { - "type": "incremental", - "cursor_path": "updated_at", - "initial_value": "2024-01-25T11:21:28Z", - }, + { + "since": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": "2024-01-25T11:21:28Z", + }, + } ``` This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests. @@ -552,13 +560,15 @@ When the API endpoint supports incremental loading, you can configure the source 2. 
Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration): ```py - "incremental": { - "start_param": "", - "end_param": "", - "cursor_path": "", - "initial_value": "", - "end_value": "", - }, + { + "incremental": { + "start_param": "", + "end_param": "", + "cursor_path": "", + "initial_value": "", + "end_value": "", + } + } ``` This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 27ab47b527..1891157b4a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -12,7 +12,7 @@ import Header from './_source-info-header.md'; SQL databases are management systems (DBMS) that store data in a structured format, commonly used for efficient and reliable data retrieval. -Our SQL Database verified source loads data to your specified destination using SQLAlchemy. +Our SQL Database verified source loads data to your specified destination using SQLAlchemy, pyarrow, pandas or ConnectorX :::tip View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py). @@ -20,10 +20,11 @@ View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blo Sources and resources that can be loaded using this verified source are: -| Name | Description | -| ------------ | ----------------------------------------- | -| sql_database | Retrieves data from an SQL database | -| sql_table | Retrieves data from an SQL database table | +| Name | Description | +| ------------ | -------------------------------------------------------------------- | +| sql_database | Reflects the tables and views in SQL database and retrieves the data | +| sql_table | Retrieves data from a particular SQL database table | +| | | ### Supported databases @@ -51,77 +52,7 @@ Note that there many unofficial dialects, such as [DuckDB](https://duckdb.org/). ## Setup Guide -### Grab credentials - -This verified source utilizes SQLAlchemy for database connectivity. Let's take a look at the following public database example: - -`connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` - -The database above doesn't require a password. - -The connection URL can be broken down into: - -```py -connection_url = f"{drivername}://{username}:{password}@{host}:{port}/{database}" -``` - -`drivername`: Indicates both the database system and driver used. - -- E.g., "mysql+pymysql" uses MySQL with the pymysql driver. Alternatives might include mysqldb and - mysqlclient. - -`username`: Used for database authentication. - -- E.g., "rfamro" as a possible read-only user. - -`password`: The password for the given username. - -`host`: The server's address or domain where the database is hosted. - -- E.g., A public database at "mysql-rfam-public.ebi.ac.uk" hosted by EBI. - -`port`: The port for the database connection. - -- E.g., "4497", in the above connection URL. -`port`: The port for the database connection. - -- E.g., "4497", in the above connection URL. - -`database`: The specific database on the server. - -- E.g., Connecting to the "Rfam" database. 
-
-### Configure connection
-
-Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections).
-
-1. To enforce SSL on the client without a client certificate you may pass the following DSN:
-
-   ```toml
-   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca="
-   ```
-
-1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks:
-
-   ```toml
-   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false"
-   ```
-
-1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below:
-
-   ```toml
-   sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem"
-   ```
-
-1. For MSSQL destinations using Windows Authentication, you can modify your connection string to include `trusted_connection=yes`. This bypasses the need for specifying a username and password, which is particularly useful when SQL login credentials are not an option. Here’s how you can set it up:
-
-   ```toml
-   sources.sql_database.credentials="mssql://user:pw@my_host/my_database?trusted_connection=yes"
-   ```
-
-   >Note: The (user:pw) may be included but will be ignored by the server if `trusted_connection=yes` is set.
-
-### Initialize the verified source
+1. ### Initialize the verified source
 
 To get started with your data pipeline, follow these steps:
 
@@ -145,7 +76,7 @@ To get started with your data pipeline, follow these steps:
 
 For more information, read the guide on [how to add a verified source](../../walkthroughs/add-a-verified-source).
 
-### Add credentials
+2. ### Add credentials
 
 1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive information securely, like access tokens. Keep this file safe.
 
@@ -164,17 +95,9 @@ For more information, read the guide on [how to add a verified source](../../wal
 1. Alternatively, you can also provide credentials in "secrets.toml" as:
 
    ```toml
-   sources.sql_database.credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
+   [sources.sql_database]
+   credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    ```
-
-1. You can also pass credentials in the pipeline script the following way:
-
-   ```py
-   credentials = ConnectionStringCredentials(
-       "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
-   )
-   ```
-
   > See
   > [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py)
   > for details.
@@ -183,7 +106,17 @@ For more information, read the [General Usage: Credentials.](../../general-usage
 
-## Run the pipeline
+#### Credentials format
+
+`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using
+[database URLs](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example:
+
+`"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"`
+
+will connect to the `mysql` database named `Rfam` using the `pymysql` dialect. The database host is `mysql-rfam-public.ebi.ac.uk`, port `4497`. The user name is `rfamro` and the password is `PWD`.
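+
+The same credentials can also be kept in `secrets.toml` as separate fields - a minimal sketch assuming the standard `dlt` credentials layout for this source:
+
+```toml
+[sources.sql_database.credentials]
+drivername = "mysql+pymysql"
+database = "Rfam"
+username = "rfamro"
+password = "PWD"
+host = "mysql-rfam-public.ebi.ac.uk"
+port = 4497
+```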
+
+3. ### Run the pipeline
 
 1. Install the necessary dependencies by running the following command:
 
@@ -208,76 +141,166 @@ For more information, read the [General Usage: Credentials.](../../general-usage
    custom name instead.
    :::
 
-## Sources and resources
-
-`dlt` works on the principle of [sources](../../general-usage/source) and
-[resources](../../general-usage/resource).
-
-### Source `sql_database`:
-
-This function loads data from an SQL database via SQLAlchemy and auto-creates resources for each
-table or from a specified list of tables.
+## Source and resource functions
+Import the `sql_database` and `sql_table` functions as follows:
+```py
+from sql_database import sql_database, sql_table
+```
+and read the docstrings to learn about the available options.
+
+:::tip
+We intend our sources to be fully hackable. Feel free to change the code of the source to customize it to your needs.
+:::
+
+## Pick the right backend to load table data
+Table backends convert streams of rows from database tables into batches in various formats. The default backend, **sqlalchemy**, follows the standard `dlt` behavior of
+extracting and normalizing Python dictionaries. We recommend it for smaller tables, initial development work, and when minimal dependencies or a pure Python environment are required. This backend is also the slowest.
+Database tables are structured data, and the other backends speed up dealing with such data significantly. The **pyarrow** backend converts rows into `arrow` tables, has
+good performance, and preserves exact database types; we recommend it for large tables.
+
+### **sqlalchemy** backend
+
+**sqlalchemy** (the default) yields table data as a list of Python dictionaries. This data goes through the regular extract
+and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can use `detect_precision_hints` to pass exact database types to the `dlt` schema.
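+
+For example, the default backend can be selected explicitly. A minimal sketch, assuming the credentials are configured in `secrets.toml` as shown above and a local `duckdb` destination:
+
+```py
+import dlt
+from sql_database import sql_database
+
+# the default backend; also pass exact column types (precision, scale) to the dlt schema
+source = sql_database(backend="sqlalchemy", detect_precision_hints=True).with_resources("family")
+
+pipeline = dlt.pipeline(pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data")
+print(pipeline.run(source))
+```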
+
+### **pyarrow** backend
+
+**pyarrow** yields data as Arrow tables. It uses **SqlAlchemy** to read rows in batches but then immediately converts them into an `ndarray`, transposes it, and uses it to set the columns of an arrow table. This backend always fully
+reflects the database table and preserves original types, i.e. **decimal** / **numeric** will be extracted without loss of precision. If the destination loads parquet files, this backend will skip the `dlt` normalizer and you can gain a large (20x - 30x) speed increase.
+
+Note that if **pandas** is installed, we'll use it to convert SqlAlchemy tuples into an **ndarray**, as it seems to be 20-30% faster than using **numpy** directly.
 
 ```py
-@dlt.source
-def sql_database(
-    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
-    schema: Optional[str] = dlt.config.value,
-    metadata: Optional[MetaData] = None,
-    table_names: Optional[List[str]] = dlt.config.value,
-    chunk_size: int = 1000,
-    detect_precision_hints: Optional[bool] = dlt.config.value,
-    defer_table_reflect: Optional[bool] = dlt.config.value,
-    table_adapter_callback: Callable[[Table], None] = None,
-) -> Iterable[DltResource]:
-    ...
+import dlt
+import sqlalchemy as sa
+from sql_database import sql_database
+
+pipeline = dlt.pipeline(
+    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow"
+)
+
+def _double_as_decimal_adapter(table: sa.Table) -> None:
+    """Return doubles as doubles, not decimals - this is a mysql thing"""
+    for column in table.columns.values():
+        if isinstance(column.type, sa.Double):
+            column.type.asdecimal = False
+
+sql_alchemy_source = sql_database(
+    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true",
+    backend="pyarrow",
+    table_adapter_callback=_double_as_decimal_adapter
+).with_resources("family", "genome")
+
+info = pipeline.run(sql_alchemy_source)
+print(info)
 ```
 
-`credentials`: Database details or an 'sqlalchemy.Engine' instance.
-
-`schema`: Database schema name (default if unspecified).
-
-`metadata`: Optional SQLAlchemy.MetaData; takes precedence over schema.
-
-`table_names`: List of tables to load; defaults to all if not provided.
-
-`chunk_size`: Number of records in a batch. Internally SqlAlchemy maintains a buffer twice that size
-
-`detect_precision_hints`: Infers full schema for columns including data type, precision and scale
-
-`defer_table_reflect`: Will connect to the source database and reflect the tables
-only at runtime. Use when running on Airflow
-
-`table_adapter_callback`: A callback with SQLAlchemy `Table` where you can, for example,
-remove certain columns to be selected.
+### **pandas** backend
+
+The **pandas** backend yields data as data frames using the `pandas.io.sql` module. `dlt` uses **pyarrow** dtypes by default as they generate more stable typing.
+
+With default settings, several database types will be coerced to dtypes in the yielded data frame:
+* **decimal** columns are mapped to doubles, so it is possible to lose precision.
+* **date** and **time** are mapped to strings.
+* all types are nullable.
+
+Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse
+type differences. Most destinations will be able to parse date/time strings and convert doubles into decimals (note that you'll still lose precision on decimals with the default settings). **However, we strongly suggest
+not using the pandas backend if your source tables contain date, time, or decimal columns.**
+
+Example: Use `backend_kwargs` to pass [backend-specific settings](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html), i.e. `coerce_float`. Internally, dlt uses `pandas.io.sql._wrap_result` to generate pandas frames.
+
+```py
+import dlt
+import sqlalchemy as sa
+from sql_database import sql_database
+
+pipeline = dlt.pipeline(
+    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2"
+)
+
+def _double_as_decimal_adapter(table: sa.Table) -> None:
+    """Emits decimals instead of floats."""
+    for column in table.columns.values():
+        if isinstance(column.type, sa.Float):
+            column.type.asdecimal = True
+
+sql_alchemy_source = sql_database(
+    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true",
+    backend="pandas",
+    table_adapter_callback=_double_as_decimal_adapter,
+    chunk_size=100000,
+    # set coerce_float to False to represent decimals as strings
+    backend_kwargs={"coerce_float": False, "dtype_backend": "numpy_nullable"},
+).with_resources("family", "genome")
+
+info = pipeline.run(sql_alchemy_source)
+print(info)
+```
+
+### **connectorx** backend
+[connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in Rust.
+This is claimed to be significantly faster than any other method (confirmed only on postgres - see the notes below). With the default settings it will emit **pyarrow** tables, but you can configure it via **backend_kwargs**.
+
+There are certain limitations when using this backend:
+* it will ignore `chunk_size`. **connectorx** cannot yield data in batches.
+* in many cases it requires a connection string that differs from the **sqlalchemy** connection string. Use the `conn` argument in **backend_kwargs** to set it up.
+* it will convert **decimals** to **doubles**, so you will lose precision.
+* nullability of the columns is ignored (always true).
+* it uses different database type mappings for each database type. [Check here for more details.](https://sfu-db.github.io/connector-x/databases.html)
+* JSON fields (at least those coming from postgres) are double wrapped in strings. Here's a transform to be added with `add_map` that will unwrap it (see the sketch after the example below):
 
-### Resource `sql_table`
-
-This function loads data from specific database tables.
 
+```py
+from sources.sql_database.helpers import unwrap_json_connector_x
+```
+
+Note: dlt will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you'll still lose precision on decimals with the default settings.
 
 ```py
-@dlt.common.configuration.with_config(
-    sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration
-)
-def sql_table(
-    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
-    table: str = dlt.config.value,
-    schema: Optional[str] = dlt.config.value,
-    metadata: Optional[MetaData] = None,
-    incremental: Optional[dlt.sources.incremental[Any]] = None,
-    chunk_size: int = 1000,
-    detect_precision_hints: Optional[bool] = dlt.config.value,
-    defer_table_reflect: Optional[bool] = dlt.config.value,
-    table_adapter_callback: Callable[[Table], None] = None,
-) -> DltResource:
-    ...
+"""Uses unsw_flow dataset (~2mln rows, 25+ columns) to test connectorx speed"""
+import os
+import dlt
+from dlt.destinations import filesystem
+from sql_database import sql_table
+
+unsw_table = sql_table(
+    "postgresql://loader:loader@localhost:5432/dlt_data",
+    "unsw_flow_7",
+    "speed_test",
+    # this is ignored by connectorx
+    chunk_size=100000,
+    backend="connectorx",
+    # keep source data types
+    detect_precision_hints=True,
+    # just to demonstrate how to set up a separate connection string for connectorx
+    backend_kwargs={"conn": "postgresql://loader:loader@localhost:5432/dlt_data"}
+)
+
+pipeline = dlt.pipeline(
+    pipeline_name="unsw_download",
+    destination=filesystem(os.path.abspath("../_storage/unsw")),
+    progress="log",
+    full_refresh=True,
+)
+
+info = pipeline.run(
+    unsw_table,
+    dataset_name="speed_test",
+    table_name="unsw_flow",
+    loader_file_format="parquet",
+)
+print(info)
 ```
 
-`incremental`: Optional, enables incremental loading.
-
-`write_disposition`: Can be "merge", "replace", or "append".
+With the dataset above and a local postgres instance, connectorx is 2x faster than the pyarrow backend.
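+
+Below is a sketch of how such a transform could be attached with `add_map`. It assumes `unwrap_json_connector_x` takes the name of the JSON column and returns a row transform - check the helper's docstring for the exact signature. The table and column names are made up for illustration:
+
+```py
+import dlt
+from sql_database import sql_table
+from sources.sql_database.helpers import unwrap_json_connector_x
+
+# hypothetical table with a JSON column named "custom_fields"
+issues = sql_table(
+    "postgresql://loader:loader@localhost:5432/dlt_data",
+    "issues",
+    backend="connectorx",
+)
+issues.add_map(unwrap_json_connector_x("custom_fields"))
+
+pipeline = dlt.pipeline(pipeline_name="unwrap_demo", destination="duckdb", dataset_name="issues_data")
+print(pipeline.run(issues))
+```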
+
+### Notes on source databases
+
+#### Oracle
+1. When using the **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or the **cx_oracle** (old) client.
+2. Mind that **sqlalchemy** translates Oracle identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon.
+3. Connectorx is, for some reason, slower for Oracle than the `pyarrow` backend.
+
+#### DB2
+1. Mind that **sqlalchemy** translates DB2 identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon.
+2. The DB2 `DOUBLE` type is mapped to the `Numeric` SqlAlchemy type with default precision, yet `float` Python types are returned. That requires `dlt` to perform additional casts. The cost of the cast, however, is minuscule compared to the cost of reading rows from the database.
+
+#### MySQL
+1. The SqlAlchemy dialect converts doubles to decimals; we disable that behavior via a table adapter in our demo pipeline.
+
+#### Postgres / MSSQL
+No issues found. Postgres is the only database where we observed a 2x speedup with connectorx. On other db systems it performs the same as the `pyarrow` backend, or slower.
 
-for other arguments, see `sql_database` source above.
 
 ## Incremental Loading
 Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play.
@@ -344,30 +367,73 @@ certain range.
 
 :::info
   * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
-  * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appendend, or replaced resources.
+  * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
 :::
 
-### Run on Airflow
+## Run on Airflow
 When running on Airflow
 1. Use `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion.
 2. Reflect tables at runtime with `defer_table_reflect` argument.
 3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading).
 
-### Parallel extraction
+## Parallel extraction
 You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low.
 ```py
 database = sql_database().parallelize()
 table = sql_table().parallelize()
 ```
+## Troubleshooting
+
+### Connect to mysql with SSL
+Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections).
+
+1. To enforce SSL on the client without a client certificate you may pass the following DSN:
+
+   ```toml
+   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca="
+   ```
+
+1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks:
+
+   ```toml
+   sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false"
+   ```
+
+1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below:
+
+   ```toml
+   sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem"
+   ```
+
+### SQL Server connection options
+
+**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string.
+
+```toml
+destination.mssql.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
+```
+
+**To connect to a local sql server instance running without SSL** pass the `encrypt=no` parameter:
+```toml
+destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server"
+```
+
+**To allow a self-signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`:
+```toml
+destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server"
+```
+
+**To use long strings (>8k) and avoid collation errors**:
+```toml
+destination.mssql.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server"
+```
 
-### Troubleshooting
-If you encounter issues where the expected WHERE clause for incremental loading is not generated, ensure your configuration aligns with the `sql_table` resource rather than applying hints post-resource creation. This ensures the loader generates the correct query for incremental loading.
-
-## Customization
-### Create your own pipeline
-
-To create your own pipeline, use source and resource methods from this verified source.
+## Customizations
+### Transform the data in Python before it is loaded
+
+You have direct access to all resources (that represent tables) and you can modify hints, add Python transforms, parallelize execution, etc., as with any other
+resource. Below we show an example of how to pseudonymize the data before it is loaded, using deterministic hashing.
 
 1. 
Configure the pipeline by specifying the pipeline name, destination, and dataset as follows: diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index 3f29182044..9f29cb6965 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -367,6 +367,49 @@ from dlt.sources.helpers.rest_client.paginators import BasePaginator from dlt.sources.helpers.rest_client import RESTClient from dlt.sources.helpers.requests import Response, Request +class PostBodyPaginator(BasePaginator): + def __init__(self): + super().__init__() + self.cursor = None + + def update_state(self, response: Response) -> None: + # Assuming the API returns an empty list when no more data is available + if not response.json(): + self._has_next_page = False + else: + self.cursor = response.json().get("next_page_cursor") + + def update_request(self, request: Request) -> None: + if request.json is None: + request.json = {} + + # Add the cursor to the request body + request.json["cursor"] = self.cursor + +client = RESTClient( + base_url="https://api.example.com", + paginator=PostBodyPaginator() +) + +@dlt.resource +def get_data(): + for page in client.paginate("/data"): + yield page +``` + +:::tip +[`PageNumberPaginator`](#pagenumberpaginator) that ships with dlt does the same thing, but with more flexibility and error handling. This example is meant to demonstrate how to implement a custom paginator. For most use cases, you should use the [built-in paginators](#paginators). +::: + +#### Example 2: creating a paginator for POST requests + +Some APIs use POST requests for pagination, where the next page is fetched by sending a POST request with a cursor or other parameters in the request body. This is frequently used in "search" API endpoints or other endpoints with big payloads. Here's how you could implement a paginator for a case like this: + +```py +from dlt.sources.helpers.rest_client.paginators import BasePaginator +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.requests import Response, Request + class PostBodyPaginator(BasePaginator): def __init__(self): super().__init__() diff --git a/docs/website/docs/reference/performance_snippets/toml-snippets.toml b/docs/website/docs/reference/performance_snippets/toml-snippets.toml index d028e9b145..5e700c4e31 100644 --- a/docs/website/docs/reference/performance_snippets/toml-snippets.toml +++ b/docs/website/docs/reference/performance_snippets/toml-snippets.toml @@ -43,7 +43,7 @@ file_max_bytes=1000000 # @@@DLT_SNIPPET_START extract_workers_toml # for all sources and resources being extracted [extract] -worker=1 +workers=1 # for all resources in the zendesk_support source [sources.zendesk_support.extract]