-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial make auto cache plugin #2912
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Daniel Sola <[email protected]>
Signed-off-by: Daniel Sola <[email protected]>
@eapolinario @cosmicBboy Here is a draft PR doing roughly what we talked about. Would be great to get your opinions before I implement more hashing methods. A couple questions:
|
Given a function, generates a version hash based on its source code and the salt. | ||
""" | ||
|
||
def __init__(self, salt: str = "salt") -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can the default value be ""
?
cache_versions = [item.get_version() for item in cache] | ||
task_hash = "".join(cache_versions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eapolinario does the cache version string have a character limit? If so we may need to re-hash the concatenated hashes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I had a feeling this would be an issue. I just left the long join for simplicity and to not introduce a hashing function in flytekit/core/task.py
, but happy to hash that.
Just putting some thoughts down: From the current design the order of the cache objects matters, which I'm okay with... I'd like to document the pattern to define one caching policy and re-using it like so: cache_policy = [CacheFunctionBody(), CachePrivateModules(), ...]
@task(cache=cache_policy)
def my_task(): ... I wonder if it would be useful to expose a # multiple cache objects with separate salts
cache_policy = [CacheFunctionBody(salt="a"), CachePrivateModules(salt="a"), ...]
# I just want to invalidate the cache, but it feels weird and arbitrary to pick one object to bump
cache_policy = [CacheFunctionBody(salt="a"), CachePrivateModules(salt="b"), ...] Doing this feels more natural cache_policy = [CacheFunctionBody(), CachePrivateModules(), Salt("a")]
# bump cache
cache_policy = [CacheFunctionBody(), CachePrivateModules(), Salt("b")] I still think it's nice to have a |
@@ -174,7 +175,7 @@ def task( | |||
def task( | |||
_task_function: Optional[Callable[P, FuncOut]] = None, | |||
task_config: Optional[T] = None, | |||
cache: bool = False, | |||
cache: Union[bool, list[AutoCache]] = False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also accept a single AutoCache
object?
Thanks so much! Definitely agree re. a single salt. One idea I had was to use a dataclass where there is a list of Also, I like that pattern of defining the cache policy as a constant that should be defined outside of the task decorator and reused. +1 to that. |
@@ -343,10 +344,16 @@ def launch_dynamically(): | |||
""" | |||
|
|||
def wrapper(fn: Callable[P, Any]) -> PythonFunctionTask[T]: | |||
if isinstance(cache, list) and all(isinstance(item, AutoCache) for item in cache): | |||
cache_versions = [item.get_version() for item in cache] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to pass the function here
@dansola another idea is to have a class CachePolicy:
def __init__(self, *args, salt=""):
self.cache_objects = args
self.salt = salt
def get_version(self, func):
# use self.cache_objects to generate hashes
cache_policy = CachePolicy(
CacheFunctionBody(),
CachePrivateModules(),
...,
salt="mysalt"
)
@task(cache=cache_policy)
def task_fn(): ... edit: |
also if we go with the |
Yup I really like that idea. I added it to the PR where I make a new implementation for image caching here: #2944 Right now all the PRs are branching off each other. It's probably cleaner to consolidate. |
Why are the changes needed?
Make caching easier to use in flytekit by reducing cognitive burden of specifying cache versions
What changes were proposed in this pull request?
AutoCache
protocol added to flytekit with asalt
argument and aget_version
method. The task function is passed toget_version
in the task decorator to determine a cache version. Then the new cache version follows the same path a user created cache version would.auto_cache
plugin created which will contain implementations ofAutoCache
. The first one isCacheFunctionBody
which just checks the function body and ignores formatting and comments.How was this patch tested?
Unit tests added which verify versions are consistent and change when we expect. Since changes to the function name will cause a different hash, we move dummy functions to a separate directory and import them so we can keep the name the same but test that the hash changes with the contents change.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link