
[BUG] Task fails to import modules in pyflyte-map-execute mode but works in pyflyte-execute mode #4853

Open
2 tasks done
debajyoti-truefoundry opened this issue Feb 7, 2024 · 4 comments
Labels
backlogged, bug, flytekit

@debajyoti-truefoundry

Describe the bug

  ~/work/pipelines/..
    .git
    flyte
      installation
      my_workflow
        __pycache__
        my_module
          __pycache__
          __init__.py
          my_module.py
        __init__.py
        register.sh
        run.sh
        train_model_workflow
      .gitignore
      config.yaml
      requirements.txt

This is my current folder structure. On line 18 of the workflow file, I import a module from my_workflow.my_module.

My workflow file, train_model_workflow.py, looks like this:

import os
from functools import partial
from pathlib import Path
from typing import List, Tuple

import flytekit
from flytekit import (
    ImageSpec,
    PodTemplate,
    Resources,
    conditional,
    map_task,
    task,
    workflow,
)
from flytekit.types.directory import FlyteDirectory
from kubernetes import client
from my_workflow.my_module.my_module import random

nvml_image_spec = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3", "pynvml==11.5.0"],
    registry="redacted",
    name="redacted-pytorch",
    source_root="..",
)


normal_image = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3"],
    registry="redacted",
    name="redacted-normal",
    source_root="..",
)

cpu_resource = Resources(mem="300Mi", cpu="0.1")
gpu_resource = Resources(mem="900Mi", cpu="0.1", gpu="1")
gpu_pod_template = PodTemplate(
    pod_spec=client.V1PodSpec(
        containers=[],
        affinity=client.V1Affinity(
            node_affinity=client.V1NodeAffinity(
                required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
                    node_selector_terms=[
                        client.V1NodeSelectorTerm(
                            match_expressions=[
                                client.V1NodeSelectorRequirement(
                                    key="karpenter.k8s.aws/instance-family",
                                    operator="In",
                                    values=["g4dn"],
                                )
                            ]
                        )
                    ]
                )
            )
        ),
    )
)


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def should_train_tokenizer(tokenizer: str) -> bool:
    return not bool(tokenizer)


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def train_tokenizer() -> str:
    return "trained_tokenizer"


@task(
    container_image=nvml_image_spec,
    requests=gpu_resource,
    limits=gpu_resource,
    pod_template=gpu_pod_template,
    # accelerator=flytekit.extras.accelerators.T4,
)
def train_model(tokenizer: str) -> FlyteDirectory:
    from pynvml import nvmlInit, nvmlDeviceGetCount

    nvmlInit()
    assert nvmlDeviceGetCount() > 0

    random.random()
    working_dir = flytekit.current_context().working_directory
    local_dir = Path(os.path.join(working_dir, "csv_files"))
    local_dir.mkdir(exist_ok=True)

    with open(os.path.join(local_dir, "model"), "w", encoding="utf-8") as f:
        f.write(tokenizer)

    return FlyteDirectory(path=str(local_dir))


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def validate_model(model: FlyteDirectory, tokenizer: str, validation_data: str) -> bool:
    print(validation_data)
    model_path = os.path.join(model, "model")
    with open(model_path, "r", encoding="utf-8") as f:
        return f.read() == tokenizer


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def all_good(validations: List[bool]) -> bool:
    return all(validations)


@workflow
def train_tokenizer_and_model() -> Tuple[FlyteDirectory, str]:
    tokenizer = train_tokenizer()
    model = train_model(tokenizer=tokenizer)
    return model, tokenizer


@workflow
def just_train_model(tokenizer: str) -> Tuple[FlyteDirectory, str]:
    model = train_model(tokenizer=tokenizer)
    return model, tokenizer


@workflow
def train(tokenizer: str = "") -> bool:
    stt = should_train_tokenizer(tokenizer=tokenizer)
    model, t = (
        conditional("train_tokenizer")
        .if_(stt.is_true())
        .then(train_tokenizer_and_model())
        .else_()
        .then(just_train_model(tokenizer=tokenizer))
    )
    validation_task = partial(validate_model, model=model, tokenizer=t)
    validations = map_task(
        validation_task,
        concurrency=2,
    )(validation_data=["foo", "bar", "baz"])
    return all_good(validations=validations)
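To make the partial + map_task combination above concrete, here is a plain-Python analog that involves no Flyte at all. model and tokenizer are bound once (the pod command for the failed task lists them as `vars model,tokenizer`), and validate_model runs once per element of validation_data, with at most two runs in flight. This is only a sketch under stated assumptions: validate_model is a stand-in that compares strings instead of reading a FlyteDirectory, local_map is a hypothetical helper, and ThreadPoolExecutor substitutes for Flyte launching one pod per element.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Callable, List


def validate_model(model: str, tokenizer: str, validation_data: str) -> bool:
    # Stand-in for the real task: "model" is just the string train_model wrote.
    # validation_data does not affect the result, as in the original task.
    return model == tokenizer


def local_map(fn: Callable[..., bool], items: List[str], concurrency: int) -> List[bool]:
    """Call fn(validation_data=item) for each item, at most `concurrency` at a time.

    Results are returned in input order, matching the order of the mapped list.
    """
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(lambda item: fn(validation_data=item), items))


# model and tokenizer are bound once; only validation_data varies per element.
validation_task = partial(validate_model, model="Deb", tokenizer="Deb")
validations = local_map(validation_task, ["foo", "bar", "baz"], concurrency=2)
print(validations, all(validations))  # [True, True, True] True
```

In the real workflow each element runs in its own pod, which is why every element has to import train_model_workflow.py independently.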

I used the following command to run the workflow remotely.

pyflyte -v --config ../config.yaml run --remote train_model_workflow.py train --tokenizer Deb

[Screenshot: the execution, with the validate_model map task failed]

As the screenshot above shows, the validate_model map task fails. This is the log from the corresponding pod:

tar: Removing leading `/' from member names
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/bin/pyflyte-map-execute:8 in <module>                             │
│                                                                              │
│ ❱ 8 │   sys.exit(map_execute_task_cmd())                                     │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1157 in __call__        │
│                                                                              │
│ ❱ 1157 │   │   return self.main(*args, **kwargs)                             │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1078 in main            │
│                                                                              │
│ ❱ 1078 │   │   │   │   │   rv = self.invoke(ctx)                             │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1434 in invoke          │
│                                                                              │
│ ❱ 1434 │   │   │   return ctx.invoke(self.callback, **ctx.params)            │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:783 in invoke           │
│                                                                              │
│ ❱  783 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:577 in     │
│ map_execute_task_cmd                                                         │
│                                                                              │
│ ❱ 577 │   _execute_map_task(                                                 │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:143 in  │
│ f                                                                            │
│                                                                              │
│ ❱ 143 │   │   │   return outer_f(inner_f, args, kwargs)                      │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:173 in  │
│ system_entry_point                                                           │
│                                                                              │
│ ❱ 173 │   │   │   │   return wrapped(*args, **kwargs)                        │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:418 in     │
│ _execute_map_task                                                            │
│                                                                              │
│ ❱ 418 │   │   map_task = mtr.load_task(loader_args=resolver_args, max_concur │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│                                                                              │
│ ❱ 309 │   │   │   │   return func(*args, **kwargs)                           │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/map_task.py:401 in      │
│ load_task                                                                    │
│                                                                              │
│ ❱ 401 │   │   _task_def = resolver_obj.load_task(loader_args=resolver_args)  │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│                                                                              │
│ ❱ 309 │   │   │   │   return func(*args, **kwargs)                           │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/python_auto_container.p │
│ y:248 in load_task                                                           │
│                                                                              │
│ ❱ 248 │   │   task_module = importlib.import_module(name=task_module)  # typ │
│                                                                              │
│ /usr/local/lib/python3.9/importlib/__init__.py:127 in import_module          │
│                                                                              │
│ ❱ 127 │   return _bootstrap._gcd_import(name[level:], package, level)        │
│ in _gcd_import:1030                                                          │
│ in _find_and_load:1007                                                       │
│ in _find_and_load_unlocked:986                                               │
│ in _load_unlocked:680                                                        │
│ in exec_module:850                                                           │
│ in _call_with_frames_removed:228                                             │
│                                                                              │
│ /my_workflow/train_model_workflow.py:18 in <module>                          │
│                                                                              │
│ ❱  18 from my_workflow.my_module.my_module import random                     │
╰──────────────────────────────────────────────────────────────────────────────╯
ModuleNotFoundError: No module named 'my_workflow.my_module'
Getting s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz to ./

I cannot understand why the should_train_tokenizer task executed successfully but the map task failed.

Command for the failed task:

pyflyte-fast-execute --additional-distribution s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz --dest-dir . -- pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/0 --raw-output-data-prefix s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0 --checkpoint-path s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model
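The arguments after the bare `--` in this command identify the task to load. The traceback shows the default resolver calling importlib.import_module on the task module, so loading validate_model means importing my_workflow.train_model_workflow, which executes its line-18 import. Below is a simplified sketch of that split, assuming the key/value layout visible in this particular command; split_loader_args and load_task_object are hypothetical helpers, not flytekit's MapTaskResolver code.

```python
import importlib
from typing import Any, List, NamedTuple, Tuple


class MapLoaderArgs(NamedTuple):
    bound_vars: Tuple[str, ...]  # inputs fixed via functools.partial
    resolver: str                # dotted path of the inner task resolver
    task_module: str             # module the resolver imports
    task_name: str               # attribute looked up in that module


def split_loader_args(args: List[str]) -> MapLoaderArgs:
    # Assumed layout: vars <csv> resolver <path> task-module <mod> task-name <name>
    keys, values = args[0::2], args[1::2]
    if keys != ["vars", "resolver", "task-module", "task-name"] or len(values) != 4:
        raise ValueError(f"unexpected loader args: {args!r}")
    vars_csv, resolver, module, name = values
    return MapLoaderArgs(tuple(vars_csv.split(",")), resolver, module, name)


def load_task_object(module: str, name: str) -> Any:
    # Importing the module runs all of its top-level imports; this is the step
    # that raises ModuleNotFoundError when a dependency is not on sys.path.
    return getattr(importlib.import_module(module), name)


cmd_tail = (
    "vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver "
    "task-module my_workflow.train_model_workflow task-name validate_model"
).split()
parsed = split_loader_args(cmd_tail)
print(parsed.task_module, parsed.task_name)  # my_workflow.train_model_workflow validate_model
```

Whatever the entrypoint, the failure surfaces at the import of task_module, so the question is which copy of my_workflow is importable when that import runs.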

Expected behavior

The map task should not fail.

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@debajyoti-truefoundry debajyoti-truefoundry added the bug and untriaged labels Feb 7, 2024

welcome bot commented Feb 7, 2024

Thank you for opening your first issue here! 🛠


dosubot bot commented Feb 7, 2024

🤖

Hey @debajyoti-truefoundry, great to see you back! Hope you're doing well.

Based on the information you've provided, the issue seems to be in how your modules are imported or located in pyflyte-map-execute mode. pyflyte-map-execute itself should not change how Python resolves imports, so the cause may lie in the file structure or in how the modules are referenced.

The error message ModuleNotFoundError: No module named 'my_workflow.my_module' suggests that Python is unable to find the module my_module in the my_workflow package. This could be due to a number of reasons:

  1. The my_workflow package or the my_module module does not exist in the location where Python is looking for it.
  2. The my_workflow package is not properly installed or is not in the Python path.
  3. The my_workflow package does not contain an __init__.py file, so Python does not treat it as a regular package.

Here are a few things you could try:

  • Check the file structure and make sure that the my_workflow package and the my_module module exist in the expected locations.
  • Make sure that the my_workflow package is properly installed and is in the Python path. You can check the Python path by running import sys; print(sys.path) in your Python environment.
  • Check if the my_workflow package contains an __init__.py file. If it doesn't, you can create one (it can be an empty file).
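The first two checks are most useful when run inside the task container itself, since the image and the local checkout can differ. A minimal diagnostic sketch: locate is a hypothetical helper built on importlib.util.find_spec, and the module names are the ones from this issue.

```python
import importlib.util
import sys
from typing import Optional


def locate(module_name: str) -> Optional[str]:
    """Return where `module_name` would be loaded from, or None if it cannot be found.

    find_spec imports parent packages first, so a missing parent raises
    ModuleNotFoundError instead of returning None; both cases map to None here.
    """
    try:
        spec = importlib.util.find_spec(module_name)
    except ModuleNotFoundError:
        return None
    if spec is None:
        return None
    # Namespace packages (no __init__.py) have no single origin file.
    return spec.origin or ", ".join(spec.submodule_search_locations or [])


if __name__ == "__main__":
    print("sys.path:")
    for entry in sys.path:
        print("  ", entry)
    for name in ("my_workflow", "my_workflow.my_module", "my_workflow.my_module.my_module"):
        print(f"{name}: {locate(name)}")
```

If my_workflow resolves to a directory that lacks my_module, the package being imported is not the one you expect.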

If none of these suggestions solve the issue, it would be helpful if you could provide more information about your file structure and how you're importing the my_workflow package and the my_module module. This would help in identifying the cause of the issue.

For more information on how Python imports work, you can refer to the Python documentation on modules.

I hope this helps! If you have any more questions or need further clarification, feel free to ask.


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.


@debajyoti-truefoundry
Author

I did not face this problem when I registered the workflow with the following commands and ran it from the UI:

pyflyte --pkgs my_workflow package -f --source .

flytectl register files \
    --project flytesnacks \
    --domain development \
    --archive flyte-package.tgz \
    --version "$(git rev-parse HEAD)"


This was the command used in the pod at that time:

pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/0 --raw-output-data-prefix s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0 --checkpoint-path s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model

@eapolinario eapolinario removed the untriaged label Feb 8, 2024
@eapolinario eapolinario self-assigned this Feb 8, 2024
@eapolinario eapolinario added the flytekit and backlogged labels Feb 15, 2024
@DouglasLivingstone

The issue here appears to be that the --additional-distribution passed to pyflyte-fast-execute is not used for map_tasks, so the tasks end up running the original version of the code, without the updates from the additional distribution. For new workflow files, this leads to the ModuleNotFoundError, and for changed workflow files it means that the old code runs instead of the new code.
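The symptom described here, an older copy of the code shadowing the freshly uploaded one, can be reproduced with plain Python import resolution. The sketch does not use flytekit. It creates two copies of a minimal my_workflow package: an "image" copy without my_module (standing in for code baked into the container) and a "fast" copy with it (standing in for the extracted script_mode.tar.gz). Whichever copy comes first on the module search path wins. The directory names and helpers are hypothetical, and this illustrates the symptom only; it does not describe how pyflyte-map-execute actually sets up its path.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Dict, List

WORKFLOW_SRC = "from my_workflow.my_module.my_module import random\n"

# Runs in a fresh interpreter so earlier imports cannot leak in via sys.modules.
PROBE = (
    "try:\n"
    "    import my_workflow.train_model_workflow\n"
    "    print('ok')\n"
    "except ModuleNotFoundError as exc:\n"
    "    print(exc)\n"
)


def make_package(root: Path, with_my_module: bool) -> None:
    """Write a minimal my_workflow package under root, optionally with my_module."""
    pkg = root / "my_workflow"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "train_model_workflow.py").write_text(WORKFLOW_SRC)
    if with_my_module:
        sub = pkg / "my_module"
        sub.mkdir()
        (sub / "__init__.py").write_text("")
        (sub / "my_module.py").write_text("import random\n")


def try_import(search_path: List[Path], cwd: Path) -> str:
    """Import the workflow module with PYTHONPATH set to search_path, in that order."""
    env = dict(os.environ, PYTHONPATH=os.pathsep.join(str(p) for p in search_path))
    result = subprocess.run(
        [sys.executable, "-c", PROBE], env=env, cwd=cwd,
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


def demo() -> Dict[str, str]:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        baked = root / "image"  # hypothetical stale copy baked into the image
        fresh = root / "fast"   # hypothetical copy from the fast-register tarball
        make_package(baked, with_my_module=False)
        make_package(fresh, with_my_module=True)
        return {
            "stale first": try_import([baked, fresh], cwd=root),
            "fresh first": try_import([fresh, baked], cwd=root),
        }


if __name__ == "__main__":
    for order, outcome in demo().items():
        print(f"{order}: {outcome}")
```

With the stale copy first, the probe reports the same `No module named 'my_workflow.my_module'` error as the pod log; with the fresh copy first, the import succeeds. Because my_workflow has an __init__.py, Python stops at the first copy it finds and never looks in the second one.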

I've run into this when passing a Docker image with --image to pyflyte register: the code in the image runs instead of my local changes. It looks like you're hitting the same problem.
