Multi-processing pickle issue with pickling driver within Hamilton #1093

Closed
skrawcz opened this issue Aug 13, 2024 · 1 comment · Fixed by #1100
Labels: bug (Something isn't working)


skrawcz (Collaborator) commented Aug 13, 2024

Current behavior

This breaks on the code below, which runs Hamilton within Hamilton: drivers are passed as inputs to the parallel tasks of another driver.

Stack Traces

Pickling fails on an object of type module: cannot pickle 'module' object.
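The error can be reproduced without Hamilton. A minimal sketch using only the standard library: a module object cannot be pickled, and neither can anything that references one. The dict below is a hypothetical stand-in for a driver's internal state, not Hamilton's actual data structure.

```python
import json  # any module works; json is just a convenient example
import pickle


def try_pickle(obj) -> str:
    """Return "ok" if obj pickles, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except TypeError as exc:
        return str(exc)


# A bare module object fails...
print(try_pickle(json))
# ...and so does any container that holds one (hypothetical driver-like state).
print(try_pickle({"name": "dr", "modules": [json]}))
# The same container without the module pickles fine.
print(try_pickle({"name": "dr", "modules": []}))
```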

Steps to replicate behavior

The mapper-worker-reducer pattern works with the multithreading executor but not with MultiProcessingExecutor, which fails with cannot pickle 'module' object. Here is a minimal reproducible example:

```python
# worker.py
def double(a: int) -> int:
    return a * 2
```

```python
# mapper.py
from typing import Any

from hamilton.htypes import Collect, Parallelizable


def mapper(
    drivers: list,
    inputs: list,
    final_vars: list = [],
) -> Parallelizable[dict]:
    # Fan out: one task per (driver, inputs) pair.
    for dr, input_ in zip(drivers, inputs):
        yield {
            "dr": dr,
            "final_vars": final_vars or dr.list_available_variables(),
            "input": input_,
        }


def worker(mapper: dict) -> dict:
    # With MultiProcessingExecutor this runs in another process, so the
    # whole task dict, including the driver, has to be pickled.
    _dr = mapper["dr"]
    _inputs = mapper["input"]
    _final_vars = mapper["final_vars"]
    return _dr.execute(final_vars=_final_vars, inputs=_inputs)


def reducer(worker: Collect[dict]) -> Any:
    # Fan in: collect the results of every worker task.
    return worker
```

```python
# run.py
from hamilton import driver
from hamilton.execution import executors

import mapper
import worker

# The guard is needed for multiprocessing on platforms that spawn processes.
if __name__ == "__main__":
    drivers = []
    inputs = []
    for i in range(4):
        dr = driver.Builder().with_modules(worker).build()
        drivers.append(dr)
        inputs.append({"a": i})

    dr = (
        driver.Builder()
        .with_modules(mapper)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_remote_executor(executors.MultiProcessingExecutor(8))
        .build()
    )
    # Fails with: cannot pickle 'module' object
    dr.execute(
        final_vars=["reducer"],
        inputs={"drivers": drivers, "inputs": inputs, "final_vars": ["double"]},
    )
```
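Why threads succeed where processes fail can be seen with the standard library alone. In this sketch a plain module stands in for a driver (an illustrative assumption, since module references are what fail to pickle), and `worker` and `payload_is_picklable` are hypothetical names. The issue does not say how MultiProcessingExecutor is built internally; the sketch only shows that a thread pool passes task payloads by reference, while any process-based executor has to serialize them.

```python
import json  # a module standing in for a driver (illustrative assumption)
import pickle
from concurrent.futures import ThreadPoolExecutor


def worker(task: dict) -> str:
    # Call into the module carried by the task, as worker() calls dr.execute().
    return task["dr"].dumps(task["input"])


def payload_is_picklable(task: dict) -> bool:
    # A process-based executor must do this for every task it sends to a child.
    try:
        pickle.dumps(task)
        return True
    except TypeError:
        return False


# Mapper step: one task per input, each holding a module reference.
tasks = [{"dr": json, "input": {"a": i}} for i in range(4)]

# Threads share the parent's memory, so no pickling is needed and this succeeds.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(worker, tasks))  # reducer step: collect in order
print(results)

# Crossing a process boundary would require pickling each task; this fails.
print([payload_is_picklable(t) for t in tasks])
```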

Library & System Information

Latest Hamilton release at the time of filing.

Expected behavior

This should work with MultiProcessingExecutor, just as it does with the multithreading executor.


skrawcz added the triage label (for issues that need to be triaged) on Aug 13, 2024.
skrawcz (Collaborator, Author) commented Aug 13, 2024

Originally reported in MolCrafts/molexp#8.

skrawcz added the bug label and removed the triage label on Aug 18, 2024.
skrawcz self-assigned this on Aug 18, 2024.
skrawcz added a commit that referenced this issue on Aug 18, 2024:
This fixes #1093.

Modules are not picklable. This fixes that by serializing their
fully qualified names instead; when the driver object is
deserialized, the names are turned back into module objects.
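The approach described in the commit message can be sketched with the standard pickle hooks `__getstate__` and `__setstate__`. This is a minimal illustration under assumptions, not the code from #1100: `ModuleHolder` is a hypothetical class, and re-importing each stored name with `importlib.import_module` is one plausible way to turn it back into a module object.

```python
import importlib
import json
import json.decoder
import pickle
from types import ModuleType


class ModuleHolder:
    """Hypothetical stand-in for an object (like a driver) that stores modules."""

    def __init__(self, *modules: ModuleType) -> None:
        self.modules = list(modules)
        self.label = "example"  # ordinary state that pickles as usual

    def __getstate__(self) -> dict:
        state = self.__dict__.copy()
        # Modules cannot be pickled, but their fully qualified names can.
        state["modules"] = [m.__name__ for m in self.modules]
        return state

    def __setstate__(self, state: dict) -> None:
        state = dict(state)
        # Re-import each module by name; import_module returns the cached
        # entry from sys.modules if the module is already loaded.
        state["modules"] = [importlib.import_module(name) for name in state["modules"]]
        self.__dict__.update(state)


holder = ModuleHolder(json, json.decoder)
restored = pickle.loads(pickle.dumps(holder))
print([m.__name__ for m in restored.modules])  # ['json', 'json.decoder']
```

This only works for modules that can be imported by name in the receiving process. A module built on the fly (for example with `types.ModuleType`) and never registered in `sys.modules` would fail to re-import.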