-
Notifications
You must be signed in to change notification settings - Fork 674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flytepropeller] [compiler] Support non-Any Python types as Any input in workflows #5408
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #5408 +/- ##
==========================================
- Coverage 61.10% 61.07% -0.04%
==========================================
Files 793 793
Lines 51164 51232 +68
==========================================
+ Hits 31265 31288 +23
- Misses 17027 17068 +41
- Partials 2872 2876 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
97406d8
to
ef74a47
Compare
Should we try to add a new type scalar Any here? |
Signed-off-by: Future-Outlier <[email protected]>
Not sure if we should treat message AnyType {
LiteralType variant = 1;
}
|
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Hold on - I am not sure we should do this? |
@kumare3 isn't this necessary for gradual typing that serializes as pickle (at least for Python)? |
My concern is - how do we coerce the type into another type? I think once something is any it has to be any all the way through. |
At deserialization time, once we unpickle the object, can't we do from flytekit import task, workflow
from typing import Any
@task
def foo(a: Any) -> Any:
if type(a) == int:
return a + 1
return 0
@task
def bar(a: int) -> int:
return a * 2
@workflow
def wf(a: int) -> int:
x = foo(a=a) # here x is Any
return bar(a=x) # unpickle `x` and try to and coerce to `int` at deserialization time This will work for most cases but not for things like files, directories, very large dataframes (which would be crazy to annotate with Any) |
I'm personally also not convinced we need to support |
I will list why we need this and how we can do it today. |
@kumare3 @pingsutw @cosmicBboy @fg91 @EngHabu Why do we need this PR?In the upcoming agent, LangChain, we will use However, our ideal LangChain workflow might look like this: import os
from typing import Any, Union
from flytekit import workflow
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts.prompt import PromptTemplate
from langchain_openai import ChatOpenAI
api_key = os.environ.get("OPENAI_API_KEY")
model = ChatOpenAI(
model="gpt-3.5-turbo",
openai_api_key=api_key,
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
)
prompt = PromptTemplate(
input_variables=["question"],
template="Question: {question}?",
)
output_parser = StrOutputParser()
@workflow
def wf(input: str) -> Union[str, Any]:
message = prompt(input=input)
o0 = model(input=message)
o1 = output_parser(input=o0)
return o1 It's necessary for users to input Reference for Flytekit LangChain task implementation: https://github.com/flyteorg/flytekit/pull/2436/files#diff-76c7b754bdfc3be1caaba940a66376edd7f443185fe492e7ac16a142cf63c70dR54-R55 How can we support this in Flytekit?
class TypeEngine(typing.Generic[T]):
@classmethod
def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type, expected: LiteralType) -> Literal:
# The `metadata` field is added because of the artifact feature
try:
metadata.update({"python_dotted_path": f"{python_type.__module__}.{python_type.__qualname__}"})
lv.set_metadata(metadata=metadata)
except AttributeError as e:
logger.warning(f"Attribute error occurred: {e}")
class FlytePickleTransformer(TypeTransformer[FlytePickle]):
def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
try:
uri = lv.scalar.blob.uri
return FlytePickle.from_pickle(uri)
except Exception as e:
from pydoc import locate
metadata = lv.metadata
if metadata and metadata.get("python_dotted_path"):
python_dotted_path = metadata.get("python_dotted_path")
py_type = locate(python_dotted_path)
if py_type != typing.Any:
return TypeEngine.to_python_value(ctx, lv, py_type)
raise e PR reference: flyteorg/flytekit#2432 Pros
Cons
|
06/06/2024 Contributors meetup notes: while this change is not considered completely necessary, more follow-up discussion to come. |
Hi, folks.
for example: So the example @cosmicboys created should fail. @task
def foo(a: Any) -> Any:
if type(a) == int:
return a + 1
return 0
@task
def bar(a: int) -> int:
return a * 2
@workflow
def wf(a: int) -> int:
x = foo(a=a) # here x is Any
return bar(a=x) # here should show compile error
We can use the field metadata in our literal value. For example: def to_literal:
# lv is LiteralValue
lv.metadata.update({"python_type": str(python_type)}) FlytePickleTransformer def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
metadata = lv.metadata
try:
uri = lv.scalar.blob.uri
if lv.scalar.blob.metadata.type.format == self.PYTHON_PICKLE_FORMAT:
return FlytePickle.from_pickle(uri)
elif lv.scalar.blob.metadata.type.dimensionality == BlobType.BlobDimensionality.MULTIPART:
return TypeEngine.to_python_value(ctx, lv, FlyteDirectory)
elif lv.scalar.blob.metadata.type.dimensionality == BlobType.BlobDimensionality.SINGLE:
return TypeEngine.to_python_value(ctx, lv, FlyteFile)
except Exception as e:
metadata = lv.metadata
if metadata and metadata.get("python_type"):
python_type = metadata.get("python_type")
py_type = eval(python_type) # turn 'list[int]' to list[int]
if py_type != typing.Any:
return TypeEngine.to_python_value(ctx, lv, py_type)
raise e
def get_literal_type(self, t: Type[T]) -> LiteralType:
lt = LiteralType(
blob=_core_types.BlobType(
format=self.PYTHON_PICKLE_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE
)
)
lt.metadata = {"python_class_name": str(t)}
lt.metadata = {"isAny": str(t == typing.Any)} # check downstream type is any or not in propeller's compiler
return lt Does this implementation look good to you all? |
Tracking issue
#5366
Why are the changes needed?
We want to allow the Any type to accept all kinds of inputs
and outputs, not just the types we currently can't handle.For example, workflow like this.
Potential Issues
Note: This PR only supports
Any
in python, if we want to support java, we will need to add more code.What changes were proposed in this pull request?
isTypeAny
to check ifupstreamType
ordownstreamType
in the workflow isAny
in Python.isTypeAny
function. IforupstreamType
downstreamType
isAny
in Python, the function should returntrue
, indicating that the types are castable and the compilation passes.How was this patch tested?
unit test and remote cluster
Setup process
Screenshots
Check all the applicable boxes
Related PRs
flyteorg/flytekit#2432