set default next item mode to round robin (#1482)
* set default next item mode to round robin

* fix sources tests

* fix some existing tests to work again

* parametrize one test
sh-rp authored Jun 19, 2024
1 parent b267c70 commit d4b0bd0
Showing 6 changed files with 37 additions and 15 deletions.
7 changes: 3 additions & 4 deletions dlt/extract/pipe_iterator.py
@@ -51,8 +51,7 @@ class PipeIteratorConfiguration(BaseConfiguration):
workers: int = 5
futures_poll_interval: float = 0.01
copy_on_fork: bool = False
next_item_mode: str = "fifo"

next_item_mode: str = "round_robin"
__section__: ClassVar[str] = known_sections.EXTRACT

def __init__(
@@ -82,7 +81,7 @@ def from_pipe(
max_parallel_items: int = 20,
workers: int = 5,
futures_poll_interval: float = 0.01,
next_item_mode: TPipeNextItemMode = "fifo",
next_item_mode: TPipeNextItemMode = "round_robin",
) -> "PipeIterator":
# join all dependent pipes
if pipe.parent:
@@ -109,7 +108,7 @@ def from_pipes(
workers: int = 5,
futures_poll_interval: float = 0.01,
copy_on_fork: bool = False,
next_item_mode: TPipeNextItemMode = "fifo",
next_item_mode: TPipeNextItemMode = "round_robin",
) -> "PipeIterator":
# print(f"max_parallel_items: {max_parallel_items} workers: {workers}")
sources: List[SourcePipeItem] = []
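The new default only changes what happens when no explicit mode is passed; callers can still pin either mode. A minimal sketch of the signatures above (the sample pipes are illustrative, and the `pipe_iterator` import path is assumed from the file layout in this commit):

```python
from dlt.extract.pipe import Pipe
from dlt.extract.pipe_iterator import PipeIterator

# two toy source pipes; any iterable data works here
pipes = [
    Pipe.from_data("letters", ["a", "b", "c"]),
    Pipe.from_data("numbers", [1, 2, 3]),
]

# with the new default ("round_robin") items from the pipes interleave;
# pass next_item_mode="fifo" to keep the old sequential behavior
for pipe_item in PipeIterator.from_pipes(pipes, next_item_mode="fifo"):
    print(pipe_item.item)
```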
8 changes: 3 additions & 5 deletions docs/website/docs/reference/performance.md
@@ -217,13 +217,11 @@ call `pipeline.activate()` to inject the right context into current thread.
## Resources extraction, `fifo` vs. `round robin`

When extracting from resources, you have two options to determine what the order of queries to your
-resources are: `fifo` and `round_robin`.
+resources are: `round_robin` and `fifo`.

-`fifo` is the default option and will result in every resource being fully extracted before the next
-resource is extracted in the order that you added them to your source.
+`round_robin` is the default option and will result in extraction of one item from the first resource, then one item from the second resource etc, doing as many rounds as necessary until all resources are fully extracted. If you want to extract resources in parallel, you will need to keep `round_robin`.

-`round_robin` will result in extraction of one item from the first resource, then one item from the
-second resource etc, doing as many rounds as necessary until all resources are fully extracted.
+`fifo` is an option for sequential extraction. It will result in every resource being fully extracted until the resource generator is expired, or a configured limit is reached, then the next resource will be evaluated. Resources are extracted in the order that you added them to your source.

You can change this setting in your `config.toml` as follows:

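The config snippet referenced above is collapsed in this view. Based on the `EXTRACT` section used elsewhere in this commit (`known_sections.EXTRACT`, and the `EXTRACT__NEXT_ITEM_MODE` env var in the tests), the setting would look roughly like this in `config.toml`:

```toml
[extract]
# default since this change; use "fifo" for strictly sequential extraction
next_item_mode = "round_robin"
```

The tests below pin the same setting through the environment instead, e.g. `EXTRACT__NEXT_ITEM_MODE=fifo`.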
8 changes: 6 additions & 2 deletions tests/extract/test_decorators.py
@@ -812,7 +812,10 @@ def standalone_transformer_returns(item: TDataItem, init: int = dlt.config.value
return "A" * item * init


-def test_standalone_transformer() -> None:
+@pytest.mark.parametrize("next_item_mode", ["fifo", "round_robin"])
+def test_standalone_transformer(next_item_mode: str) -> None:
+    os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode
+
assert not isinstance(standalone_transformer, DltResource)
assert callable(standalone_transformer)
assert standalone_transformer.__doc__ == """Has fine transformer docstring"""
@@ -824,7 +827,8 @@ def test_standalone_transformer() -> None:
bound_tx(1)
assert isinstance(bound_tx, DltResource)
# the resource sets the start of the range of transformer + transformer init
-    assert list(standalone_signature(1, 3) | bound_tx) == [6, 7, 8, 9, 7, 8, 9]
+    exp_result = [6, 7, 7, 8, 8, 9, 9] if next_item_mode == "round_robin" else [6, 7, 8, 9, 7, 8, 9]
+    assert list(standalone_signature(1, 3) | bound_tx) == exp_result

# wrong params to transformer
with pytest.raises(TypeError):
11 changes: 8 additions & 3 deletions tests/extract/test_extract_pipe.py
@@ -38,7 +38,7 @@ def get_pipes():
Pipe.from_data("data3", source_gen3()),
]

# default mode is "fifo"
# test both modes
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo"))
# items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation
assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]
@@ -53,6 +53,11 @@ def get_pipes():
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 10, 4, 11, 5, 6, 8, 9, 7]

+    # default is round robin, should have same result without explicit
+    _l = list(PipeIterator.from_pipes(get_pipes()))
+    # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
+    assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 10, 4, 11, 5, 6, 8, 9, 7]
+
# round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen,
# items appear in order as sources are processed strictly from front)
_l = list(
@@ -460,7 +465,7 @@ def test_yield_map_step() -> None:
p = Pipe.from_data("data", [1, 2, 3])
# this creates number of rows as passed by the data
p.append_step(YieldMapItem(lambda item: (yield from [f"item_{x}" for x in range(item)])))
-    assert _f_items(list(PipeIterator.from_pipe(p))) == [
+    assert _f_items(list(PipeIterator.from_pipe(p, next_item_mode="fifo"))) == [
"item_0",
"item_0",
"item_1",
@@ -476,7 +481,7 @@ def test_yield_map_step() -> None:
p.append_step(
YieldMapItem(lambda item, meta: (yield from [f"item_{meta}_{x}" for x in range(item)]))
)
-    assert _f_items(list(PipeIterator.from_pipe(p))) == [
+    assert _f_items(list(PipeIterator.from_pipe(p, next_item_mode="fifo"))) == [
"item_A_0",
"item_B_0",
"item_B_1",
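The expected orderings above also reflect nested iterators and `None`-triggered rotation; stripped of those details, the basic difference between the two modes can be sketched in plain Python (an illustration only, not dlt's implementation):

```python
from itertools import chain
from typing import Iterable, Iterator, List

def fifo(sources: List[Iterable[int]]) -> Iterator[int]:
    # drain each source completely before starting the next one
    return chain.from_iterable(sources)

def round_robin(sources: List[Iterable[int]]) -> Iterator[int]:
    # take one item from each source per round until all are exhausted
    iterators = [iter(s) for s in sources]
    while iterators:
        for it in list(iterators):
            try:
                yield next(it)
            except StopIteration:
                iterators.remove(it)

data = [[1, 2, 3], [10, 11], [20]]
assert list(fifo(data)) == [1, 2, 3, 10, 11, 20]
assert list(round_robin(data)) == [1, 10, 20, 2, 11, 3]
```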
8 changes: 8 additions & 0 deletions tests/extract/test_incremental.py
@@ -42,6 +42,14 @@
)


+@pytest.fixture(autouse=True)
+def switch_to_fifo():
+    """most of the following tests rely on the old default fifo next item mode"""
+    os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"
+    yield
+    del os.environ["EXTRACT__NEXT_ITEM_MODE"]
+
+
def test_detect_incremental_arg() -> None:
def incr_1(incremental: dlt.sources.incremental): # type: ignore[type-arg]
pass
10 changes: 9 additions & 1 deletion tests/extract/test_sources.py
@@ -4,7 +4,7 @@
import pytest
import asyncio

-import dlt
+import dlt, os
from dlt.common.configuration.container import Container
from dlt.common.exceptions import DictValidationException, PipelineStateNotAvailable
from dlt.common.pipeline import StateInjectableContext, source_state
@@ -31,6 +31,14 @@
from dlt.extract.pipe import Pipe


+@pytest.fixture(autouse=True)
+def switch_to_fifo():
+    """most of the following tests rely on the old default fifo next item mode"""
+    os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"
+    yield
+    del os.environ["EXTRACT__NEXT_ITEM_MODE"]
+
+
def test_call_data_resource() -> None:
with pytest.raises(TypeError):
DltResource.from_data([1], name="t")()
