Skip to content

Commit

Permalink
remove rate-limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Dec 12, 2024
1 parent 159b34c commit c662da1
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 45 deletions.
17 changes: 1 addition & 16 deletions dlt/extract/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,19 +245,15 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]:
class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental

def __init__(
self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float]
) -> None:
def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None:
self.max_items = max_items if max_items is not None else -1
self.max_time = max_time
self.min_wait = min_wait

def bind(self, pipe: SupportsPipe) -> "LimitItem":
self.gen = pipe.gen
self.count = 0
self.exhausted = False
self.start_time = time.time()
self.last_call_time = 0.0
return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
Expand All @@ -274,15 +270,4 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
return None
self.count += 1

# if we have a min wait and the last iteration was less than min wait ago,
# we sleep on this thread a bit
if self.min_wait and (time.time() - self.last_call_time) < self.min_wait:
# NOTE: this should be interruptable?
# NOTE: this is sleeping on the main thread, we should carefully document this!
time.sleep(self.min_wait - (time.time() - self.last_call_time))

# remember last iteration time
if self.min_wait:
self.last_call_time = time.time()

return item
4 changes: 1 addition & 3 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ def add_limit(
self: TDltResourceImpl,
max_items: Optional[int] = None,
max_time: Optional[float] = None,
min_wait: Optional[float] = None,
) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe.
Expand All @@ -359,7 +358,6 @@ def add_limit(
Args:
max_items (int): The maximum number of items to yield, set to None for no limit
max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit
min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting)
Returns:
"DltResource": returns self
"""
Expand All @@ -372,7 +370,7 @@ def add_limit(
else:
# remove existing limit if any
self._pipe.remove_by_type(LimitItem)
self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait))
self.add_step(LimitItem(max_items=max_items, max_time=max_time))

return self

Expand Down
26 changes: 0 additions & 26 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,32 +941,6 @@ async def r_async():
assert async_list in allowed_results


def test_limit_min_wait() -> None:
@dlt.resource()
def r():
for i in range(100):
yield i

@dlt.resource()
async def r_async():
for i in range(100):
yield i

sync_list = list(r().add_limit(max_time=1, min_wait=0.2))
async_list = list(r_async().add_limit(max_time=1, min_wait=0.2))

# we should have extracted about 5 items within 1 second, sleep is done via min_wait
allowed_results = [
list(range(3)),
list(range(4)),
list(range(5)),
list(range(6)),
list(range(7)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


def test_source_state() -> None:
@dlt.source
def test_source(expected_state):
Expand Down

0 comments on commit c662da1

Please sign in to comment.