Skip to content

Commit

Permalink
fix(media): add timeout to reference resolution (#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Dec 11, 2024
1 parent 7e1ce87 commit e7189a0
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
76 changes: 72 additions & 4 deletions langfuse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
MetaResponse,
)
from langfuse.api.resources.media import GetMediaResponse
from langfuse.model import (
ChatMessageDict,
ChatPromptClient,
Expand All @@ -74,6 +75,7 @@
from langfuse.environment import get_common_release_envs
from langfuse.logging import clean_logger
from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
from langfuse.media import LangfuseMedia
from langfuse.request import LangfuseClient
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel
from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
Expand Down Expand Up @@ -111,6 +113,13 @@ class FetchObservationResponse:
data: Observation


@dataclass
class FetchMediaResponse:
"""Response object for fetch_media method."""

data: GetMediaResponse


@dataclass
class FetchSessionsResponse:
"""Response object for fetch_sessions method."""
Expand Down Expand Up @@ -885,19 +894,78 @@ def fetch_observation(
handle_fern_exception(e)
raise e

def fetch_media(self, id: str):
def fetch_media(self, id: str) -> FetchMediaResponse:
"""Get media content by ID.
Args:
id: The identifier of the media content to fetch.
Returns:
Media object
FetchMediaResponse: The media data of the given id on `data`.
Raises:
Exception: If the media content could not be found or if an error occurred during the request.
Exception: If the media content with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
try:
return FetchMediaResponse(data=self.client.media.get(id))
except Exception as e:
handle_fern_exception(e)
raise e

def resolve_media_references(
self,
*,
obj: Any,
resolve_with: Literal["base64_data_uri"],
max_depth: int = 10,
content_fetch_timeout_seconds: int = 10,
):
"""Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings
in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference
string is left unchanged.
Args:
obj: The object to process. Can be a primitive value, array, or nested object.
If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
resolve_with: The representation of the media content to replace the media reference string with.
Currently only "base64_data_uri" is supported.
max_depth: int: The maximum depth to traverse the object. Default is 10.
content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 10.
Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible.
If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
Example:
obj = {
"image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
"nested": {
"pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
}
}
result = await LangfuseMedia.resolve_media_references(obj, langfuse_client)
# Result:
# {
# "image": "...",
# "nested": {
# "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
# }
# }
"""
return self.client.media.get(id)
return LangfuseMedia.resolve_media_references(
langfuse_client=self,
obj=obj,
resolve_with=resolve_with,
max_depth=max_depth,
content_fetch_timeout_seconds=content_fetch_timeout_seconds,
)

def get_observation(
self,
Expand Down
9 changes: 6 additions & 3 deletions langfuse/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def resolve_media_references(
langfuse_client: Any,
resolve_with: Literal["base64_data_uri"],
max_depth: int = 10,
content_fetch_timeout_seconds: int = 10,
) -> T:
"""Replace media reference strings in an object with base64 data URIs.
Expand Down Expand Up @@ -258,7 +259,7 @@ def traverse(obj: Any, depth: int) -> Any:
if depth > max_depth:
return obj

# Handle string with potential media references
# Handle string
if isinstance(obj, str):
regex = r"@@@langfuseMedia:.+?@@@"
reference_string_matches = re.findall(regex, obj)
Expand All @@ -275,8 +276,10 @@ def traverse(obj: Any, depth: int) -> Any:
)
media_data = langfuse_client.fetch_media(
parsed_media_reference["media_id"]
).data
media_content = requests.get(
media_data.url, timeout=content_fetch_timeout_seconds
)
media_content = requests.get(media_data.url)
if not media_content.ok:
raise Exception("Failed to fetch media content")

Expand All @@ -289,7 +292,7 @@ def traverse(obj: Any, depth: int) -> Any:
base64_data_uri
)
except Exception as e:
logging.warning(
LangfuseMedia._log.warning(
f"Error fetching media content for reference string {reference_string}: {e}"
)
# Do not replace the reference string if there's an error
Expand Down
4 changes: 2 additions & 2 deletions tests/test_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ def test_replace_media_reference_string_in_object(tmp_path):
)

# Resolve media references back to base64
resolved_trace = LangfuseMedia.resolve_media_references(
obj=fetched_trace, langfuse_client=langfuse, resolve_with="base64_data_uri"
resolved_trace = langfuse.resolve_media_references(
obj=fetched_trace, resolve_with="base64_data_uri"
)

# Verify resolved base64 matches original
Expand Down

0 comments on commit e7189a0

Please sign in to comment.