Skip to content

Commit

Permalink
In-process Python executor should not retry
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Jul 25, 2023
1 parent 8d39bf6 commit 9a1b19c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
2 changes: 0 additions & 2 deletions cubed/runtime/executors/python.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from typing import Any, Callable, Optional, Sequence

from networkx import MultiDiGraph
from tenacity import retry, stop_after_attempt

from cubed.core.array import Callback, Spec, TaskEndEvent
from cubed.core.plan import visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor


@retry(reraise=True, stop=stop_after_attempt(3))
def exec_stage_func(func: Callable[..., Any], *args, **kwargs):
return func(*args, **kwargs)

Expand Down
10 changes: 9 additions & 1 deletion cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.primitive.blockwise import apply_blockwise
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.tests.utils import (
ALL_EXECUTORS,
MAIN_EXECUTORS,
Expand Down Expand Up @@ -467,7 +468,12 @@ def mock_apply_blockwise(*args, **kwargs):
return apply_blockwise(*args, **kwargs)


@pytest.mark.skipif(
platform.system() == "Windows", reason="measuring memory does not run on windows"
)
def test_retries(mocker, spec):
# Use AsyncPythonDagExecutor since PythonDagExecutor doesn't support retries
executor = AsyncPythonDagExecutor()
# Inject faults into the primitive layer
mocker.patch(
"cubed.primitive.blockwise.apply_blockwise", side_effect=mock_apply_blockwise
Expand All @@ -476,7 +482,9 @@ def test_retries(mocker, spec):
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
assert_array_equal(c.compute(), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]))
assert_array_equal(
c.compute(executor=executor), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]])
)


@pytest.mark.skipif(
Expand Down

0 comments on commit 9a1b19c

Please sign in to comment.