diff --git a/dlt/extract/items.py b/dlt/extract/items.py index cc4e366744..db53794f75 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -245,19 +245,15 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: class LimitItem(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - def __init__( - self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float] - ) -> None: + def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: self.max_items = max_items if max_items is not None else -1 self.max_time = max_time - self.min_wait = min_wait def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 self.exhausted = False self.start_time = time.time() - self.last_call_time = 0.0 return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: @@ -274,15 +270,4 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: return None self.count += 1 - # if we have a min wait and the last iteration was less than min wait ago, - # we sleep on this thread a bit - if self.min_wait and (time.time() - self.last_call_time) < self.min_wait: - # NOTE: this should be interruptable? - # NOTE: this is sleeping on the main thread, we should carefully document this! - time.sleep(self.min_wait - (time.time() - self.last_call_time)) - - # remember last iteration time - if self.min_wait: - self.last_call_time = time.time() - return item diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index cd40ccb87b..39206f6d50 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -345,7 +345,6 @@ def add_limit( self: TDltResourceImpl, max_items: Optional[int] = None, max_time: Optional[float] = None, - min_wait: Optional[float] = None, ) -> TDltResourceImpl: # noqa: A003 """Adds a limit `max_items` to the resource pipe. @@ -359,7 +358,6 @@ def add_limit( Args: max_items (int): The maximum number of items to yield, set to None for no limit max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit - min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting) Returns: "DltResource": returns self """ @@ -372,7 +370,7 @@ def add_limit( else: # remove existing limit if any self._pipe.remove_by_type(LimitItem) - self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait)) + self.add_step(LimitItem(max_items=max_items, max_time=max_time)) return self diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 946b0ee2b8..80fb8017ad 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -941,32 +941,6 @@ async def r_async(): assert async_list in allowed_results -def test_limit_min_wait() -> None: - @dlt.resource() - def r(): - for i in range(100): - yield i - - @dlt.resource() - async def r_async(): - for i in range(100): - yield i - - sync_list = list(r().add_limit(max_time=1, min_wait=0.2)) - async_list = list(r_async().add_limit(max_time=1, min_wait=0.2)) - - # we should have extracted about 5 items within 1 second, sleep is done via min_wait - allowed_results = [ - list(range(3)), - list(range(4)), - list(range(5)), - list(range(6)), - list(range(7)), - ] - assert sync_list in allowed_results - assert async_list in allowed_results - - def test_source_state() -> None: @dlt.source def test_source(expected_state):