diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index c6ca1660f4..fb40c3bd55 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -1,4 +1,5 @@ import inspect +import time from functools import partial from typing import ( AsyncIterable, @@ -11,6 +12,7 @@ Union, Any, Optional, + Literal, ) from typing_extensions import TypeVar, Self @@ -345,8 +347,14 @@ def add_filter( self._pipe.insert_step(FilterItem(item_filter), insert_at) return self - def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003 - """Adds a limit `max_items` to the resource pipe. + def add_limit( + self: TDltResourceImpl, + max_items: Optional[int] = None, + max_time: Optional[float] = None, + min_wait: Optional[float] = None, + ) -> TDltResourceImpl: # noqa: A003 + """Adds a limit `max_items` to the resource pipe + This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. @@ -356,7 +364,9 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. Args: - max_items (int): The maximum number of items to yield + max_items (int): The maximum number of items to yield, set to None for no limit + max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit + min_wait (float): The minimum number of seconds to wait between iterations (usedful for rate limiting) Returns: "DltResource": returns self """ @@ -368,10 +378,22 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no def _gen_wrap(gen: TPipeStep) -> TPipeStep: """Wrap a generator to take the first `max_items` records""" + if max_items >= 0 or max_time and not self.incremental: + from dlt.common import logger + + logger.warning( + f"You have added a max_items or max_time limit to resource {self.name}, but no" + " incremental was declared." + ) + # zero items should produce empty generator if max_items == 0: return + # vars needed for max time and rate limiting + start_time: float = time.time() + last_iteration: float = start_time + count = 0 is_async_gen = False if callable(gen): @@ -384,7 +406,10 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: try: for i in gen: # type: ignore # TODO: help me fix this later + # return item to caller yield i + + # evaluate stop conditions if i is not None: count += 1 # async gen yields awaitable so we must count one awaitable more @@ -392,6 +417,21 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: # new awaitable will be cancelled if count == max_items + int(is_async_gen): return + # if we crossed the max time, we will stop + if max_time and time.time() - start_time > max_time: + return + + # apply rate limiting + if min_wait: + while (last_iteration + min_wait) - time.time() > 0: + # we give control back to the pipe iterator + yield None + time.sleep(0.1) + + # remember last iteration time + if min_wait: + last_iteration = time.time() + finally: if inspect.isgenerator(gen): gen.close() diff --git a/tests/extract/test_limits.py b/tests/extract/test_limits.py new file mode 100644 index 0000000000..4e09a9c86a --- /dev/null +++ b/tests/extract/test_limits.py @@ -0,0 +1,122 @@ +import dlt +import itertools +import pytest +import asyncio +import os +import time + + +@pytest.fixture(autouse=True) +def set_round_robin(): + """this can be removed after the round robin PR is merged to devel""" + os.environ["EXTRACT__NEXT_ITEM_MODE"] = "round_robin" + yield + del os.environ["EXTRACT__NEXT_ITEM_MODE"] + + +def test_item_limit_infinite_counter() -> None: + r = dlt.resource(itertools.count(), name="infinity").add_limit(10) + assert list(r) == list(range(10)) + + +def test_item_limit_source() -> None: + os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo" + + def mul_c(item): + yield from "A" * (item + 2) + + @dlt.source + def infinite_source(): + for idx in range(3): + r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10) + yield r + yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c) + + # transformer is not limited to 2 elements, infinite resource is, we have 3 resources + assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3 + + +@pytest.mark.parametrize("limit", (None, -1, 0, 10)) +def test_item_limit_edge_cases(limit: int) -> None: + r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore + + @dlt.resource() + async def r_async(): + for i in range(20): + await asyncio.sleep(0.01) + yield i + + sync_list = list(r) + async_list = list(r_async().add_limit(limit)) + + if limit == 10: + assert sync_list == list(range(10)) + # we have edge cases where the async list will have one extra item + # possibly due to timing issues, maybe some other implementation problem + assert (async_list == list(range(10))) or (async_list == list(range(11))) + elif limit in [None, -1]: + assert sync_list == async_list == list(range(20)) + elif limit == 0: + assert sync_list == async_list == [] + else: + raise AssertionError(f"Unexpected limit: {limit}") + + +def test_time_limit() -> None: + @dlt.resource() + def r(): + for i in range(100): + time.sleep(0.1) + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + await asyncio.sleep(0.1) + yield i + + sync_list = list(r().add_limit(max_time=1)) + async_list = list(r_async().add_limit(max_time=1)) + + # we should have extracted 10 items within 1 second, sleep is included in the resource + allowed_results = [ + list(range(12)), + list(range(11)), + list(range(10)), + list(range(9)), + list(range(8)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + +def test_min_wait() -> None: + @dlt.resource() + def r(): + for i in range(100): + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + yield i + + sync_list = list(r().add_limit(max_time=1, min_wait=0.2)) + async_list = list(r_async().add_limit(max_time=1, min_wait=0.2)) + + # we should have extracted about 5 items within 1 second, sleep is done via min_wait + allowed_results = [ + list(range(3)), + list(range(4)), + list(range(5)), + list(range(6)), + list(range(7)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + +# TODO: Test behavior in pipe iterator with more than one resource with different extraction modes set. We want to see if an overall rate limiting can be achieved with fifo which +# will be useful for APIs. Also round robin should apply rate limiting individually and not get stuck on one iterator sleeping. + +# it would also be nice to be able to test the logger warnings if no incremental is present diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 3d021d5d10..820ab84f3a 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -830,52 +830,6 @@ def test_add_transformer_right_pipe() -> None: iter([1, 2, 3]) | dlt.resource(lambda i: i * 3, name="lambda") -def test_limit_infinite_counter() -> None: - r = dlt.resource(itertools.count(), name="infinity").add_limit(10) - assert list(r) == list(range(10)) - - -@pytest.mark.parametrize("limit", (None, -1, 0, 10)) -def test_limit_edge_cases(limit: int) -> None: - r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore - - @dlt.resource() - async def r_async(): - for i in range(20): - await asyncio.sleep(0.01) - yield i - - sync_list = list(r) - async_list = list(r_async().add_limit(limit)) - - if limit == 10: - assert sync_list == list(range(10)) - # we have edge cases where the async list will have one extra item - # possibly due to timing issues, maybe some other implementation problem - assert (async_list == list(range(10))) or (async_list == list(range(11))) - elif limit in [None, -1]: - assert sync_list == async_list == list(range(20)) - elif limit == 0: - assert sync_list == async_list == [] - else: - raise AssertionError(f"Unexpected limit: {limit}") - - -def test_limit_source() -> None: - def mul_c(item): - yield from "A" * (item + 2) - - @dlt.source - def infinite_source(): - for idx in range(3): - r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10) - yield r - yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c) - - # transformer is not limited to 2 elements, infinite resource is, we have 3 resources - assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3 - - def test_source_state() -> None: @dlt.source def test_source(expected_state):