
Replace multiprocessing pool with futures executors #719

Merged: 6 commits into devel, Oct 30, 2023

Conversation

@steinitzu (Collaborator) commented Oct 27, 2023

Resolves: #699

Also added a NullExecutor fallback implementation, which just runs the task in the calling thread and wraps the result in a future. This gives us the same interface in single-threaded mode, so we don't have to check whether a pool exists.
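For illustration, a minimal sketch of what such a fallback could look like; the class body here is an assumption for illustration, not the exact dlt implementation:

from concurrent.futures import Executor, Future
from typing import Any, Callable


class NullExecutor(Executor):
    """Run submitted callables immediately in the calling thread.

    Sketch only: mimics the futures interface so callers can treat
    single-threaded and pooled execution the same way.
    """

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        future: Future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            future.set_exception(exc)
        return future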

netlify bot commented Oct 27, 2023

Deploy Preview for dlt-hub-docs canceled.

Latest commit: e478f89
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/65401b53eb32330008fcfa2a

Comment on lines 49 to 54
 def test_fail_on_process_worker_started_early() -> None:
     # process pool cannot be started before class instance is created: mapping not exist in worker
-    with Pool(4) as p:
+    with ProcessPoolExecutor(4) as p:
         r = _TestRunnableWorkerMethod(4)
         with pytest.raises(KeyError):
             r._run(p)
-        p.close()
+        p.shutdown(wait=True)
steinitzu (Collaborator, Author):
@rudolfix This test fails (it doesn't raise) and I'm not totally clear on what it's doing. It looks like the executor initializes the process pool lazily on the first task, so maybe this order doesn't matter now?

Collaborator:

if processes are lazily instantiated then yes. just remove the with raises and test if the run was successful.

@rudolfix (Collaborator) left a review comment:

this is neat

@@ -59,7 +85,7 @@ def _run_func() -> bool:
     if pool:
         logger.info("Closing processing pool")
         # terminate pool and do not join
Collaborator:
please remove outdated comments

@@ -59,7 +85,7 @@ def _run_func() -> bool:
     if pool:
         logger.info("Closing processing pool")
         # terminate pool and do not join
-        pool.terminate()
+        pool.shutdown(wait=True)
Collaborator:
I hope that will never lock. The process pool was locking sometimes; impossible to debug.

Collaborator:

I think the old one used a semaphore under the hood in the stdlib, and if the system sigkilled a child process (perhaps due to memory) or a process failed, it could lock. On Kubernetes, where you have limited CPU, os.cpu_count() returns an incorrect value, and over-provisioning processes would stall out the program too.
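As an aside, a common workaround for the os.cpu_count() issue is to derive the worker count from the cgroup CPU quota when one is set. A rough sketch, assuming cgroup v2 (not part of this PR):

import os


def effective_cpu_count() -> int:
    # Respect a cgroup v2 CPU quota (e.g. a Kubernetes CPU limit) when present,
    # otherwise fall back to os.cpu_count().
    try:
        with open("/sys/fs/cgroup/cpu.max") as f:
            quota, period = f.read().split()
        if quota != "max":
            return max(1, int(quota) // int(period))
    except (OSError, ValueError):
        pass
    return os.cpu_count() or 1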

dlt/common/runners/pool_runner.py (outdated review thread, resolved)
@@ -17,9 +18,9 @@ def test_runnable_process_pool(method: str) -> None:
     # 4 tasks
     r = _TestRunnableWorker(4)
     # create 4 workers
-    with Pool(4) as p:
+    with ProcessPoolExecutor(4) as p:
Collaborator:

does this line still work?

multiprocessing.set_start_method(method, force=True)

We need to make sure all start methods work; on Windows there's only spawn. (Also make sure those tests run on Windows on CI.)

steinitzu (Collaborator, Author):
Yes, I updated this in the tests to ProcessPoolExecutor(4, mp_context=multiprocessing.get_context(method)). The global multiprocessing.set_start_method seems to work too, but it's cleaner not to change global settings in tests.
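A self-contained sketch of that pattern; the _square task and the "spawn" choice are just for illustration, only the executor construction mirrors the line quoted above:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def _square(x: int) -> int:
    return x * x


def run_with_start_method(method: str) -> list:
    # Pass an explicit mp context instead of calling multiprocessing.set_start_method,
    # so the global start method stays untouched when tests share one process.
    ctx = multiprocessing.get_context(method)
    with ProcessPoolExecutor(4, mp_context=ctx) as pool:
        return list(pool.map(_square, range(4)))


if __name__ == "__main__":
    # "fork" is unavailable on Windows; "spawn" works on all platforms.
    print(run_with_start_method("spawn"))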

steinitzu (Collaborator, Author):

Apparently it's still the case with "spawn" that processes start upfront, so I'm testing both.

@z3z1ma (Collaborator) commented Oct 29, 2023

This is very nicely done 🚀

I wonder if there is a possibility to supply a custom executor. Given that, you could parallelize normalization across nodes using something like Ray, at the expense of network overhead; but given a large enough pool it could pay off.

@steinitzu force-pushed the sthor/futures-execturos branch from add0724 to 7279700 on October 30, 2023 15:34
@steinitzu (Collaborator, Author) replied:

This would be cool and easy to do; anything with the same futures interface should work. The question is how we would pass it. I think an executor/pool argument on normalize, load, and run would be good, and that should supersede config.

@z3z1ma (Collaborator) commented Oct 30, 2023

Indeed; for example, https://docs.dask.org/en/stable/futures.html offers an Executor-compatible interface out of the box that scales out to multiple nodes. Or a bespoke implementation could be supplied.

The primary consideration in taking this to the next level is data locality. Data storage should ideally leverage fsspec, so that things like NormalizeStorage, SchemaStorage, LoadStorage, LoadStorageConfiguration, and NormalizeStorageConfiguration (from dlt.common.storages) all go through fsspec. When that is configured, parallelization across nodes, as well as persistence of pipeline state across nodes, becomes trivial. The parameterization of Executor is still useful even before that, but we should consider the two as deeply synergistic.
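For reference, a sketch of how the Dask futures interface lines up with concurrent.futures; it assumes the optional distributed package is installed, and normalize_chunk is a made-up stand-in, not a dlt function:

from distributed import Client


def normalize_chunk(chunk: list) -> int:
    # Stand-in for a CPU-bound normalization task.
    return sum(x * x for x in chunk)


if __name__ == "__main__":
    client = Client()  # local cluster by default; point it at a scheduler address to scale out
    executor = client.get_executor()  # concurrent.futures.Executor-compatible adapter
    futures = [executor.submit(normalize_chunk, [i, i + 1, i + 2]) for i in range(4)]
    print([f.result() for f in futures])
    client.close()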

@rudolfix marked this pull request as ready for review on October 30, 2023 20:05
@rudolfix force-pushed the sthor/futures-execturos branch from a648e07 to e478f89 on October 30, 2023 21:08
@rudolfix merged commit 7c85bfe into devel on Oct 30, 2023
@rudolfix deleted the sthor/futures-execturos branch on October 30, 2023 21:25
Successfully merging this pull request may close: replace pool executor with thread executor in load stage.
3 participants