Chatgpt Agent #1822
Conversation
For the databricks agent
Signed-off-by: Future Outlier <[email protected]>
Codecov Report
Attention:
Additional details and impacted files

@@            Coverage Diff             @@
##           master    #1822       +/-   ##
===========================================
- Coverage   85.94%   18.71%    -67.24%
===========================================
  Files         308      332        +24
  Lines       22946    31410      +8464
  Branches     3468     3084       -384
===========================================
- Hits        19721     5878     -13843
- Misses       2621    25448     +22827
+ Partials      604       84       -520

View full report in Codecov by Sentry.
""" | ||
raise NotImplementedError | ||
|
||
def get_custom(self, settings: SerializationSettings = None) -> Dict[str, Any]: |
Why do we have to use pickle?
cc @pingsutw, do you know? Is this just to keep the Python conversion simple? This is potentially dangerous, as it may break across Python versions.
Pickle is faster; we've talked about this before and Kevin told me that it might be OK.
Will there be use cases with multiple Python versions?
We cannot control the backend and flytekit versions.
You are right, I'll fix it!
Thank you very much for your advice!
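For context, a minimal sketch of the version-independent alternative discussed here, assuming a jsonpickle-based encoding like the one referenced later in this thread (the key name and helper functions are hypothetical, not the plugin's actual API):

```python
# Sketch only: store the task config as a JSON string with jsonpickle instead
# of raw pickle, so the payload stays readable and does not depend on the
# Python version used by the backend. TASK_CONFIG_PKL is an assumed key name.
from typing import Any, Dict, Optional

import jsonpickle

TASK_CONFIG_PKL = "task_config_pkl"  # hypothetical key in the custom dict


def encode_config(config: Optional[Any]) -> Dict[str, Any]:
    """Encode an arbitrary config object for the task template's custom field."""
    if config is None:
        return {}
    return {TASK_CONFIG_PKL: jsonpickle.encode(config)}


def decode_config(custom: Dict[str, Any]) -> Optional[Any]:
    """Recover the original config object on the agent side."""
    raw = custom.get(TASK_CONFIG_PKL)
    return jsonpickle.decode(raw) if raw else None
```

Because jsonpickle keeps the payload as plain JSON text, the backend can decode it without matching the Python version that produced it.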
    def __init__(self):
        super().__init__(task_type=TASK_TYPE, asynchronous=True)

    async def async_create(
I would change this to a SyncAgentBase, as follows:

class SyncAgentBase(AgentBase):
    @final
    async def async_create(
        self,
        context: grpc.ServicerContext,
        io_ctx: IOContext,
        task_template: TaskTemplate,
    ) -> CreateTaskResponse:
        return await self.do(context, io_ctx, task_template)

    async def do(self, context, io_ctx, task_template):
        python_interface_inputs = {
            name: TypeEngine.guess_python_type(lt.type) for name, lt in task_template.interface.inputs.items()
        }
        ctx = FlyteContextManager.current_context()
        native_inputs = {}
        if io_ctx.inputs:
            native_inputs = TypeEngine.literal_map_to_kwargs(ctx, io_ctx.inputs, python_interface_inputs)
        meta = task_template.custom
        task_module = importlib.import_module(name=meta[TASK_MODULE])
        task_def = getattr(task_module, meta[TASK_NAME])
        config = jsonpickle.decode(meta[TASK_CONFIG_PKL]) if meta.get(TASK_CONFIG_PKL) else None
        return task_def(TASK_TYPE, config=config).execute(**native_inputs)

    async def execute(self, **kwargs):
        raise NotImplementedError()
Check out the final decorator: https://docs.python.org/3.8/library/typing.html#typing.final
Independently, let's make the signatures of all the get/create/delete methods simpler. Think about how easily we could refactor the signature in the future.
@dataclass
class IOContext:
    inputs: LiteralMap
    output_prefix: str

OR change the signature to have **kwargs, so that we maintain ease of refactoring:

def __init__(self, *args, **kwargs):
    ...

async def create(ctx, inputs, outputs, task_template, **kwargs):
    ...
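For illustration, a minimal sketch of the refactor-friendly signature idea above (the field types and method names are simplified assumptions, not the final flytekit API):

```python
# Sketch only: bundle per-call inputs into a dataclass so that adding a new
# field later does not force a change to every agent's create/get/delete
# signature. IOContext fields and ExampleAgent are hypothetical.
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class IOContext:
    # In flytekit these would be LiteralMap / str types; Any keeps the sketch
    # self-contained.
    inputs: Optional[Any] = None
    output_prefix: str = ""


class ExampleAgent:
    async def create(self, io_ctx: IOContext, task_template: Any, **kwargs) -> Any:
        # New data can be threaded through io_ctx or **kwargs later without
        # breaking subclasses that simply forward the arguments.
        ...
```

Either variant (a context dataclass or **kwargs) keeps existing agents source-compatible when the interface grows.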
No changes here yet?
I will update it later.
I am considering the following changes.
Note:
This PR will be deprecated.
TL;DR
- Add a `DoTask` function
- Add a `TaskExecutor` class, a base agent for all plugins with the `DoTask` function
- Create a ChatGPT plugin with the OpenAI API
- Support local execution for the do task function
- Refactor the base agent:
  - Add 2 variables in `base_agent.py`, `ASYNC_PLUGIN` and `SYNC_PLUGIN`. They are used in the runtime flavor from `TaskMetadata` (see the routing sketch after this list).
  - Use the `runtime_flavor` in `TaskMetadata` (it hasn't been used in flytekit before):
    - Change the usecase of `runtime_flavor` by adding comments
    - Change the usecase to the plugin routing mechanism
  - Add `runtime_flavor` parameters in all agent tasks and webapi tasks. Here's the list:
    - plugins/flytekit-airflow
    - plugins/flytekit-aws-athena
    - plugins/flytekit-aws-batch
    - plugins/flytekit-bigquery
    - plugins/flytekit-mmcloud
    - plugins/flytekit-snowflake
    - plugins/flytekit-spark
    - flytekit/sensor
  - I got the list from `find . | grep agent.py` under the `flytekit` folder.
  - Note: ChatGPT doesn't need it because the external api task already handles this.
- Add tests for the above changes
- Finish the above without introducing breaking changes
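As referenced in the list above, here is a minimal sketch of the runtime-flavor routing idea (the constant values and registry helpers are assumptions for illustration, not the exact flytekit implementation):

```python
# Sketch only: route a task to an agent based on the task metadata's runtime
# flavor. The string values mirror the ASYNC_PLUGIN / SYNC_PLUGIN variables
# mentioned above; the registry functions themselves are hypothetical.
from typing import Any, Dict, Tuple

ASYNC_PLUGIN = "async_plugin"
SYNC_PLUGIN = "sync_plugin"

_AGENTS: Dict[Tuple[str, str], Any] = {}  # (task_type, runtime_flavor) -> agent


def register_agent(task_type: str, runtime_flavor: str, agent: Any) -> None:
    # Key on both the task type and the runtime flavor so a sync and an async
    # agent can coexist for the same task type.
    _AGENTS[(task_type, runtime_flavor)] = agent


def get_agent(task_type: str, runtime_flavor: str = ASYNC_PLUGIN) -> Any:
    try:
        return _AGENTS[(task_type, runtime_flavor)]
    except KeyError:
        raise ValueError(f"No agent registered for {task_type!r} / {runtime_flavor!r}")
```

Under this scheme, a ChatGPT agent could be registered under SYNC_PLUGIN while the existing agents keep the async flavor.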
Currently, the ChatGPT timeout only takes effect when an API call is retried, which means that if a query takes 40 seconds, you have to wait the full 40 seconds.
This is not ideal; we will improve it when a new server is integrated into the Flyte backend system. For now, we can set the timeout settings in the config-map.
Additional Information
Note: the timeout of the ChatGPT API is only checked on retry, so I highly recommend users set the timeout of the `DoTask` function to 40 seconds, which will probably not exceed the limit; we will introduce an `api_task_server` to fix this situation.
Note: using aiohttp to prevent the timeout issue by setting a timeout value is doable, but in my current experiment `openai.acreate` is faster than using aiohttp (I haven't figured out why).
Here is the proof: https://hackmd.io/Fmx3DHGtQBSqYydcrdF_Cg
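For reference, a minimal sketch of the aiohttp timeout approach mentioned above (the URL, payload, and helper name are placeholders, not the plugin's actual request):

```python
# Sketch only: cap a request at a client-side timeout with aiohttp, as
# discussed above. The endpoint, headers, and payload are placeholders.
import aiohttp


async def call_with_timeout(url: str, payload: dict, token: str, seconds: float = 40.0) -> dict:
    # ClientTimeout(total=...) bounds the whole request, not just the connect.
    timeout = aiohttp.ClientTimeout(total=seconds)
    headers = {"Authorization": f"Bearer {token}"}
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(url, json=payload, headers=headers) as resp:
            resp.raise_for_status()
            return await resp.json()
```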
Type
Are all requirements met?
How to test it?
Configuration Example
Run in Remote Environment
You can test the agent with the Dockerfile.
Screen Shot
Note: You can find that the ChatGPT task is an `api_task`.
How it works at high level
Tracking Issue
flyteorg/flyte#3936