[Bug] Map task caching failures #2113

Merged
merged 4 commits into from
Jan 22, 2024

Conversation

Contributor

@pvditt pvditt commented Jan 18, 2024

Tracking issue

flyteorg/flyte#4731

flyteorg/flyte#4702

Why are the changes needed?

Both legacy and new map tasks embed a generated task interface in the task name so that the same mappable task can be partially bound with different parameters. This guarantees cache misses whenever the collection interface contains types with non-deterministic elements.

Example where the output type contains a memory address, which guarantees cache misses:

@task()
def process_data(index: int) -> Annotated[pd.DataFrame, Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]]:
    ...

The same can be done for inputs.
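To illustrate why a memory address in the interface breaks caching, here is a minimal sketch. It assumes the task name is derived from a hash of the interface's string form; `HashAnnotation` and `interface_fingerprint` are hypothetical stand-ins for illustration, not flytekit code.

```python
import hashlib


class HashAnnotation:
    """Hypothetical stand-in for an annotation object without a custom __repr__."""

    def __init__(self, function):
        self.function = function


def interface_fingerprint(interface: dict) -> str:
    # Assumption: the name suffix is a digest of the interface's string form,
    # which includes the repr of every value (and so any memory address).
    return hashlib.md5(str(interface).encode("utf-8")).hexdigest()


def hash_df(df) -> str:
    # Placeholder hash function; its behavior is irrelevant to the sketch.
    return "constant"


# Two logically identical interfaces built from separate annotation objects.
first = {"o0": HashAnnotation(hash_df)}
second = {"o0": HashAnnotation(hash_df)}

# The default repr embeds each object's address, so the digests differ.
fingerprints_differ = interface_fingerprint(first) != interface_fingerprint(second)

# The variable names alone are stable.
names_match = sorted(first) == sorted(second)
```

Under these assumptions, any name built from the full interface changes whenever the annotation object is recreated, while the variable names stay the same.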


There is also an issue with map task container args being non-deterministic when there are multiple bound inputs (flyteorg/flyte#4702), which can result in an error.

What changes were proposed in this pull request?

Use an ordered list of the bound input names instead of the entire interface. This is similar to this but disregards any typed values, keeping only the keys/names.
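A rough sketch of the idea, using only `functools.partial` from the standard library. The helper names `bound_input_key` and `mapped_task_name`, and the `map_..._<digest>` name format, are assumptions made for illustration and do not reflect flytekit's actual naming scheme.

```python
import functools
import hashlib


def square_add(i: int, j: int) -> list:
    return [i * i + j]


def bound_input_key(partial_task: functools.partial) -> str:
    # Keep only the names of the bound keyword arguments, in sorted order.
    # Bound values and types are deliberately ignored.
    return ",".join(sorted(partial_task.keywords))


def mapped_task_name(partial_task: functools.partial) -> str:
    # Hypothetical name format: base function name plus a digest of the key.
    digest = hashlib.md5(bound_input_key(partial_task).encode("utf-8")).hexdigest()
    return f"map_{partial_task.func.__name__}_{digest}"


square_fixed = functools.partial(square_add, j=10)
add_fixed = functools.partial(square_add, i=2)
```

In this sketch, `square_fixed` and `add_fixed` still get distinct names because they bind different parameters, and the name no longer depends on anything with an unstable string form.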

Note: this will cause all cache-enabled map tasks to miss the cache on their first run, since all task names will change.

How was this patch tested?

Setup process

Confirmed that inputs/outputs containing a memory address don't impact caching:

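import functools
from typing import List

import pandas as pd
from flytekit import HashMethod, TaskMetadata, task, workflow
from typing_extensions import Annotated


# Hash a DataFrame by content so the cache key does not depend on object identity.
def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))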


@task()
def mem_output(index: int) -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    target = [index] * 3
    df = pd.DataFrame({"target": target})
    return df


@task()
def mem_input(df: Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]) -> int:
    return df["target"].tolist()[0]


@workflow
def caching_df(
    l: List[int] = [1, 2, 3, 4],
) -> None:
    from flytekit.experimental import map_task
    partial_task = functools.partial(mem_output)
    outputs = map_task(partial_task, metadata=TaskMetadata(cache=True, cache_version="2"))(index=l)
    partial_task2 = functools.partial(mem_input)
    map_task(partial_task2, metadata=TaskMetadata(cache=True, cache_version="2"))(df=outputs)

Confirmed that partially binding different parameters of the same mapped task still works:

@task
def square_add(i: int, j: int) -> List[int]:
    return [i * i + j]


@workflow
def wf(xs: List[int] = [1, 1, 1], ys: List[int] = [2, 2, 2]):
    square_fixed = functools.partial(square_add, j=10)
    add_fixed = functools.partial(square_add, i=2)
    map_task(square_fixed, metadata=TaskMetadata(cache=True, cache_version="1.0.0"))(i=xs)
    map_task(add_fixed, metadata=TaskMetadata(cache=True, cache_version="1.0.0"))(j=ys)

Screenshots

Screenshot 2024-01-17 at 5 57 40 PM

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link


welcome bot commented Jan 18, 2024

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).


codecov bot commented Jan 18, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (7996c2e) 82.58% compared to head (09781b2) 85.78%.
Report is 3 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2113      +/-   ##
==========================================
+ Coverage   82.58%   85.78%   +3.19%     
==========================================
  Files         233      313      +80     
  Lines       19541    23514    +3973     
  Branches     3512     3515       +3     
==========================================
+ Hits        16138    20171    +4033     
+ Misses       2824     2734      -90     
- Partials      579      609      +30     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Contributor

@wild-endeavor wild-endeavor left a comment


Can we also update the args?

Another issue is that when flytekit creates the command:

f'{",".join(t.bound_inputs)}',

the bound vars can show up in different orders. Can we order these first to make sure the args are always the same?

Same for the old map task:

f'{",".join(t.bound_inputs)}',
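To illustrate the ordering concern, here is a minimal sketch under the assumption that the bound inputs are held in a `set` of strings. Python randomizes string hashing per process unless `PYTHONHASHSEED` is fixed, so joining a set directly can produce different orderings across runs. The helper names `render_bound_inputs_arg` and `join_in_child` are hypothetical.

```python
import os
import subprocess
import sys


def render_bound_inputs_arg(bound_inputs) -> str:
    # Sort before joining so the rendered argument never depends on
    # set iteration order.
    return ",".join(sorted(bound_inputs))


def join_in_child(names, seed: int, sort: bool) -> str:
    # Build the set in a child interpreter with a chosen hash seed and
    # return the joined string, with or without sorting first.
    expr = "sorted(s)" if sort else "s"
    code = f"s = set({sorted(names)!r}); print(','.join({expr}))"
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

Calling `join_in_child(["alpha", "beta", "gamma"], seed, sort=False)` for several seeds may yield different orderings, whereas `sort=True` always yields the same string.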

@wild-endeavor
Contributor

if you sort then this should resolve flyteorg/flyte#4702 as well @pvditt

@pvditt
Contributor Author

pvditt commented Jan 18, 2024

@wild-endeavor thanks for pointing that out. Will update this PR

@pvditt pvditt requested a review from wild-endeavor January 18, 2024 23:42
@@ -77,8 +77,9 @@ def __init__(
    f = actual_task.lhs
else:
    _, mod, f, _ = tracker.extract_task_module(cast(PythonFunctionTask, actual_task).task_function)
sorted_bounded_inputs = ",".join(sorted(self._bound_inputs))
Contributor Author


note: could also just turn self._bound_inputs into a sorted list as opposed to set - don't feel too strongly either way

@pvditt pvditt merged commit cba830e into master Jan 22, 2024
84 checks passed

welcome bot commented Jan 22, 2024

Congrats on merging your first pull request! 🎉
