
Add support for stream pipeline decorator #543

Open
wants to merge 3 commits into main

Conversation

rayrayraykk (Collaborator) commented Feb 28, 2025


name: add support for stream pipeline decorator
about: make the agentscope app a generator with a simple decorator

Description

This pull request adds support for a stream pipeline decorator, which improves compatibility with other API server pipelines. The main goal is to let the agentscope app return data in a streaming manner, so it can integrate more easily with API servers that require or benefit from streaming output.

Key Features

Pipeline Decorator: Implements a @pipeline decorator that transforms the application into a generator, enabling streaming data output.

Example Usage:

# -*- coding: utf-8 -*-
"""A simple example for conversation between user and assistant agent."""
import os
import agentscope
from agentscope.agents import DialogAgent
from agentscope.agents.user_agent import UserAgent
from agentscope.pipelines.functional import sequentialpipeline
from agentscope.utils.common import pipeline


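# The @pipeline decorator turns main() into a generator: every message produced
# during the conversation is yielded back to the caller for streaming.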
@pipeline
def main() -> None:
    """A basic conversation demo"""

    agentscope.init(
        model_configs=[
            {
                "model_type": "dashscope_chat",
                "config_name": "qwen-max",
                "model_name": "qwen-max",
                "api_key": os.getenv("DASHSCOPE_API_KEY"),
                "stream": True,
            },
        ],
        project="Multi-Agent Conversation",
        save_api_invoke=True,
    )

    # Init two agents
    dialog_agent = DialogAgent(
        name="Assistant",
        sys_prompt="You're a helpful assistant.",
        model_config_name="qwen-max",  # replace by your model config name
    )
    user_agent = UserAgent()

    # start the conversation between user and assistant
    x = None
    while x is None or x.content != "exit":
        x = sequentialpipeline([user_agent, dialog_agent], x)


if __name__ == "__main__":
    for index, msg in enumerate(main()):
        print(index, msg.name, msg.content)

Checklist

Please check the following items before the code is ready to be reviewed.

  • Code has passed all tests
  • Docstrings have been added/updated in Google Style
  • Documentation has been updated
  • Code is ready for review

rayrayraykk (Collaborator, Author)

@pan-x-c Please help me check the compatibility in distributed mode, thanks.

def wrapper(*args: Any, **kwargs: Any) -> Generator:
    from ..logging import get_msg_instances, clear_msg_instances

    thread_id = "pipeline" + str(uuid.uuid4())

DavdGao (Collaborator) commented Mar 3, 2025


Why do we need to use a thread here?

rayrayraykk (Collaborator, Author) commented Mar 3, 2025


When the generator serves as an API, we need to keep the main program running in a child thread and push its messages to a request-isolated store (i.e., _MSG_INSTANCE[thread_id]). The main thread then retrieves those messages from the store and yields them.
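
A minimal sketch of that design, for illustration only (the store name _MSG_INSTANCE and the polling loop are assumptions based on this thread, not the exact code in the PR):

import threading
import time
import uuid
from typing import Any, Callable, Generator

_MSG_INSTANCE: dict = {}  # hypothetical request-isolated message store


def pipeline(func: Callable) -> Callable:
    """Sketch: turn an agentscope app into a generator of messages."""

    def wrapper(*args: Any, **kwargs: Any) -> Generator:
        thread_id = "pipeline" + str(uuid.uuid4())
        _MSG_INSTANCE[thread_id] = []

        def run() -> None:
            # The app runs in a child thread; agents are expected to push
            # their messages into _MSG_INSTANCE[thread_id] while it executes.
            func(*args, **kwargs)

        child = threading.Thread(target=run, daemon=True)
        child.start()

        # The main thread drains the store and yields messages as they arrive.
        while child.is_alive() or _MSG_INSTANCE[thread_id]:
            if _MSG_INSTANCE[thread_id]:
                yield _MSG_INSTANCE[thread_id].pop(0)
            else:
                time.sleep(0.01)
        _MSG_INSTANCE.pop(thread_id, None)

    return wrapper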

DavdGao (Collaborator) left a comment


Please see inline comments.

Besides, if we only want to obtain the application-level streaming output, how about using a hook function within the agent? So that

  1. we don't need to use threading (which may be dangerous in distributed mode).
  2. we can extend it to TTS easily.
  3. it's explicit for the developers and users.
    E.g.
agent = Agent(
    # xxx
)

agent.register_speak_pre_hook()
agent.register_speak_post_hook()
agent.register_reply_pre_hook()
agent.register_reply_post_hook()
# ...

# for application level output
def send_to_somewhere(agent, input):
    """
    Args: 
        agent (`AgentBase`):
            The agent module itself
        input (`message | Generator`):
            # ...
    """
    # ...

agent.register_speak_pre_hook(
    send_to_somewhere
)

That way we can re-use this module when we implement TTS functionality (e.g., speaking the message aloud when the speak function is called).

rayrayraykk (Collaborator, Author)


Using the hook function to send messages is a good approach. However, converting the application into a generator may still require concurrent threads or processes (one side sends, the other gets). Additionally, the model's response generator can only be consumed once, and that already happens in the speak function, so this change would require modifying the existing structure.
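
A quick plain-Python illustration of the constraint that a streaming response generator can only be consumed once (not agentscope code, just the underlying behaviour):

def model_response():
    """Stand-in for a model's streaming response."""
    yield "Hel"
    yield "lo"


gen = model_response()
print(list(gen))  # ['Hel', 'lo'] -- e.g. already consumed inside speak()
print(list(gen))  # [] -- a second consumer gets nothing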

DavdGao (Collaborator) commented Mar 7, 2025


In this case, we just need to create a "during" hook (perhaps under some other name).

def speak(self, msg: Union[Msg, Generator]):
    if isinstance(msg, Generator):
        for chunk in msg:
            # call the during-speak hook functions on each chunk
            for func in self.__hooks_during_speak:
                func(self, chunk)
            # normal processing
            log_stream_msg(xxx)
            # ...
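
A hedged sketch of how such a during-speak hook could feed an application-level generator, covering the "one side sends, the other gets" point above. The hook-registration call and the queue bridge are illustrative assumptions, not an existing agentscope API:

import queue
import threading

chunk_queue: "queue.Queue" = queue.Queue()
_SENTINEL = object()  # marks the end of the stream


def push_chunk(agent, chunk) -> None:
    """Hypothetical during-speak hook: forward every chunk to the queue."""
    chunk_queue.put(chunk)


def run_app() -> None:
    # agent.register_speak_during_hook(push_chunk)  # hypothetical registration
    # main()                                        # run the agentscope app here
    chunk_queue.put(_SENTINEL)


def stream_app():
    """Application-level generator: the app thread sends, this thread gets."""
    threading.Thread(target=run_app, daemon=True).start()
    while True:
        chunk = chunk_queue.get()
        if chunk is _SENTINEL:
            return
        yield chunk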

rayrayraykk (Collaborator, Author)

After discussion, we should implement pushing the msg elsewhere in a hook-like function within AgentBase. I will do it after hongyi's PR.
