diff --git a/RELEASE.md b/RELEASE.md index 913a8ccd46..6bf75bc4cd 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -7,6 +7,8 @@ ## Bug fixes and other changes * Updated `_LazyDataset` representation when printing `KedroDataCatalog`. * Fixed `MemoryDataset` to infer `assign` copy mode for Ibis Tables, which previously would be inferred as `deepcopy`. +* Changed the execution of `SequentialRunner` to not use an executor pool to ensure it's single-threaded. + ## Breaking changes to the API ## Documentation changes * Added documentation for Kedro's support for Delta Lake versioning. diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index d7fb86733a..e9a478e260 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -189,8 +189,8 @@ def run_only_missing( return self.run(to_rerun, catalog, hook_manager) @abstractmethod # pragma: no cover - def _get_executor(self, max_workers: int) -> Executor: - """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" + def _get_executor(self, max_workers: int) -> Executor | None: + """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor, ProcessPoolExecutor or None if running sequentially).""" pass @abstractmethod # pragma: no cover @@ -226,7 +226,30 @@ def _run( done = None max_workers = self._get_required_workers_count(pipeline) - with self._get_executor(max_workers) as pool: + pool = self._get_executor(max_workers) + if pool is None: + for exec_index, node in enumerate(nodes): + try: + Task( + node=node, + catalog=catalog, + hook_manager=hook_manager, + is_async=self._is_async, + session_id=session_id, + ).execute() + done_nodes.add(node) + except Exception: + self._suggest_resume_scenario(pipeline, done_nodes, catalog) + raise + self._logger.info("Completed node: %s", node.name) + self._logger.info( + "Completed %d out of %d tasks", len(done_nodes), len(nodes) + ) + self._release_datasets(node, catalog, load_counts, pipeline) + + return # Exit early since everything runs sequentially + + with pool as executor: while True: ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes} todo_nodes -= ready @@ -238,9 +261,9 @@ def _run( is_async=self._is_async, session_id=session_id, ) - if isinstance(pool, ProcessPoolExecutor): + if isinstance(executor, ProcessPoolExecutor): task.parallel = True - futures.add(pool.submit(task)) + futures.add(executor.submit(task)) if not futures: if todo_nodes: self._raise_runtime_error(todo_nodes, done_nodes, ready, done) diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 8e0fc92377..6cc8cac445 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -5,10 +5,6 @@ from __future__ import annotations -from concurrent.futures import ( - Executor, - ThreadPoolExecutor, -) from typing import TYPE_CHECKING, Any from kedro.runner.runner import AbstractRunner @@ -47,10 +43,8 @@ def __init__( is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns ) - def _get_executor(self, max_workers: int) -> Executor: - return ThreadPoolExecutor( - max_workers=1 - ) # Single-threaded for sequential execution + def _get_executor(self, max_workers: int) -> None: + return None def _run( self,