Skip to content

Commit

Permalink
first implementation of limits with some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 13, 2024
1 parent 0ea9de7 commit 5fc2324
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 49 deletions.
46 changes: 43 additions & 3 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import time
from functools import partial
from typing import (
AsyncIterable,
Expand All @@ -11,6 +12,7 @@
Union,
Any,
Optional,
Literal,
)
from typing_extensions import TypeVar, Self

Expand Down Expand Up @@ -345,8 +347,14 @@ def add_filter(
self._pipe.insert_step(FilterItem(item_filter), insert_at)
return self

def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe.
def add_limit(
self: TDltResourceImpl,
max_items: Optional[int] = None,
max_time: Optional[float] = None,
min_wait: Optional[float] = None,
) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe
This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.
Expand All @@ -356,7 +364,9 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
Args:
max_items (int): The maximum number of items to yield
max_items (int): The maximum number of items to yield, set to None for no limit
max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit
min_wait (float): The minimum number of seconds to wait between iterations (usedful for rate limiting)
Returns:
"DltResource": returns self
"""
Expand All @@ -368,10 +378,22 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""

if max_items >= 0 or max_time and not self.incremental:
from dlt.common import logger

logger.warning(
f"You have added a max_items or max_time limit to resource {self.name}, but no"
" incremental was declared."
)

# zero items should produce empty generator
if max_items == 0:
return

# vars needed for max time and rate limiting
start_time: float = time.time()
last_iteration: float = start_time

count = 0
is_async_gen = False
if callable(gen):
Expand All @@ -384,14 +406,32 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:

try:
for i in gen: # type: ignore # TODO: help me fix this later
# return item to caller
yield i

# evaluate stop conditions
if i is not None:
count += 1
# async gen yields awaitable so we must count one awaitable more
# so the previous one is evaluated and yielded.
# new awaitable will be cancelled
if count == max_items + int(is_async_gen):
return
# if we crossed the max time, we will stop
if max_time and time.time() - start_time > max_time:
return

# apply rate limiting
if min_wait:
while (last_iteration + min_wait) - time.time() > 0:
# we give control back to the pipe iterator
yield None
time.sleep(0.1)

# remember last iteration time
if min_wait:
last_iteration = time.time()

finally:
if inspect.isgenerator(gen):
gen.close()
Expand Down
122 changes: 122 additions & 0 deletions tests/extract/test_limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import dlt
import itertools
import pytest
import asyncio
import os
import time


@pytest.fixture(autouse=True)
def set_round_robin():
"""this can be removed after the round robin PR is merged to devel"""
os.environ["EXTRACT__NEXT_ITEM_MODE"] = "round_robin"
yield
del os.environ["EXTRACT__NEXT_ITEM_MODE"]


def test_item_limit_infinite_counter() -> None:
r = dlt.resource(itertools.count(), name="infinity").add_limit(10)
assert list(r) == list(range(10))


def test_item_limit_source() -> None:
os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"

def mul_c(item):
yield from "A" * (item + 2)

@dlt.source
def infinite_source():
for idx in range(3):
r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10)
yield r
yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c)

# transformer is not limited to 2 elements, infinite resource is, we have 3 resources
assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3


@pytest.mark.parametrize("limit", (None, -1, 0, 10))
def test_item_limit_edge_cases(limit: int) -> None:
r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore

@dlt.resource()
async def r_async():
for i in range(20):
await asyncio.sleep(0.01)
yield i

sync_list = list(r)
async_list = list(r_async().add_limit(limit))

if limit == 10:
assert sync_list == list(range(10))
# we have edge cases where the async list will have one extra item
# possibly due to timing issues, maybe some other implementation problem
assert (async_list == list(range(10))) or (async_list == list(range(11)))
elif limit in [None, -1]:
assert sync_list == async_list == list(range(20))
elif limit == 0:
assert sync_list == async_list == []
else:
raise AssertionError(f"Unexpected limit: {limit}")


def test_time_limit() -> None:
@dlt.resource()
def r():
for i in range(100):
time.sleep(0.1)
yield i

@dlt.resource()
async def r_async():
for i in range(100):
await asyncio.sleep(0.1)
yield i

sync_list = list(r().add_limit(max_time=1))
async_list = list(r_async().add_limit(max_time=1))

# we should have extracted 10 items within 1 second, sleep is included in the resource
allowed_results = [
list(range(12)),
list(range(11)),
list(range(10)),
list(range(9)),
list(range(8)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


def test_min_wait() -> None:
@dlt.resource()
def r():
for i in range(100):
yield i

@dlt.resource()
async def r_async():
for i in range(100):
yield i

sync_list = list(r().add_limit(max_time=1, min_wait=0.2))
async_list = list(r_async().add_limit(max_time=1, min_wait=0.2))

# we should have extracted about 5 items within 1 second, sleep is done via min_wait
allowed_results = [
list(range(3)),
list(range(4)),
list(range(5)),
list(range(6)),
list(range(7)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


# TODO: Test behavior in pipe iterator with more than one resource with different extraction modes set. We want to see if an overall rate limiting can be achieved with fifo which
# will be useful for APIs. Also round robin should apply rate limiting individually and not get stuck on one iterator sleeping.

# it would also be nice to be able to test the logger warnings if no incremental is present
46 changes: 0 additions & 46 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,52 +830,6 @@ def test_add_transformer_right_pipe() -> None:
iter([1, 2, 3]) | dlt.resource(lambda i: i * 3, name="lambda")


def test_limit_infinite_counter() -> None:
r = dlt.resource(itertools.count(), name="infinity").add_limit(10)
assert list(r) == list(range(10))


@pytest.mark.parametrize("limit", (None, -1, 0, 10))
def test_limit_edge_cases(limit: int) -> None:
r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore

@dlt.resource()
async def r_async():
for i in range(20):
await asyncio.sleep(0.01)
yield i

sync_list = list(r)
async_list = list(r_async().add_limit(limit))

if limit == 10:
assert sync_list == list(range(10))
# we have edge cases where the async list will have one extra item
# possibly due to timing issues, maybe some other implementation problem
assert (async_list == list(range(10))) or (async_list == list(range(11)))
elif limit in [None, -1]:
assert sync_list == async_list == list(range(20))
elif limit == 0:
assert sync_list == async_list == []
else:
raise AssertionError(f"Unexpected limit: {limit}")


def test_limit_source() -> None:
def mul_c(item):
yield from "A" * (item + 2)

@dlt.source
def infinite_source():
for idx in range(3):
r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10)
yield r
yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c)

# transformer is not limited to 2 elements, infinite resource is, we have 3 resources
assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3


def test_source_state() -> None:
@dlt.source
def test_source(expected_state):
Expand Down

0 comments on commit 5fc2324

Please sign in to comment.