Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OPIK-686] [SDK] [FR]: Add Support for Bedrock Invoke_Agent API #1166

Merged
merged 5 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion sdks/python/src/opik/integrations/bedrock/chunks_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
LOGGER = logging.getLogger(__name__)


def aggregate(items: List[Dict[str, Any]]) -> Dict[str, Any]:
def aggregate_converse_stream_chunks(items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Implementation is based on the following AWS example (see the section `Conversation with streaming example`).
https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference-examples.html
Expand Down Expand Up @@ -35,3 +35,21 @@ def aggregate(items: List[Dict[str, Any]]) -> Dict[str, Any]:
result["metrics"]["latencyMs"] = metadata["metrics"]["latencyMs"]

return result


def aggregate_invoke_agent_chunks(items: List[Dict[str, Any]]) -> Dict[str, Any]:
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
"""
The implementation was supposed to follow Amazon's documentation,
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/invoke_agent.html
but at the time of writing this code, the `completion` payload only contains `chunks` and nothing else.
So, a simpler parsing approach was used.
"""
merged_chunks = b""

for item in items:
if "chunk" in item:
merged_chunks += item["chunk"]["bytes"]

result: Dict[str, Any] = {"output": merged_chunks.decode("utf-8")}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to track not only the output.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/invoke_agent.html take a look at response format

I can see that there is also usage information. Pay attention that the agent has multiple steps of foundation model invocation - preprocessing, postprocessing, orchestration, each of them has their own usage.


return result
21 changes: 6 additions & 15 deletions sdks/python/src/opik/integrations/bedrock/converse_decorator.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import logging
from typing import List, Any, Dict, Optional, Callable, Tuple, Union, TypedDict, cast
from opik import dict_utils
from opik.decorator import base_track_decorator, arguments_helpers

from . import stream_wrappers
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

from botocore import eventstream
from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator
from . import helpers, stream_wrappers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages", "system", "toolConfig", "guardrailConfig"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUTS = ["output"]

BedrockResponseWithStream = Dict[str, Any]


class ConverseStreamOutput(TypedDict):
stream: eventstream.EventStream
ResponseMetadata: Dict[str, Any]


class BedrockConverseDecorator(base_track_decorator.BaseTrackDecorator):
"""
Expand Down Expand Up @@ -85,7 +76,7 @@ def _generators_handler( # type: ignore
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> Union[
ConverseStreamOutput,
helpers.ConverseStreamOutput,
None,
]:
DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR = (
Expand All @@ -111,7 +102,7 @@ def _generators_handler( # type: ignore
)

output["stream"] = wrapped_stream
return cast(ConverseStreamOutput, output)
return cast(helpers.ConverseStreamOutput, output)

STREAM_NOT_FOUND = None

Expand Down
8 changes: 8 additions & 0 deletions sdks/python/src/opik/integrations/bedrock/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Any, Dict, TypedDict

from botocore import eventstream


class ConverseStreamOutput(TypedDict):
stream: eventstream.EventStream
ResponseMetadata: Dict[str, Any]
101 changes: 101 additions & 0 deletions sdks/python/src/opik/integrations/bedrock/invoke_agent_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator
from . import helpers, stream_wrappers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["inputText"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUTS = ["output"]


class BedrockInvokeAgentDecorator(base_track_decorator.BaseTrackDecorator):
"""
An implementation of BaseTrackDecorator designed specifically for tracking
calls of AWS bedrock client `invoke_agent` function.
Besides special processing for input arguments and response content, it
overrides _generators_handler() method to work correctly with bedrock's streams
"""

def _start_span_inputs_preprocessor(
self,
func: Callable,
track_options: arguments_helpers.TrackOptions,
args: Optional[Tuple],
kwargs: Optional[Dict[str, Any]],
) -> arguments_helpers.StartSpanParameters:
assert (
kwargs is not None
), "Expected kwargs to be not None in BedrockRuntime.Client.invoke_agent(**kwargs)"

name = track_options.name if track_options.name is not None else func.__name__
input, metadata = dict_utils.split_dict_by_keys(
kwargs, KWARGS_KEYS_TO_LOG_AS_INPUTS
)
metadata["created_from"] = "bedrock"
tags = ["bedrock"]

result = arguments_helpers.StartSpanParameters(
name=name,
input=input,
type=track_options.type,
tags=tags,
metadata=metadata,
project_name=track_options.project_name,
)

return result

def _end_span_inputs_preprocessor(
self, output: Any, capture_output: bool
) -> arguments_helpers.EndSpanParameters:
output, metadata = dict_utils.split_dict_by_keys(
output, RESPONSE_KEYS_TO_LOG_AS_OUTPUTS
)
result = arguments_helpers.EndSpanParameters(
output=output,
metadata=metadata,
)

return result

def _generators_handler( # type: ignore
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> Union[
helpers.ConverseStreamOutput,
None,
]:
DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR = (
generations_aggregator is None
)

if DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR:
return None

assert generations_aggregator is not None

if isinstance(output, dict) and "completion" in output:
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()

wrapped_stream = stream_wrappers.wrap_stream(
stream=output["completion"],
capture_output=capture_output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
generations_aggregator=generations_aggregator,
response_metadata=output["ResponseMetadata"],
finally_callback=self._after_call,
)

output["completion"] = wrapped_stream
return cast(helpers.ConverseStreamOutput, output)

STREAM_NOT_FOUND = None

return STREAM_NOT_FOUND
44 changes: 33 additions & 11 deletions sdks/python/src/opik/integrations/bedrock/opik_tracker.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,61 @@
from typing import Any, Optional
from . import converse_decorator
from . import chunks_aggregator
from typing import Optional, TYPE_CHECKING

from . import chunks_aggregator, converse_decorator, invoke_agent_decorator

def track_bedrock(client: Any, project_name: Optional[str] = None) -> Any:
if TYPE_CHECKING:
import botocore.client


def track_bedrock(
client: "botocore.client.BaseClient",
project_name: Optional[str] = None,
) -> "botocore.client.BaseClient":
"""Adds Opik tracking to an AWS Bedrock client.
Tracks calls to `converse()` and `converse_stream()` methods
Can be used within other Opik-tracked functions.
Args:
client: An instance of an AWS Bedrock client.
client: An instance of an AWS Bedrock client (botocore.client.BedrockRuntime or botocore.client.AgentsforBedrockRuntime).
project_name: The name of the project to log data.
Returns:
The modified bedrock client with Opik tracking enabled.
"""
decorator = converse_decorator.BedrockConverseDecorator()
decorator_for_converse = converse_decorator.BedrockConverseDecorator()
decorator_for_invoke_agent = invoke_agent_decorator.BedrockInvokeAgentDecorator()

if not hasattr(client.converse, "opik_tracked"):
wrapper = decorator.track(
if hasattr(client, "invoke_agent") and not hasattr(
client.invoke_agent, "opik_tracked"
):
wrapper = decorator_for_invoke_agent.track(
type="llm",
name="bedrock_invoke_agent",
project_name=project_name,
generations_aggregator=chunks_aggregator.aggregate_invoke_agent_chunks,
)
tracked_invoke_agent = wrapper(client.invoke_agent)
client.invoke_agent = tracked_invoke_agent

if hasattr(client, "converse") and not hasattr(client.converse, "opik_tracked"):
wrapper = decorator_for_converse.track(
type="llm",
name="bedrock_converse",
project_name=project_name,
)
tracked_converse = wrapper(client.converse)
client.converse = tracked_converse

if not hasattr(client.converse_stream, "opik_tracked"):
stream_wrapper = decorator.track(
if hasattr(client, "converse_stream") and not hasattr(
client.converse_stream, "opik_tracked"
):
stream_wrapper = decorator_for_converse.track(
type="llm",
name="bedrock_converse_stream",
project_name=project_name,
generations_aggregator=chunks_aggregator.aggregate,
generations_aggregator=chunks_aggregator.aggregate_converse_stream_chunks,
)
tracked_converse_stream = stream_wrapper(client.converse_stream)
client.converse_stream = tracked_converse_stream

return client