Skip to content

Commit

Permalink
update evaluation order tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 26, 2024
1 parent 3787c63 commit 05d0c55
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
5 changes: 3 additions & 2 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ def _get_source_item(self) -> ResolvablePipeItem:
# while we have more new sources than available future slots, we do strict fifo where we
# only ever check the first source, this is to prevent an uncontrolled number of sources
# being created in certain scenarios
force_strict_fifo = (sources_count - self._initial_sources_count) > self.max_parallel_items
force_strict_fifo = (sources_count - self._initial_sources_count) >= self.max_parallel_items
try:
# always reset to start of list for fifo mode
if self._next_item_mode == "fifo":
Expand All @@ -826,9 +826,11 @@ def _get_source_item(self) -> ResolvablePipeItem:
while True:
# in strict fifo mode we never check more than the top most source
if force_strict_fifo:
print("strict")
self._current_source_index = 0
else:
self._current_source_index = (self._current_source_index + 1) % sources_count
print(self._current_source_index)
# if we have checked all sources once and all returned None, then we can sleep a bit
if self._current_source_index == first_evaluated_index:
sleep(self.futures_poll_interval)
Expand Down Expand Up @@ -858,7 +860,6 @@ def _get_source_item(self) -> ResolvablePipeItem:
# we need to decrease the index to keep the round robin order
if self._next_item_mode == "round_robin":
self._current_source_index -= 1

return self._get_source_item()
except (PipelineException, ExtractorException, DltSourceException, PipeException):
raise
Expand Down
31 changes: 22 additions & 9 deletions tests/extract/test_extract_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

def test_next_item_mode() -> None:
def nested_gen_level_2():
yield from [88, None, 89]
yield from [6, None, 7]

def nested_gen():
yield from [55, 56, None, 77, nested_gen_level_2()]
yield from [3, 4, None, 5, nested_gen_level_2(), 8, 9]

def source_gen1():
yield from [1, 2, nested_gen(), 3, 4]
yield from [1, 2, nested_gen(), 10, 11]

def source_gen2():
yield from range(11, 16)
yield from [12, 13]

def source_gen3():
yield from range(20, 22)
yield from [14, 15]

def get_pipes():
return [
Expand All @@ -39,13 +39,26 @@ def get_pipes():

# default mode is "fifo"
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo"))
# items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation
assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]

# force strict mode, no rotation at all when crossing the initial source count
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1))
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 2, 55, 56, 3, 77, 88, 89, 4, 11, 12, 13, 14, 15, 20, 21]
assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

# round robin mode
# round robin eval
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin"))
# items will be round robin, nested iterators are integrated into the round robin
assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89]
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7]

# round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen,
# items appear in order as sources are processed strictly from front)
_l = list(
PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1)
)
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11]


def test_rotation_on_none() -> None:
Expand Down
4 changes: 2 additions & 2 deletions tests/pipeline/test_resources_evaluation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Any

import dlt, asyncio, pytest, os, threading, inspect, time
from functools import wraps
import dlt, asyncio, pytest, os


def test_async_iterator_resource() -> None:
# define an asynchronous iterator
@dlt.resource()
class AsyncIterator:
def __init__(self):
self.counter = 0
Expand Down

0 comments on commit 05d0c55

Please sign in to comment.