From 511df6ee6b97bcff8e605b1bac0098814faf2dfe Mon Sep 17 00:00:00 2001 From: Maxime Lemaitre Date: Thu, 16 May 2024 10:46:59 +0200 Subject: [PATCH 1/6] Fix typo in Slack Docs (#1369) --- docs/website/docs/dlt-ecosystem/verified-sources/slack.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md index 970a891e60..38eda15c94 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md @@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps: [This command](../../reference/command-line-interface) will initialize [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py) - with Google Sheets as the [source](../../general-usage/source) and + with Slack as the [source](../../general-usage/source) and [duckdb](../destinations/duckdb.md) as the [destination](../destinations). 1. If you'd like to use a different destination, simply replace `duckdb` with the name of your From 80e78204d6a81e3c02cdd481a5caa50e30c88bf6 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Thu, 16 May 2024 10:52:25 +0200 Subject: [PATCH 2/6] Add the troubleshooting section (#1367) --- .../docs/general-usage/http/rest-client.md | 70 ++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index ca39046d35..481670ae4b 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -385,7 +385,7 @@ class PostBodyPaginator(BasePaginator): # Add the cursor to the request body request.json["cursor"] = self.cursor - + client = RESTClient( base_url="https://api.example.com", paginator=PostBodyPaginator() @@ -527,4 +527,70 @@ from dlt.sources.helpers.rest_client import paginate for page in paginate("https://api.example.com/posts"): print(page) -``` \ No newline at end of file +``` + +## Troubleshooting + +### `RESTClient.get()` and `RESTClient.post()` methods + +These methods work similarly to the [get()](https://docs.python-requests.org/en/latest/api/#requests.get) and [post()](https://docs.python-requests.org/en/latest/api/#requests.post) functions +from the Requests library. They return a [Response](https://docs.python-requests.org/en/latest/api/#requests.Response) object that contains the response data. +You can inspect the `Response` object to get the `response.status_code`, `response.headers`, and `response.content`. For example: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +client = RESTClient(base_url="https://api.example.com") +response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) + +print(response.status_code) +print(response.headers) +print(response.content) +``` + +### `RESTClient.paginate()` + +Debugging `paginate()` is trickier because it's a generator function that yields [`PageData`](#pagedata) objects. Here's several ways to debug the `paginate()` method: + +1. Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests: + +```bash +RUNTIME__LOG_LEVEL=INFO python my_script.py +``` + +2. 
Use the [`PageData`](#pagedata) instance to inspect the [request](https://docs.python-requests.org/en/latest/api/#requests.Request) +and [response](https://docs.python-requests.org/en/latest/api/#requests.Response) objects: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator + +client = RESTClient( + base_url="https://api.example.com", + paginator=JSONResponsePaginator(next_url_path="pagination.next") +) + +for page in client.paginate("/posts"): + print(page.request) + print(page.response) +``` + +3. Use the `hooks` parameter to add custom response handlers to the `paginate()` method: + +```py +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +def response_hook(response, **kwargs): + print(response.status_code) + print(f"Content: {response.content}") + print(f"Request: {response.request.body}") + # Or import pdb; pdb.set_trace() to debug + +for page in client.paginate( + "/posts", + auth=BearerTokenAuth(token="your_access_token") + hooks={"response": [response_hook]} +): + print(page) +``` From 314e7a026619c5fc793ca7408581baa2d71d7e13 Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Thu, 16 May 2024 14:53:53 +0200 Subject: [PATCH 3/6] Replace weather api example with github in create a pipeline walkthrough (#1351) Co-authored-by: AstrakhantsevaAA Co-authored-by: Anton Burnashev --- .../docs/walkthroughs/create-a-pipeline.md | 142 +++++++++++------- 1 file changed, 85 insertions(+), 57 deletions(-) diff --git a/docs/website/docs/walkthroughs/create-a-pipeline.md b/docs/website/docs/walkthroughs/create-a-pipeline.md index 1d5974efbe..bba78dc6cb 100644 --- a/docs/website/docs/walkthroughs/create-a-pipeline.md +++ b/docs/website/docs/walkthroughs/create-a-pipeline.md @@ -1,31 +1,46 @@ --- title: Create a pipeline description: How to create a pipeline -keywords: [how to, create a pipeline] +keywords: [how to, create a pipeline, rest client] --- # Create a pipeline -Follow the steps below to create a [pipeline](../general-usage/glossary.md#pipeline) from the -WeatherAPI.com API to DuckDB from scratch. The same steps can be repeated for any source and -destination of your choice—use `dlt init ` and then build the pipeline for -that API instead. +This guide walks you through creating a pipeline that uses our [REST API Client](../general-usage/http/rest-client) +to connect to [DuckDB](../dlt-ecosystem/destinations/duckdb). +:::tip +We're using DuckDB as a destination here, but you can adapt the steps to any [source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/) and [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) by +using the [command](../reference/command-line-interface#dlt-init) `dlt init ` and tweaking the pipeline accordingly. +::: -Please make sure you have [installed `dlt`](../reference/installation.md) before following the +Please make sure you have [installed `dlt`](../reference/installation) before following the steps below. +## Task overview + +Imagine you want to analyze issues from a GitHub project locally. +To achieve this, you need to write code that accomplishes the following: + +1. Constructs a correct request. +2. Authenticates your request. +3. Fetches and handles paginated issue data. +4. Stores the data for analysis. 
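
Handled by hand with plain `requests`, those four steps look roughly like the sketch below; the repository URL matches the example used later in this walkthrough, and the access token is a placeholder you would substitute yourself.

```py
import requests

url = "https://api.github.com/repos/dlt-hub/dlt/issues"
headers = {"Authorization": "Bearer <your_access_token>"}  # placeholder token
params = {"state": "open"}

while url:
    # steps 1 and 2: construct and authenticate the request
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()

    # step 4: store or inspect the page of issues (here we just print it)
    for issue in response.json():
        print(issue["number"], issue["title"])

    # step 3: follow the Link header to the next page, if any
    url = response.links.get("next", {}).get("url")
    params = None  # the "next" URL already carries the query parameters
```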
+ +This may sound complicated, but dlt provides a [REST API Client](../general-usage/http/rest-client) that allows you to focus more on your data rather than on managing API interactions. + + ## 1. Initialize project Create a new empty directory for your `dlt` project by running: ```sh -mkdir weatherapi_duckdb && cd weatherapi_duckdb +mkdir github_api_duckdb && cd github_api_duckdb ``` Start a `dlt` project with a pipeline template that loads data to DuckDB by running: ```sh -dlt init weatherapi duckdb +dlt init github_api duckdb ``` Install the dependencies necessary for DuckDB: @@ -34,114 +49,127 @@ Install the dependencies necessary for DuckDB: pip install -r requirements.txt ``` -## 2. Add WeatherAPI.com API credentials +## 2. Obtain and add API credentials from GitHub -You will need to [sign up for the WeatherAPI.com API](https://www.weatherapi.com/signup.aspx). +You will need to [sign in](https://github.com/login) to your GitHub account and create your access token via [Personal access tokens page](https://github.com/settings/tokens). -Once you do this, you should see your `API Key` at the top of your -[user page](https://www.weatherapi.com/my/). - -Copy the value of the API key into `.dlt/secrets.toml`: +Copy your new access token over to `.dlt/secrets.toml`: ```toml [sources] api_secret_key = '' ``` -The **secret name** corresponds to the **argument name** in the source function. Below `api_secret_key` [will get its value](../general-usage/credentials/configuration.md#general-usage-and-an-example) from `secrets.toml` when `weatherapi_source()` is called. + +This token will be used by `github_api_source()` to authenticate requests. + +The **secret name** corresponds to the **argument name** in the source function. +Below `api_secret_key` [will get its value](../general-usage/credentials/configuration#allow-dlt-to-pass-the-config-and-secrets-automatically) +from `secrets.toml` when `github_api_source()` is called. + ```py @dlt.source -def weatherapi_source(api_secret_key=dlt.secrets.value): - ... +def github_api_source(api_secret_key: str = dlt.secrets.value): + return github_api_resource(api_secret_key=api_secret_key) ``` -Run the `weatherapi.py` pipeline script to test that authentication headers look fine: +Run the `github_api.py` pipeline script to test that authentication headers look fine: ```sh -python3 weatherapi.py +python github_api.py ``` Your API key should be printed out to stdout along with some test data. -## 3. Request data from the WeatherAPI.com API +## 3. Request project issues from then GitHub API -Replace the definition of the `weatherapi_resource` function definition in the `weatherapi.py` -pipeline script with a call to the WeatherAPI.com API: -```py -@dlt.resource(write_disposition="append") -def weatherapi_resource(api_secret_key=dlt.secrets.value): - url = "https://api.weatherapi.com/v1/current.json" - params = { - "q": "NYC", - "key": api_secret_key - } - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() -``` +:::tip +We will use `dlt` repository as an example GitHub project https://github.com/dlt-hub/dlt, feel free to replace it with your own repository. 
+::: -Run the `weatherapi.py` pipeline script to test that the API call works: +Modify `github_api_resource` in `github_api.py` to request issues data from your GitHub project's API: -```sh -python3 weatherapi.py +```py +from dlt.sources.helpers.rest_client import paginate +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator + +@dlt.resource(write_disposition="replace") +def github_api_resource(api_secret_key: str = dlt.secrets.value): + url = "https://api.github.com/repos/dlt-hub/dlt/issues" + + for page in paginate( + url, + auth=BearerTokenAuth(api_secret_key), + paginator=HeaderLinkPaginator(), + params={"state": "open"} + ): + yield page ``` -This should print out the weather in New York City right now. - ## 4. Load the data -Remove the `exit()` call from the `main` function in `weatherapi.py`, so that running the -`python3 weatherapi.py` command will now also run the pipeline: +Uncomment the commented out code in `main` function in `github_api.py`, so that running the +`python github_api.py` command will now also run the pipeline: ```py if __name__=='__main__': - # configure the pipeline with your destination details pipeline = dlt.pipeline( - pipeline_name='weatherapi', + pipeline_name='github_api_pipeline', destination='duckdb', - dataset_name='weatherapi_data' + dataset_name='github_api_data' ) # print credentials by running the resource - data = list(weatherapi_resource()) + data = list(github_api_resource()) # print the data yielded from resource print(data) # run the pipeline with your parameters - load_info = pipeline.run(weatherapi_source()) + load_info = pipeline.run(github_api_source()) # pretty print the information on data that was loaded print(load_info) ``` -Run the `weatherapi.py` pipeline script to load data into DuckDB: + +Run the `github_api.py` pipeline script to test that the API call works: ```sh -python3 weatherapi.py +python github_api.py ``` -Then this command to see that the data loaded: +This should print out JSON data containing the issues in the GitHub project. + +It also prints `load_info` object. + +Let's explore the loaded data with the [command](../reference/command-line-interface#show-tables-and-data-in-the-destination) `dlt pipeline show`. + +:::info +Make sure you have `streamlit` installed `pip install streamlit` +::: ```sh -dlt pipeline weatherapi show +dlt pipeline github_api_pipeline show ``` This will open a Streamlit app that gives you an overview of the data loaded. ## 5. Next steps -Now that you have a working pipeline, you have options for what to learn next: +With a functioning pipeline, consider exploring: +- Our [REST Client](../general-usage/http/rest-client). - [Deploy this pipeline with GitHub Actions](deploy-a-pipeline/deploy-with-github-actions), so that the data is automatically loaded on a schedule. - Transform the [loaded data](../dlt-ecosystem/transformations) with dbt or in Pandas DataFrames. -- Learn how to [run](../running-in-production/running.md), - [monitor](../running-in-production/monitoring.md), and - [alert](../running-in-production/alerting.md) when you put your pipeline in production. +- Learn how to [run](../running-in-production/running), + [monitor](../running-in-production/monitoring), and + [alert](../running-in-production/alerting) when you put your pipeline in production. 
- Try loading data to a different destination like - [Google BigQuery](../dlt-ecosystem/destinations/bigquery.md), - [Amazon Redshift](../dlt-ecosystem/destinations/redshift.md), or - [Postgres](../dlt-ecosystem/destinations/postgres.md). + [Google BigQuery](../dlt-ecosystem/destinations/bigquery), + [Amazon Redshift](../dlt-ecosystem/destinations/redshift), or + [Postgres](../dlt-ecosystem/destinations/postgres). From ca154015d8cdde3d0a0922389839463467d50ee9 Mon Sep 17 00:00:00 2001 From: Ilya Gurov Date: Thu, 16 May 2024 20:25:46 +0400 Subject: [PATCH 4/6] feat(pipeline): add an ability to auto truncate (#1292) * feat(pipeline): add an ability to auto truncate staging destination after load * lint fix * fix typo * improve tests * truncate dataset * do truncation after all the load is finished * fix the test, which already expects warnings * add docs, tests * lint fix * lint fix * fixes * fix typo * delete excess comment * fix the test * additional conditions for assert * use qualified name * lint fix * lint fix * fix tests * fix the test * fix test * if staging is not used, don't test it * test fix for clickhouse * test fix * uses with_staging_dataset correctly --------- Co-authored-by: Marcin Rudolf --- dlt/load/configuration.py | 3 ++ dlt/load/load.py | 35 ++++++++++++++++++- dlt/pipeline/pipeline.py | 1 + .../docs/running-in-production/running.md | 6 ++++ .../airflow_tests/test_airflow_wrapper.py | 12 ++++++- tests/load/pipeline/test_pipelines.py | 17 +++++++++ tests/pipeline/test_pipeline.py | 32 ++++++++++++++++- 7 files changed, 103 insertions(+), 3 deletions(-) diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 97cf23fdfc..b3fc2fbcd4 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration): raise_on_max_retries: int = 5 """When gt 0 will raise when job reaches raise_on_max_retries""" _load_storage_config: LoadStorageConfiguration = None + # if set to `True`, the staging dataset will be + # truncated after loading the data + truncate_staging_dataset: bool = False def on_resolved(self) -> None: self.pool_type = "none" if self.workers == 1 else "thread" diff --git a/dlt/load/load.py b/dlt/load/load.py index 66ddb1c308..9d898bc54d 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -53,7 +53,7 @@ LoadClientUnsupportedWriteDisposition, LoadClientUnsupportedFileFormats, ) -from dlt.load.utils import get_completed_table_chain, init_client +from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]): @@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) ) ): job_client.complete_load(load_id) + self._maybe_trancate_staging_dataset(schema, job_client) + self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: return TRunMetrics(False, len(self.load_storage.list_normalized_packages())) + def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None: + """ + Truncate the staging dataset if one used, + and configuration requests truncation. + + Args: + schema (Schema): Schema to use for the staging dataset. + job_client (JobClientBase): + Job client to use for the staging dataset. 
+ """ + if not ( + isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset + ): + return + + data_tables = schema.data_table_names() + tables = _extend_tables_with_table_chain( + schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset + ) + + try: + with self.get_destination_client(schema) as client: + with client.with_staging_dataset(): # type: ignore + client.initialize_storage(truncate_tables=tables) + + except Exception as exc: + logger.warn( + f"Staging dataset truncate failed due to the following error: {exc}" + " However, it didn't affect the data integrity." + ) + def get_step_info( self, pipeline: SupportsPipeline, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a2ea1936a9..53770f332d 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -554,6 +554,7 @@ def load( with signals.delayed_signals(): runner.run_pool(load_step.config, load_step) info: LoadInfo = self._get_step_info(load_step) + self.first_run = False return info except Exception as l_ex: diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index 253a27d942..9c52f58caa 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -108,6 +108,12 @@ behind. In `config.toml`: load.delete_completed_jobs=true ``` +Also, by default, `dlt` leaves data in staging dataset, used during merge and replace load for deduplication. In order to clear it, put the following line in `config.toml`: + +```toml +load.truncate_staging_dataset=true +``` + ## Using slack to send messages `dlt` provides basic support for sending slack messages. You can configure Slack incoming hook via diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py index 845800e47f..533d16c998 100644 --- a/tests/helpers/airflow_tests/test_airflow_wrapper.py +++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py @@ -384,7 +384,17 @@ def dag_parallel(): with mock.patch("dlt.helpers.airflow_helper.logger.warn") as warn_mock: dag_def = dag_parallel() dag_def.test() - warn_mock.assert_called_once() + warn_mock.assert_has_calls( + [ + mock.call( + "The resource resource2 in task" + " mock_data_incremental_source_resource1-resource2 is using incremental loading" + " and may modify the state. Resources that modify the state should not run in" + " parallel within the single pipeline as the state will not be correctly" + " merged. Please use 'serialize' or 'parallel-isolated' modes instead." 
+ ) + ] + ) def test_parallel_isolated_run(): diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index a498b570a0..d98f335d16 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -10,6 +10,7 @@ from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination from dlt.common.destination.exceptions import DestinationHasFailedJobs +from dlt.common.destination.reference import WithStagingDataset from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema from dlt.common.schema.typing import VERSION_TABLE_NAME @@ -896,6 +897,7 @@ def test_pipeline_upfront_tables_two_loads( # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy + os.environ["TRUNCATE_STAGING_DATASET"] = "True" pipeline = destination_config.setup_pipeline( "test_pipeline_upfront_tables_two_loads", @@ -1001,6 +1003,21 @@ def table_3(make_data=False): is True ) + job_client, _ = pipeline._get_destination_clients(schema) + + if destination_config.staging and isinstance(job_client, WithStagingDataset): + for i in range(1, 4): + with pipeline.sql_client() as client: + table_name = f"table_{i}" + + if job_client.should_load_data_to_staging_dataset( + job_client.schema.tables[table_name] + ): + with client.with_staging_dataset(staging=True): + tab_name = client.make_qualified_table_name(table_name) + with client.execute_query(f"SELECT * FROM {tab_name}") as cur: + assert len(cur.fetchall()) == 0 + # @pytest.mark.skip(reason="Finalize the test: compare some_data values to values from database") # @pytest.mark.parametrize( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index a828de40fd..1c4383405b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -5,9 +5,9 @@ import logging import os import random +import threading from time import sleep from typing import Any, Tuple, cast -import threading from tenacity import retry_if_exception, Retrying, stop_after_attempt import pytest @@ -2230,3 +2230,33 @@ def stateful_resource(): assert len(fs_client.list_table_files("_dlt_loads")) == 2 assert len(fs_client.list_table_files("_dlt_version")) == 1 assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1 + + +@pytest.mark.parametrize("truncate", (True, False)) +def test_staging_dataset_truncate(truncate) -> None: + dlt.config["truncate_staging_dataset"] = truncate + + @dlt.resource(write_disposition="merge", merge_key="id") + def test_data(): + yield [{"field": 1, "id": 1}, {"field": 2, "id": 2}, {"field": 3, "id": 3}] + + pipeline = dlt.pipeline( + pipeline_name="test_staging_cleared", + destination="duckdb", + full_refresh=True, + ) + + info = pipeline.run(test_data, table_name="staging_cleared") + assert_load_info(info) + + with pipeline.sql_client() as client: + with client.execute_query( + f"SELECT * FROM {pipeline.dataset_name}_staging.staging_cleared" + ) as cur: + if truncate: + assert len(cur.fetchall()) == 0 + else: + assert len(cur.fetchall()) == 3 + + with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}.staging_cleared") as cur: + assert len(cur.fetchall()) == 3 From 920d41a773879acd26c24445ee8fd127385c434f Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Thu, 16 May 2024 12:39:07 -0400 Subject: [PATCH 5/6] Add recommended_file_size cap to limit data writer file size (#1368) --- dlt/common/data_writers/buffered.py | 3 ++ 
dlt/common/destination/capabilities.py | 2 + dlt/destinations/impl/bigquery/__init__.py | 2 + tests/common/data_writers/utils.py | 6 +-- .../data_writers/test_buffered_writer.py | 38 ++++++++++++++++++- 5 files changed, 47 insertions(+), 4 deletions(-) diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index fdd5b50111..bd32c68c49 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -55,7 +55,10 @@ def __init__( self.closed_files: List[DataWriterMetrics] = [] # all fully processed files # buffered items must be less than max items in file self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items) + # Explicitly configured max size supersedes destination limit self.file_max_bytes = file_max_bytes + if self.file_max_bytes is None and _caps: + self.file_max_bytes = _caps.recommended_file_size self.file_max_items = file_max_items # the open function is either gzip.open or open self.open = ( diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e74f5a980d..089b4a1d5e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + recommended_file_size: Optional[int] = None + """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None supported_staging_file_formats: Sequence[TLoaderFileFormat] = None escape_identifier: Callable[[str], str] = None diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index d33466ed5e..39322b43a0 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supported_loader_file_formats = ["jsonl", "parquet"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] + # BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size + caps.recommended_file_size = int(1024 * 1024 * 1024) caps.escape_identifier = escape_bigquery_identifier caps.escape_literal = None caps.format_datetime_literal = format_bigquery_datetime_literal diff --git a/tests/common/data_writers/utils.py b/tests/common/data_writers/utils.py index 2cb440bde1..e6e377b7d0 100644 --- a/tests/common/data_writers/utils.py +++ b/tests/common/data_writers/utils.py @@ -1,5 +1,5 @@ import os -from typing import Type +from typing import Type, Optional from dlt.common.data_writers.buffered import BufferedDataWriter from dlt.common.data_writers.writers import TWriter, ALL_WRITERS @@ -18,8 +18,8 @@ def get_writer( writer: Type[TWriter], buffer_max_items: int = 10, - file_max_items: int = 10, - file_max_bytes: int = None, + file_max_items: Optional[int] = 10, + file_max_bytes: Optional[int] = None, disable_compression: bool = False, caps: DestinationCapabilitiesContext = None, ) -> BufferedDataWriter[TWriter]: diff --git a/tests/extract/data_writers/test_buffered_writer.py b/tests/extract/data_writers/test_buffered_writer.py index 82b81a1cd7..b6da132de9 100644 --- a/tests/extract/data_writers/test_buffered_writer.py +++ b/tests/extract/data_writers/test_buffered_writer.py @@ -2,6 +2,7 @@ import pytest import time 
from typing import Iterator, Type +from uuid import uuid4 from dlt.common.data_writers.exceptions import BufferedDataWriterClosed from dlt.common.data_writers.writers import ( @@ -11,7 +12,7 @@ JsonlWriter, ALL_WRITERS, ) -from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.storages.file_storage import FileStorage @@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[Data metrics = writer.import_file( "tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0) ) + + +@pytest.mark.parametrize( + "disable_compression", [True, False], ids=["no_compression", "compression"] +) +@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS) +def test_rotation_on_destination_caps_recommended_file_size( + disable_compression: bool, writer_type: Type[DataWriter] +) -> None: + caps = DestinationCapabilitiesContext.generic_capabilities() + caps.recommended_file_size = int(250 * 1024) + columns = {"id": new_column("id", "text")} + with get_writer( + writer_type, + disable_compression=disable_compression, + buffer_max_items=100, + file_max_items=None, + file_max_bytes=None, + caps=caps, + ) as writer: + for i in range(8): + # Data chunk approximately 40kb serialized + items = [{"id": str(uuid4())} for _ in range(1000)] + writer.write_data_item(items, columns) + if i < 5: + assert not writer.closed_files + + if i > 5: + # We should have written atleast 250kb by now and have rotated the file + assert len(writer.closed_files) == 1 + + # Check the files that were written are all within the recommended size + 1 chunk + assert len(writer.closed_files) == 2 + for file in writer.closed_files: + assert file.file_size < caps.recommended_file_size + 1024 * 50 From 5b0afa490112e842baa497583138bac3ce169699 Mon Sep 17 00:00:00 2001 From: rudolfix Date: Thu, 16 May 2024 21:14:29 +0200 Subject: [PATCH 6/6] limits mssql query size to fit network buffer (#1372) --- dlt/destinations/impl/mssql/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/destinations/impl/mssql/__init__.py b/dlt/destinations/impl/mssql/__init__.py index e9d9fe24fd..f7768d9238 100644 --- a/dlt/destinations/impl/mssql/__init__.py +++ b/dlt/destinations/impl/mssql/__init__.py @@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext: # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN caps.max_identifier_length = 128 caps.max_column_identifier_length = 128 - caps.max_query_length = 4 * 1024 * 64 * 1024 + # A SQL Query can be a varchar(max) but is shown as limited to 65,536 * Network Packet + caps.max_query_length = 65536 * 10 caps.is_max_query_length_in_bytes = True caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False
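
For reviewers who want to sanity-check the new limits from patches 5 and 6, a minimal sketch is below. It only reads back the capability objects defined in the modified `__init__.py` files; the module paths and expected values are taken from the diffs above, so treat them as assumptions if your branch differs.

```py
# Minimal sketch: read back the capability values introduced in patches 5/6 and 6/6.
from dlt.destinations.impl.bigquery import capabilities as bigquery_capabilities
from dlt.destinations.impl.mssql import capabilities as mssql_capabilities

bq_caps = bigquery_capabilities()
# BigQuery's recommended_file_size should rotate writer files around 1 GiB,
# well under the 4 GB load limit mentioned in the diff.
print(bq_caps.recommended_file_size)  # expected: 1073741824

mssql_caps = mssql_capabilities()
# MSSQL query size is now capped at 65536 * 10 bytes to stay inside the
# documented 65,536 * network-packet-size batch limit.
print(mssql_caps.max_query_length)  # expected: 655360
print(mssql_caps.is_max_query_length_in_bytes)  # expected: True
```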