From 3738c29b467f8a9e9807a0c9304f91c4e479f8c1 Mon Sep 17 00:00:00 2001
From: Dave
Date: Thu, 12 Dec 2024 08:42:27 +0100
Subject: [PATCH] fix limiting bug and update docs

---
 dlt/extract/items.py                        | 16 ++++++++++++----
 docs/website/docs/general-usage/resource.md | 17 ++++++++++++++++-
 docs/website/docs/general-usage/source.md   | 14 +++++++++++++-
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/dlt/extract/items.py b/dlt/extract/items.py
index db53794f75..b3279b1d61 100644
--- a/dlt/extract/items.py
+++ b/dlt/extract/items.py
@@ -257,17 +257,25 @@ def bind(self, pipe: SupportsPipe) -> "LimitItem":
         return self
 
     def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        # detect when the limit is reached, time or yield count
-        if (self.count == self.max_items) or (
-            self.max_time and time.time() - self.start_time > self.max_time
+        self.count += 1
+
+        # detect when the limit is reached, max time or yield count
+        if (
+            (self.count == self.max_items)
+            or (self.max_time and time.time() - self.start_time > self.max_time)
+            or self.max_items == 0
         ):
             self.exhausted = True
             if inspect.isgenerator(self.gen):
                 self.gen.close()
+
+            # if max_items is not 0, we return the last item,
+            # otherwise never return anything
+            if self.max_items != 0:
+                return item
+
         # do not return any late arriving items
         if self.exhausted:
             return None
-        self.count += 1
         return item

diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md
index 199eaf9b5d..b8d51caf75 100644
--- a/docs/website/docs/general-usage/resource.md
+++ b/docs/website/docs/general-usage/resource.md
@@ -405,11 +405,26 @@ dlt.pipeline(destination="duckdb").run(my_resource().add_limit(10))
 The code above will extract `15*10=150` records. This is happening because in each iteration, 15 records are yielded, and we're limiting the number of iterations to 10.
 :::
 
-Some constraints of `add_limit` include:
+Alternatively, you can apply a time limit to the resource. The code below will run the extraction for 10 seconds and extract however many items are yielded in that time. In combination with incrementals, this can be useful for batched loading or for loading on machines with a runtime limit.
+
+```py
+dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_time=10))
+```
+
+You can also apply a combination of both limits. In this case, the extraction will stop as soon as either limit is reached.
+
+```py
+dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_items=10, max_time=10))
+```
+
+
+Some notes about `add_limit`:
 
 1. `add_limit` does not skip any items. It closes the iterator/generator that produces data after the limit is reached.
 2. You cannot limit transformers. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
+4. Calling `add_limit` on a resource will replace any previously set limits.
+5. For time-limited resources, the timer starts when the first item is processed. When resources are processed sequentially (FIFO mode), each resource's time limit also applies sequentially. In the default round-robin mode, the time limits usually run concurrently.
 
 :::tip
 If you are parameterizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1` to disable the limiting.
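To make the combined-limit semantics documented above concrete, here is a minimal sketch. It assumes only the public `add_limit(max_items=..., max_time=...)` API shown in the docs; the toy resource and the two-second budget are illustrative:

```py
import time

import dlt


@dlt.resource
def slow_numbers():
    # an effectively unbounded, slow generator
    for i in range(1_000_000):
        time.sleep(0.1)
        yield i


# extraction stops at whichever limit is hit first:
# ten yields or a two-second time budget
items = list(slow_numbers().add_limit(max_items=10, max_time=2))
assert len(items) <= 10
```

Iterating the resource directly keeps the sketch self-contained; the same limits apply when the resource is passed to `pipeline.run`.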
diff --git a/docs/website/docs/general-usage/source.md b/docs/website/docs/general-usage/source.md
index a5f1f04dee..92ebeedce5 100644
--- a/docs/website/docs/general-usage/source.md
+++ b/docs/website/docs/general-usage/source.md
@@ -108,8 +108,20 @@ load_info = pipeline.run(pipedrive_source().add_limit(10))
 print(load_info)
 ```
 
+You can also apply a time limit to the source:
+
+```py
+pipeline.run(pipedrive_source().add_limit(max_time=10))
+```
+
+Or apply both limits; whichever is reached first will stop the extraction:
+
+```py
+pipeline.run(pipedrive_source().add_limit(max_items=10, max_time=10))
+```
+
 :::note
-Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached.
+Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached. Read more about `add_limit` on the [resource page](resource.md).
 :::
 
 Find more on sampling data [here](resource.md#sample-from-large-data).
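And a quick sketch of the `max_items == 0` edge case that the `items.py` change above fixes; the resource name is illustrative and the expected results assume the patched behavior:

```py
import dlt


@dlt.resource
def numbers():
    yield from range(100)


# with the fix, a zero limit closes the generator
# immediately and yields no items at all
assert list(numbers().add_limit(0)) == []

# a positive limit still includes the item that
# reached the limit before closing the generator
assert list(numbers().add_limit(3)) == [0, 1, 2]
```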