From 05d0c55a89f7c3136375996ea519804e0c6311c3 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 26 Jan 2024 12:22:51 +0100 Subject: [PATCH] update evaluation order tests --- dlt/extract/pipe.py | 5 ++-- tests/extract/test_extract_pipe.py | 31 +++++++++++++++------ tests/pipeline/test_resources_evaluation.py | 4 +-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 2a1666564b..0835ee41ad 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -817,7 +817,7 @@ def _get_source_item(self) -> ResolvablePipeItem: # while we have more new sources than available future slots, we do strict fifo where we # only ever check the first source, this is to prevent an uncontrolled number of sources # being created in certain scenarios - force_strict_fifo = (sources_count - self._initial_sources_count) > self.max_parallel_items + force_strict_fifo = (sources_count - self._initial_sources_count) >= self.max_parallel_items try: # always reset to start of list for fifo mode if self._next_item_mode == "fifo": @@ -826,9 +826,11 @@ def _get_source_item(self) -> ResolvablePipeItem: while True: # in strict fifo mode we never check more than the top most source if force_strict_fifo: + print("strict") self._current_source_index = 0 else: self._current_source_index = (self._current_source_index + 1) % sources_count + print(self._current_source_index) # if we have checked all sources once and all returned None, then we can sleep a bit if self._current_source_index == first_evaluated_index: sleep(self.futures_poll_interval) @@ -858,7 +860,6 @@ def _get_source_item(self) -> ResolvablePipeItem: # we need to decrease the index to keep the round robin order if self._next_item_mode == "round_robin": self._current_source_index -= 1 - return self._get_source_item() except (PipelineException, ExtractorException, DltSourceException, PipeException): raise diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py 
index 2dab094ebf..f7869bc78b 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -16,19 +16,19 @@ def test_next_item_mode() -> None: def nested_gen_level_2(): - yield from [88, None, 89] + yield from [6, None, 7] def nested_gen(): - yield from [55, 56, None, 77, nested_gen_level_2()] + yield from [3, 4, None, 5, nested_gen_level_2(), 8, 9] def source_gen1(): - yield from [1, 2, nested_gen(), 3, 4] + yield from [1, 2, nested_gen(), 10, 11] def source_gen2(): - yield from range(11, 16) + yield from [12, 13] def source_gen3(): - yield from range(20, 22) + yield from [14, 15] def get_pipes(): return [ @@ -39,13 +39,26 @@ def get_pipes(): # default mode is "fifo" _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo")) + # items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation + assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15] + + # force strict mode, no rotation at all when crossing the initial source count + _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1)) # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation - assert [pi.item for pi in _l] == [1, 2, 55, 56, 3, 77, 88, 89, 4, 11, 12, 13, 14, 15, 20, 21] + assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] - # round robin mode + # round robin eval _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin")) - # items will be round robin, nested iterators are integrated into the round robin - assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89] + # items will be round robin across the pipes, nested iterators are integrated into the round robin + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7] + + # round robin with max parallel items triggers strict fifo in some cases 
(after gen2 and 3 are exhausted we already have the first yielded gen, + # items appear in order as sources are processed strictly from front) + _l = list( + PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1) + ) + # the first two rounds are round robin over all pipes, then the remaining items of the first pipe follow strictly in order + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11] def test_rotation_on_none() -> None: diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 2731686e5a..f61b4b374a 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -1,11 +1,11 @@ from typing import Any -import dlt, asyncio, pytest, os, threading, inspect, time -from functools import wraps +import dlt, asyncio, pytest, os def test_async_iterator_resource() -> None: # define an asynchronous iterator + @dlt.resource() class AsyncIterator: def __init__(self): self.counter = 0