[Upstream] [COR-2297/] Fix nested offloaded type validation (#552) #5996

Merged

pmahindrakar-oss
Contributor

The following workflow works when flytekit is not offloading literals:

```
import logging
from typing import List

from flytekit import LaunchPlan, map_task, task, workflow

# Turn on debug logging for flytekit.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("flytekit")
logger.setLevel(logging.DEBUG)


@task(cache=True, cache_version="1.1")
def my_30mb_task(i: str) -> str:
    # Builds a large string output from a short input.
    return f"Hello world {i}" * 30 * 100 * 1024


@task(cache=True, cache_version="1.1")
def generate_strs(count: int) -> List[str]:
    return ["a"] * count


@workflow
def my_30mb_wf(mbs: int) -> List[str]:
    strs = generate_strs(count=mbs)
    return map_task(my_30mb_task)(i=strs)


# Defined before big_inputs_wf, which calls it.
@task(cache=True, cache_version="1.1")
def noop():
    ...


@workflow
def big_inputs_wf(input: List[str]):
    noop()


big_inputs_wf_lp = LaunchPlan.get_or_create(name="big_inputs_wf_lp", workflow=big_inputs_wf)


@workflow
def ref_wf(mbs: int):
    # Pass the map task output to the launch plan as its input.
    big_inputs_wf_lp(input=my_30mb_wf(mbs=mbs))
```
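
As a rough check on how large each map task output is, the size of one `my_30mb_task` result can be computed without building the string. This is only a sketch: it assumes the input `"a"` that `generate_strs` emits and one byte per character, and the serialized literal will carry some extra overhead.

```python
# Size of one my_30mb_task output for the input "a".
i = "a"
unit = f"Hello world {i}"            # 13 ASCII characters
repeats = 30 * 100 * 1024            # 3,072,000 repetitions
payload_bytes = len(unit.encode("utf-8")) * repeats

print(payload_bytes)                    # 39936000
print(round(payload_bytes / 2**20, 1))  # 38.1 (MiB)
```

So each element of the map task output is closer to 40 MB than 30 MB.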

Without flytekit offloading, the return type is `OffloadedLiteral{inferredType: Collection{String}}`. When this is checked against the `big_inputs_wf` launch plan, which expects `Collection{String}`, `LiteralTypeForLiteral` returns the inferred type, `Collection{String}`, and the check passes.

If we enable offloading in flytekit, the data returned from the map task is `Collection{OffloadedLiteral{inferredType: Collection{String}}}`.

When this input is passed to `big_inputs_wf`, which takes `Collection{String}`, the type check fails because `LiteralTypeForLiteral` resolves `Collection{OffloadedLiteral{inferredType: Collection{String}}}` to `Collection{Collection{String}}`.

Flytekit handles this case by special-casing `Collection{OffloadedLiteral}`; similar special-casing is needed in the flyte code base.
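
The mismatch and the effect of special-casing can be illustrated with a small model. This is a sketch only, not the code in this PR or in flytekit: the classes `Simple`, `CollectionType`, `Offloaded`, and `CollectionLiteral` and both functions are hypothetical stand-ins, and the special case assumes that the offloaded element's inferred type already describes the whole collection.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Simple:
    """Hypothetical stand-in for a simple literal type such as STRING."""
    name: str


@dataclass(frozen=True)
class CollectionType:
    """Hypothetical stand-in for collection_type:{...}."""
    element: "LiteralType"


LiteralType = Union[Simple, CollectionType]


@dataclass(frozen=True)
class Offloaded:
    """Hypothetical offloaded literal: only its inferred type is kept inline."""
    inferred_type: LiteralType


@dataclass(frozen=True)
class CollectionLiteral:
    """Hypothetical collection literal (assumed non-empty in this sketch)."""
    items: Tuple["Literal", ...]


Literal = Union[Offloaded, CollectionLiteral]


def naive_type_for(lit: Literal) -> LiteralType:
    """Wrap the first element's type in a collection, with no special case."""
    if isinstance(lit, Offloaded):
        return lit.inferred_type
    return CollectionType(naive_type_for(lit.items[0]))


def special_cased_type_for(lit: Literal) -> LiteralType:
    """Same inference, but treat Collection{Offloaded} as already fully typed."""
    if isinstance(lit, Offloaded):
        return lit.inferred_type
    first = lit.items[0]
    if isinstance(first, Offloaded) and isinstance(first.inferred_type, CollectionType):
        # Assumption: the inferred type already covers the outer collection.
        return first.inferred_type
    return CollectionType(special_cased_type_for(first))


STRING = Simple("STRING")
expected = CollectionType(STRING)  # what big_inputs_wf declares
map_output = CollectionLiteral((Offloaded(CollectionType(STRING)),))

print(naive_type_for(map_output) == expected)          # False
print(special_cased_type_for(map_output) == expected)  # True
```

The naive version yields a collection of collections of strings, which matches the shape reported in the error, while the special-cased version yields the type the launch plan expects.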

Tested by deploying the changes in this PR:

https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/akxs97cdmkmxhhqp228x/nodes

Earlier, it would fail like this: https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/ap4thjp5528kjfspcsds/nodes
```
[UserError] failed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid input input wrong type. Expected collection_type:{simple:STRING}, but got collection_type:{collection_type:{simple:STRING}}
```

Roll out to canary, then to all prod BYOC and serverless tenants.

Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

*TODO: Link Linear issue(s) using [magic words](https://linear.app/docs/github#magic-words). `fixes` will move to merged status, while `ref` will only link the PR.*

* [X] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation

codecov bot commented Nov 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 36.95%. Comparing base (3c3ae05) to head (7f0db4a).
Report is 1 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #5996   +/-   ##
=======================================
  Coverage   36.95%   36.95%           
=======================================
  Files        1310     1310           
  Lines      131451   131464   +13     
=======================================
+ Hits        48575    48586   +11     
- Misses      78655    78657    +2     
  Partials     4221     4221           
| Flag | Coverage Δ |
|---|---|
| unittests-datacatalog | 51.58% <ø> (ø) |
| unittests-flyteadmin | 54.05% <ø> (-0.03%) ⬇️ |
| unittests-flytecopilot | 22.23% <ø> (ø) |
| unittests-flytectl | 62.44% <ø> (+0.04%) ⬆️ |
| unittests-flyteidl | 6.95% <ø> (ø) |
| unittests-flyteplugins | 53.83% <ø> (ø) |
| unittests-flytepropeller | 43.10% <100.00%> (+0.02%) ⬆️ |
| unittests-flytestdlib | 55.31% <ø> (ø) |


@eapolinario eapolinario merged commit f20b8aa into master Nov 12, 2024
50 of 51 checks passed
@eapolinario eapolinario deleted the union/upstream-dfa9801330da1f631cb4ca749a9c23c8adaff452 branch November 12, 2024 20:51
eapolinario added a commit that referenced this pull request Nov 21, 2024
eapolinario added a commit that referenced this pull request Nov 22, 2024
…" (#6045)

* Revert "[COR-2297/] Fix nested offloaded type validation (#552) (#5996)"

This reverts commit f20b8aa.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix lint errors.

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>