Change runner logic to not create pool for sequential runner #4502

Open · wants to merge 11 commits into base: main
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -7,6 +7,8 @@
## Bug fixes and other changes
* Updated `_LazyDataset` representation when printing `KedroDataCatalog`.
* Fixed `MemoryDataset` to infer `assign` copy mode for Ibis Tables, which previously would be inferred as `deepcopy`.
+* Changed the execution of `SequentialRunner` to not use an executor pool, ensuring it's single-threaded.

## Breaking changes to the API
## Documentation changes
* Added documentation for Kedro's support for Delta Lake versioning.
33 changes: 28 additions & 5 deletions kedro/runner/runner.py
@@ -189,8 +189,8 @@ def run_only_missing(
         return self.run(to_rerun, catalog, hook_manager)

     @abstractmethod  # pragma: no cover
-    def _get_executor(self, max_workers: int) -> Executor:
-        """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor)."""
+    def _get_executor(self, max_workers: int) -> Executor | None:
+        """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor, ProcessPoolExecutor or None if running sequentially)."""
         pass

     @abstractmethod  # pragma: no cover
@@ -226,7 +226,30 @@ def _run(
         done = None
         max_workers = self._get_required_workers_count(pipeline)

-        with self._get_executor(max_workers) as pool:
+        pool = self._get_executor(max_workers)
Contributor:

It looks good to me from a logical point of view.

But I would suggest moving this into SequentialRunner._run. Otherwise, we modify the behaviour of the base class based on what is inherited from it, which is not entirely correct from an implementation point of view, and AbstractRunner._run becomes too long. I understand that it will require some duplication, but in the SequentialRunner._run method we can add a note explaining why we keep the implementation like that. Adding it to AbstractRunner._run will overload it even more.

Member Author:

I don't have a very strong opinion on this, but my counter-argument is: wouldn't it be confusing that the thread and parallel logic is in the AbstractRunner._run method but the sequential logic isn't?

Member:
I agree with Elena that, from a Pythonic perspective, common logic should be placed in _run() within the Abstract class. If there are exceptions, we should override the common logic with specific behavior, which is how runners worked previously. However, I thought the goal of the previous PR, which Merel is currently modifying, was to centralise the runner's logic within the Abstract class.

We had already decided that the _run() function in the abstract class would rely on _get_executor(), which would be implemented specifically in different subclasses. I don't see any issues with this approach. For me, the main question is how large and readable AbstractRunner._run() will be. As Merel pointed out, consolidating all the running logic in one place will be beneficial, which was also the intention of the previous PR.
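The alternative the reviewers suggest above can be made concrete. A minimal, hypothetical sketch (class and method names are simplified stand-ins, not Kedro's real signatures): SequentialRunner overrides _run() outright, so the base class keeps a single, purely pooled code path and never has to branch on a missing executor.

```python
# Hypothetical, simplified sketch of the alternative suggested above: instead
# of the base class branching on a missing executor, SequentialRunner
# overrides _run() itself. Stand-in names, not Kedro's actual API.
from __future__ import annotations

from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor


class AbstractRunner(ABC):
    @abstractmethod
    def _get_executor(self, max_workers: int) -> Executor:
        """Provide the executor used by the shared pooled _run()."""

    def _run(self, nodes: list[str]) -> list[str]:
        # Common pooled path shared by all concurrent runners.
        with self._get_executor(max_workers=2) as pool:
            futures = [pool.submit(self._execute, node) for node in nodes]
            return [future.result() for future in futures]

    @staticmethod
    def _execute(node: str) -> str:
        return f"ran {node}"


class ThreadRunner(AbstractRunner):
    def _get_executor(self, max_workers: int) -> Executor:
        return ThreadPoolExecutor(max_workers=max_workers)


class SequentialRunner(AbstractRunner):
    def _get_executor(self, max_workers: int) -> Executor:
        raise NotImplementedError  # unused: _run() is overridden below

    def _run(self, nodes: list[str]) -> list[str]:
        # Deliberate duplication of the loop, as discussed above, so that
        # no pool is ever created for sequential execution.
        return [self._execute(node) for node in nodes]
```

The cost of this design is the duplicated loop in SequentialRunner._run(); the benefit is that AbstractRunner._run() stays short and purely pool-based.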

+        if pool is None:
+            for exec_index, node in enumerate(nodes):
+                try:
+                    Task(
+                        node=node,
+                        catalog=catalog,
+                        hook_manager=hook_manager,
+                        is_async=self._is_async,
+                        session_id=session_id,
+                    ).execute()
+                    done_nodes.add(node)
+                except Exception:
+                    self._suggest_resume_scenario(pipeline, done_nodes, catalog)
+                    raise
+                self._logger.info("Completed node: %s", node.name)
+                self._logger.info(
+                    "Completed %d out of %d tasks", len(done_nodes), len(nodes)
+                )
+                self._release_datasets(node, catalog, load_counts, pipeline)
+
+            return  # Exit early since everything runs sequentially

+        with pool as executor:
             while True:
                 ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes}
                 todo_nodes -= ready
@@ -238,9 +261,9 @@
                     is_async=self._is_async,
                     session_id=session_id,
                 )
-                if isinstance(pool, ProcessPoolExecutor):
+                if isinstance(executor, ProcessPoolExecutor):
                     task.parallel = True
-                futures.add(pool.submit(task))
+                futures.add(executor.submit(task))
                 if not futures:
                     if todo_nodes:
                         self._raise_runtime_error(todo_nodes, done_nodes, ready, done)
10 changes: 2 additions & 8 deletions kedro/runner/sequential_runner.py
@@ -5,10 +5,6 @@

from __future__ import annotations

-from concurrent.futures import (
-    Executor,
-    ThreadPoolExecutor,
-)
from typing import TYPE_CHECKING, Any

from kedro.runner.runner import AbstractRunner
@@ -47,10 +43,8 @@ def __init__(
             is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns
         )

-    def _get_executor(self, max_workers: int) -> Executor:
-        return ThreadPoolExecutor(
-            max_workers=1
-        )  # Single-threaded for sequential execution
+    def _get_executor(self, max_workers: int) -> None:
+        return None

     def _run(
         self,
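Taken together, the diffs above implement one pattern: _get_executor() may now return None, and the shared _run() falls back to a plain in-thread loop in that case. A minimal runnable sketch of that pattern (stand-in classes and names, not Kedro's actual API):

```python
# Minimal sketch (stand-in classes, not Kedro's actual API) of the pattern
# this PR lands on: _get_executor() may return None, and the shared _run()
# then falls back to a plain loop, so SequentialRunner is genuinely
# single-threaded and no pool is ever created.
from __future__ import annotations

from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor


class AbstractRunner(ABC):
    @abstractmethod
    def _get_executor(self, max_workers: int) -> Executor | None:
        """Return an executor, or None to run every task in the calling thread."""

    def _run(self, nodes: list[str]) -> list[str]:
        pool = self._get_executor(max_workers=2)
        if pool is None:
            # Sequential path: no pool, no worker threads.
            return [self._execute(node) for node in nodes]
        with pool as executor:
            futures = [executor.submit(self._execute, node) for node in nodes]
            return [future.result() for future in futures]

    @staticmethod
    def _execute(node: str) -> str:
        return f"ran {node}"


class ThreadRunner(AbstractRunner):
    def _get_executor(self, max_workers: int) -> Executor | None:
        return ThreadPoolExecutor(max_workers=max_workers)


class SequentialRunner(AbstractRunner):
    def _get_executor(self, max_workers: int) -> None:
        return None
```

Both runners share one _run() implementation; only the executor factory differs, which is the centralisation the review thread describes.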