Skip to content

Commit

Permalink
Merge branch 'main' into thiagohora/OPIK-904_split_find_project_into_…
Browse files Browse the repository at this point in the history
…find_and_stats_endpoint
  • Loading branch information
thiagohora authored Jan 30, 2025
2 parents 79756fe + 52a9f91 commit 3480e68
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 27 deletions.
20 changes: 19 additions & 1 deletion sdks/python/src/opik/integrations/bedrock/chunks_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
LOGGER = logging.getLogger(__name__)


def aggregate(items: List[Dict[str, Any]]) -> Dict[str, Any]:
def aggregate_converse_stream_chunks(items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Implementation is based on the following AWS example (see the section `Conversation with streaming example`).
https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference-examples.html
Expand Down Expand Up @@ -35,3 +35,21 @@ def aggregate(items: List[Dict[str, Any]]) -> Dict[str, Any]:
result["metrics"]["latencyMs"] = metadata["metrics"]["latencyMs"]

return result


def aggregate_invoke_agent_chunks(items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
The implementation was supposed to follow Amazon's documentation,
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/invoke_agent.html
but at the time of writing this code, the `completion` payload only contains `chunks` and nothing else.
So, a simpler parsing approach was used.
"""
merged_chunks = b""

for item in items:
if "chunk" in item:
merged_chunks += item["chunk"]["bytes"]

result: Dict[str, Any] = {"output": merged_chunks.decode("utf-8")}

return result
21 changes: 6 additions & 15 deletions sdks/python/src/opik/integrations/bedrock/converse_decorator.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import logging
from typing import List, Any, Dict, Optional, Callable, Tuple, Union, TypedDict, cast
from opik import dict_utils
from opik.decorator import base_track_decorator, arguments_helpers

from . import stream_wrappers
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

from botocore import eventstream
from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator
from . import helpers, stream_wrappers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages", "system", "toolConfig", "guardrailConfig"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUTS = ["output"]

BedrockResponseWithStream = Dict[str, Any]


class ConverseStreamOutput(TypedDict):
stream: eventstream.EventStream
ResponseMetadata: Dict[str, Any]


class BedrockConverseDecorator(base_track_decorator.BaseTrackDecorator):
"""
Expand Down Expand Up @@ -85,7 +76,7 @@ def _generators_handler( # type: ignore
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> Union[
ConverseStreamOutput,
helpers.ConverseStreamOutput,
None,
]:
DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR = (
Expand All @@ -111,7 +102,7 @@ def _generators_handler( # type: ignore
)

output["stream"] = wrapped_stream
return cast(ConverseStreamOutput, output)
return cast(helpers.ConverseStreamOutput, output)

STREAM_NOT_FOUND = None

Expand Down
8 changes: 8 additions & 0 deletions sdks/python/src/opik/integrations/bedrock/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Any, Dict, TypedDict

from botocore import eventstream


class ConverseStreamOutput(TypedDict):
stream: eventstream.EventStream
ResponseMetadata: Dict[str, Any]
101 changes: 101 additions & 0 deletions sdks/python/src/opik/integrations/bedrock/invoke_agent_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator
from . import helpers, stream_wrappers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["inputText"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUTS = ["output"]


class BedrockInvokeAgentDecorator(base_track_decorator.BaseTrackDecorator):
"""
An implementation of BaseTrackDecorator designed specifically for tracking
calls of AWS bedrock client `invoke_agent` function.
Besides special processing for input arguments and response content, it
overrides _generators_handler() method to work correctly with bedrock's streams
"""

def _start_span_inputs_preprocessor(
self,
func: Callable,
track_options: arguments_helpers.TrackOptions,
args: Optional[Tuple],
kwargs: Optional[Dict[str, Any]],
) -> arguments_helpers.StartSpanParameters:
assert (
kwargs is not None
), "Expected kwargs to be not None in BedrockRuntime.Client.invoke_agent(**kwargs)"

name = track_options.name if track_options.name is not None else func.__name__
input, metadata = dict_utils.split_dict_by_keys(
kwargs, KWARGS_KEYS_TO_LOG_AS_INPUTS
)
metadata["created_from"] = "bedrock"
tags = ["bedrock"]

result = arguments_helpers.StartSpanParameters(
name=name,
input=input,
type=track_options.type,
tags=tags,
metadata=metadata,
project_name=track_options.project_name,
)

return result

def _end_span_inputs_preprocessor(
self, output: Any, capture_output: bool
) -> arguments_helpers.EndSpanParameters:
output, metadata = dict_utils.split_dict_by_keys(
output, RESPONSE_KEYS_TO_LOG_AS_OUTPUTS
)
result = arguments_helpers.EndSpanParameters(
output=output,
metadata=metadata,
)

return result

def _generators_handler( # type: ignore
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> Union[
helpers.ConverseStreamOutput,
None,
]:
DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR = (
generations_aggregator is None
)

if DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR:
return None

assert generations_aggregator is not None

if isinstance(output, dict) and "completion" in output:
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()

wrapped_stream = stream_wrappers.wrap_stream(
stream=output["completion"],
capture_output=capture_output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
generations_aggregator=generations_aggregator,
response_metadata=output["ResponseMetadata"],
finally_callback=self._after_call,
)

output["completion"] = wrapped_stream
return cast(helpers.ConverseStreamOutput, output)

STREAM_NOT_FOUND = None

return STREAM_NOT_FOUND
44 changes: 33 additions & 11 deletions sdks/python/src/opik/integrations/bedrock/opik_tracker.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,61 @@
from typing import Any, Optional
from . import converse_decorator
from . import chunks_aggregator
from typing import Optional, TYPE_CHECKING

from . import chunks_aggregator, converse_decorator, invoke_agent_decorator

def track_bedrock(client: Any, project_name: Optional[str] = None) -> Any:
if TYPE_CHECKING:
import botocore.client


def track_bedrock(
client: "botocore.client.BaseClient",
project_name: Optional[str] = None,
) -> "botocore.client.BaseClient":
"""Adds Opik tracking to an AWS Bedrock client.
Tracks calls to `converse()` and `converse_stream()` methods
Can be used within other Opik-tracked functions.
Args:
client: An instance of an AWS Bedrock client.
client: An instance of an AWS Bedrock client (botocore.client.BedrockRuntime or botocore.client.AgentsforBedrockRuntime).
project_name: The name of the project to log data.
Returns:
The modified bedrock client with Opik tracking enabled.
"""
decorator = converse_decorator.BedrockConverseDecorator()
decorator_for_converse = converse_decorator.BedrockConverseDecorator()
decorator_for_invoke_agent = invoke_agent_decorator.BedrockInvokeAgentDecorator()

if not hasattr(client.converse, "opik_tracked"):
wrapper = decorator.track(
if hasattr(client, "invoke_agent") and not hasattr(
client.invoke_agent, "opik_tracked"
):
wrapper = decorator_for_invoke_agent.track(
type="llm",
name="bedrock_invoke_agent",
project_name=project_name,
generations_aggregator=chunks_aggregator.aggregate_invoke_agent_chunks,
)
tracked_invoke_agent = wrapper(client.invoke_agent)
client.invoke_agent = tracked_invoke_agent

if hasattr(client, "converse") and not hasattr(client.converse, "opik_tracked"):
wrapper = decorator_for_converse.track(
type="llm",
name="bedrock_converse",
project_name=project_name,
)
tracked_converse = wrapper(client.converse)
client.converse = tracked_converse

if not hasattr(client.converse_stream, "opik_tracked"):
stream_wrapper = decorator.track(
if hasattr(client, "converse_stream") and not hasattr(
client.converse_stream, "opik_tracked"
):
stream_wrapper = decorator_for_converse.track(
type="llm",
name="bedrock_converse_stream",
project_name=project_name,
generations_aggregator=chunks_aggregator.aggregate,
generations_aggregator=chunks_aggregator.aggregate_converse_stream_chunks,
)
tracked_converse_stream = stream_wrapper(client.converse_stream)
client.converse_stream = tracked_converse_stream

return client

0 comments on commit 3480e68

Please sign in to comment.