[BUG] Custom PythonTask fails with: invalid TaskSpecification, unable to determine Pod configuration #4985

Open
ksundeepsatya opened this issue Feb 29, 2024 · 3 comments
Assignees
Labels
bug (Something isn't working), flytekit (FlyteKit Python related issue), waiting for reporter (Used for when we need input from the bug reporter)

Comments

@ksundeepsatya

I am trying out the user container task plugin example from the Flyte docs. It works fine in local testing, but it fails when I run it remotely (on the Flyte demo cluster). This is the error:

Error

Workflow[flytesnacks:development:flyte_tmp.pre_built_container.my_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration

The pod doesn't even launch. I am on Flyte 1.10.2.

Code

The code I am running (note: it is the same as the example in the docs):

import typing
from datetime import timedelta
from time import sleep

from flytekit import TaskMetadata, task, workflow
from flytekit.extend import Interface, PythonTask, context_manager

class WaitForObjectStoreFile(PythonTask):
    """
    Add documentation here for your plugin.
    This plugin creates an object store file sensor that waits and exits only when the file exists.
    """

    _VAR_NAME: str = "path"

    def __init__(
        self,
        name: str,
        poll_interval: timedelta = timedelta(seconds=10),
        **kwargs,
    ):
        super(WaitForObjectStoreFile, self).__init__(
            task_type="python-task",
            name=name,
            task_config=None,
            interface=Interface(inputs={self._VAR_NAME: str}, outputs={self._VAR_NAME: str}),
            **kwargs,
        )
        self._poll_interval = poll_interval

    def execute(self, **kwargs) -> typing.Any:
        # No need to check for existence, as that is guaranteed.
        path = kwargs[self._VAR_NAME]
        ctx = context_manager.FlyteContext.current_context()
        user_context = ctx.user_space_params
        while True:
            user_context.logging.info(f"Sensing file in path {path}...")
            if ctx.file_access.exists(path):
                user_context.logging.info(f"file in path {path} exists!")
                return path
            user_context.logging.warning(f"file in path {path} does not exist!")
            # total_seconds() covers the whole interval; .seconds drops any days component.
            sleep(self._poll_interval.total_seconds())


sensor = WaitForObjectStoreFile(
    name="my-objectstore-sensor",
    metadata=TaskMetadata(retries=10, timeout=timedelta(minutes=20)),
    poll_interval=timedelta(seconds=1),
)


@task
def print_file(path: str) -> str:
    print(path)
    return path


@workflow
def my_workflow(path: str) -> str:
    return print_file(path=sensor(path=path))


if __name__ == "__main__":
    f = "/tmp/some-file"
    with open(f, "w") as w:
        w.write("Hello World!")

    print(my_workflow(path=f))

Steps to reproduce:

  • flytectl demo start
  • Create a file workflow.py containing the code above
  • pyflyte run --remote workflow.py my_workflow --path some-file

welcome bot commented Feb 29, 2024

Thank you for opening your first issue here! 🛠


dosubot bot commented Feb 29, 2024

🤖

Hey @ksundeepsatya, great to see you diving into the depths of Flyte again! Hope all's been well on your end.

Based on the error message you provided, it seems like there's an issue with how the task's pod configuration is specified or interpreted by the Flyte system in a remote environment. This error typically occurs when the task's specification does not include all the necessary information for the Flyte system to correctly create a Kubernetes pod for executing the task, or when there is a mismatch in the expected configuration.

Here are a few areas to check in your task specification:

  1. Container and K8sPod Specification: Ensure that your task's specification correctly defines either a Container or a K8sPod configuration. For a PythonTask, you would typically use a container specification that includes the necessary image, command, arguments, and resources. If you are using a custom Kubernetes pod configuration (K8sPod), ensure that the pod_spec is correctly defined and marshalled into the protobuf struct.

  2. Resources: Verify that the task's Resources are correctly specified, including both requests and limits for CPU, memory, and any other required resources. The error could occur if the resources are not correctly defined or if there's a mismatch in the expected format.

  3. ExtendedResources: If your task requires specialized resources, such as GPU accelerators, ensure that these are correctly specified in the ExtendedResources field. Misconfiguration here could lead to issues in pod creation.

  4. Task Metadata: Check the TaskMetadata for any fields that might influence pod creation, such as security_context, interruptible_value, and pod_template_name. An incorrectly specified security context or the use of an undefined pod template name could cause the system to fail to determine the correct pod configuration.

  5. Compatibility and Versioning: Ensure that the version of Flyte you are deploying to remotely supports all the features and fields you are using in your task specification. Incompatibilities between the local testing environment and the remote Flyte cluster version could lead to discrepancies in behavior.

To resolve the issue, review your task's specification against the FlyteIDL definition to ensure all required fields are correctly populated and that there are no mismatches in resource definitions or pod configurations. Additionally, consulting the Flyte documentation for the specific version of Flyte you are using remotely can help identify any version-specific requirements or changes in the task specification process.

You can refer to the FlyteIDL definition for more details.

I hope this helps! If you have any more questions or need further clarification, feel free to ask.


@dosubot dosubot bot added the bug Something isn't working label Feb 29, 2024
@eapolinario eapolinario added the flytekit FlyteKit Python related issue label Feb 29, 2024
@eapolinario eapolinario self-assigned this Feb 29, 2024
@eapolinario
Contributor

@ksundeepsatya, can you try using a PythonInstanceTask instead of a PythonTask?

Essentially define WaitForObjectStoreFile as:

T = typing.TypeVar("T")

class WaitForObjectStoreFile(PythonInstanceTask[T]):

That way the execute method runs in a pod. I'm not sure I understand your use case yet, though. Can you describe it?

@eapolinario eapolinario added the waiting for reporter Used for when we need input from the bug reporter label Apr 1, 2024