Skip to content

Commit

Permalink
add more limit functions from branch
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Dec 11, 2024
1 parent 95e0e78 commit 159b34c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 13 deletions.
29 changes: 26 additions & 3 deletions dlt/extract/items.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import inspect
import time

from abc import ABC, abstractmethod
from typing import (
Any,
Expand Down Expand Up @@ -243,23 +245,44 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]:
class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental

def __init__(self, max_items: int) -> None:
def __init__(
self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float]
) -> None:
self.max_items = max_items if max_items is not None else -1
self.max_time = max_time
self.min_wait = min_wait

def bind(self, pipe: SupportsPipe) -> "LimitItem":
self.gen = pipe.gen
self.count = 0
self.exhausted = False
self.start_time = time.time()
self.last_call_time = 0.0
return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
# detect when the limit is reached
if self.count == self.max_items:
# detect when the limit is reached, time or yield count
if (self.count == self.max_items) or (
self.max_time and time.time() - self.start_time > self.max_time
):
self.exhausted = True
if inspect.isgenerator(self.gen):
self.gen.close()

# do not return any late arriving items
if self.exhausted:
return None
self.count += 1

# if we have a min wait and the last iteration was less than min wait ago,
# we sleep on this thread a bit
if self.min_wait and (time.time() - self.last_call_time) < self.min_wait:
# NOTE: this should be interruptable?
# NOTE: this is sleeping on the main thread, we should carefully document this!
time.sleep(self.min_wait - (time.time() - self.last_call_time))

# remember last iteration time
if self.min_wait:
self.last_call_time = time.time()

return item
27 changes: 17 additions & 10 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,27 @@ def add_filter(
self._pipe.insert_step(FilterItem(item_filter), insert_at)
return self

def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003
def add_limit(
self: TDltResourceImpl,
max_items: Optional[int] = None,
max_time: Optional[float] = None,
min_wait: Optional[float] = None,
) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe.
This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.
This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.
Notes:
1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
Notes:
1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
Args:
max_items (int): The maximum number of items to yield
Returns:
"DltResource": returns self
max_items (int): The maximum number of items to yield, set to None for no limit
max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit
min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting)
Returns:
"DltResource": returns self
"""

if self.is_transformer:
Expand All @@ -365,7 +372,7 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
else:
# remove existing limit if any
self._pipe.remove_by_type(LimitItem)
self.add_step(LimitItem(max_items))
self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait))

return self

Expand Down
57 changes: 57 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import itertools
import time

from typing import Iterator

import pytest
Expand Down Expand Up @@ -910,6 +912,61 @@ def infinite_source():
assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3


def test_limit_max_time() -> None:
@dlt.resource()
def r():
for i in range(100):
time.sleep(0.1)
yield i

@dlt.resource()
async def r_async():
for i in range(100):
await asyncio.sleep(0.1)
yield i

sync_list = list(r().add_limit(max_time=1))
async_list = list(r_async().add_limit(max_time=1))

# we should have extracted 10 items within 1 second, sleep is included in the resource
# we allow for some variance in the number of items, as the sleep is not super precise
allowed_results = [
list(range(12)),
list(range(11)),
list(range(10)),
list(range(9)),
list(range(8)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


def test_limit_min_wait() -> None:
@dlt.resource()
def r():
for i in range(100):
yield i

@dlt.resource()
async def r_async():
for i in range(100):
yield i

sync_list = list(r().add_limit(max_time=1, min_wait=0.2))
async_list = list(r_async().add_limit(max_time=1, min_wait=0.2))

# we should have extracted about 5 items within 1 second, sleep is done via min_wait
allowed_results = [
list(range(3)),
list(range(4)),
list(range(5)),
list(range(6)),
list(range(7)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


def test_source_state() -> None:
@dlt.source
def test_source(expected_state):
Expand Down

0 comments on commit 159b34c

Please sign in to comment.