Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async workflow & task #2611

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

novahow
Copy link
Contributor

@novahow novahow commented Jul 25, 2024

Tracking issue

closes flyteorg/flyte#1840

Why are the changes needed?

support async task. pythonfunctiontask , pythonfunctionworkflow and map_task are supported. Conditional workflows are currently not supported

import asyncio
from flytekit.experimental import eager
from flytekit import task, workflow, map_task
import time
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

@task(enable_deck=True)
async def async_add_one(x: int) -> int:
    await asyncio.sleep(1)
    return x + 1

@task
async def multi_add(models: list[int]) -> list[int]:
    coros = [async_add_one(x=x) for x in models]
    res = await asyncio.gather(*coros)
    return res

@workflow
async def adv_async_p(models: list[int] = [1,2,3]) -> list[int]:
    coros = map_task(async_add_one)(x=models)
    new_models = await coros
    res2 = await multi_add(models=new_models)
    return res2


@workflow
async def sub3_wf(models: list[int] = [1,2,3]) -> list[int]:
    await async_add_one(x=models[0])
    return models

@workflow
async def sub_adv_p(models: list[int] = [1,2,3]) -> list[int]:
    ann = await sub3_wf(models=models)
    coro0 = async_add_one(x=ann[0])
    coro1 = async_add_one(x=ann[1])
    res = await asyncio.gather(coro0, coro1)
    return res
    print(res)
    return wtf_resolve(x=res)

@workflow
async def adv_p(models: list[int] = [1,2,3]) -> list[int]:
    res = await sub_adv_p(models=models)
    return res

What changes were proposed in this pull request?

Added an awaitable attribute in Promise and VoidPromise

How was this patch tested?

added unit tests on map_task and promises.

Setup process

Screenshots

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

@novahow novahow changed the title Feature/flyte run Support async workflow & task Jul 25, 2024
@novahow novahow force-pushed the feature/flyteRun branch 2 times, most recently from e0aeaf3 to 9bc6d89 Compare August 15, 2024 12:46
@novahow novahow marked this pull request as ready for review August 15, 2024 12:47
@kumare3
Copy link
Contributor

kumare3 commented Sep 25, 2024

OMG!!!!!!!!!!! This is amazing stuff. But we have to take closer look at the code. @thomasjpfan

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core][Flytekit] support async methods in python
2 participants