[OPIK-680] [FR]: Cost Tracking for VertexAI and LangChain #1055

Merged · 17 commits · Jan 23, 2025
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/lib-langchain-tests.yml
@@ -23,7 +23,7 @@ jobs:
    strategy:
      fail-fast: true
      matrix:
-        python_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python_version: ["3.9", "3.10", "3.11", "3.12"]

    steps:
      - name: Check out code
4 changes: 3 additions & 1 deletion sdks/python/src/opik/api_objects/opik_client.py
@@ -300,7 +300,9 @@ def span(
            output=output,
            metadata=metadata,
            tags=tags,
-            usage=parsed_usage.supported_usage,
+            usage=parsed_usage.full_usage
+            if provider == "google_vertexai"
+            else parsed_usage.supported_usage,
            model=model,
            provider=provider,
            error_info=error_info,
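
For `google_vertexai` spans, the client now forwards the full usage payload instead of only the normalized subset. A minimal sketch of the difference, assuming the Vertex key names handled by the new helper below (token counts are illustrative):

supported_usage = {
    "prompt_tokens": 11,
    "completion_tokens": 5,
    "total_tokens": 16,
}

# full_usage keeps the provider-specific *_token_count keys alongside the
# normalized ones, which is why the message dataclasses later in this diff
# widen `usage` to also accept a plain Dict[str, int].
full_usage = {
    **supported_usage,
    "prompt_token_count": 11,
    "candidates_token_count": 5,
    "total_token_count": 16,
}
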
80 changes: 80 additions & 0 deletions sdks/python/src/opik/integrations/langchain/google_run_helpers.py
@@ -0,0 +1,80 @@
import logging
from typing import Any, Dict, Optional, TYPE_CHECKING, Tuple, cast

from opik import logging_messages
from opik.types import LLMUsageInfo, UsageDict
from opik.validation import usage as usage_validator

if TYPE_CHECKING:
    from langchain_core.tracers.schemas import Run

LOGGER = logging.getLogger(__name__)


def get_llm_usage_info(run_dict: Optional[Dict[str, Any]] = None) -> LLMUsageInfo:
    if run_dict is None:
        return LLMUsageInfo()

    usage_dict = _try_get_token_usage(run_dict)
    provider, model = _get_provider_and_model(run_dict)

    return LLMUsageInfo(provider=provider, model=model, usage=usage_dict)


def _try_get_token_usage(run_dict: Dict[str, Any]) -> Optional[UsageDict]:
    try:
        usage_metadata = run_dict["outputs"]["generations"][-1][-1]["generation_info"][
            "usage_metadata"
        ]

        token_usage = UsageDict(
            completion_tokens=usage_metadata["candidates_token_count"],
            prompt_tokens=usage_metadata["prompt_token_count"],
            total_tokens=usage_metadata["total_token_count"],
        )
        token_usage.update(usage_metadata)

        if usage_validator.UsageValidator(token_usage).validate().ok():
            return cast(UsageDict, token_usage)

        return None
    except Exception:
        LOGGER.warning(
            logging_messages.FAILED_TO_EXTRACT_TOKEN_USAGE_FROM_PRESUMABLY_LANGCHAIN_GOOGLE_LLM_RUN,
            exc_info=True,
        )
        return None


def is_google_run(run: "Run") -> bool:
    try:
        if run.serialized is None:
            return False

        provider = run.metadata.get("ls_provider", "")
        is_google = "google" in provider.lower()

        return is_google

    except Exception:
        LOGGER.debug(
            "Failed to check if Run instance is from Google LLM, returning False.",
            exc_info=True,
        )
        return False


def _get_provider_and_model(
    run_dict: Dict[str, Any],
) -> Tuple[Optional[str], Optional[str]]:
    """
    Fetches the provider and model information from a given run dictionary.
    """
    provider = None
    model = None

    if metadata := run_dict["extra"].get("metadata"):
        provider = metadata.get("ls_provider")
        model = metadata.get("ls_model_name")

    return provider, model
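
For reference, a minimal sketch of the `run_dict` shape these helpers expect, reconstructed from the lookups above (the token counts and model name are illustrative):

run_dict = {
    "outputs": {
        "generations": [[{
            "generation_info": {
                "usage_metadata": {
                    "prompt_token_count": 11,
                    "candidates_token_count": 5,
                    "total_token_count": 16,
                }
            }
        }]]
    },
    "extra": {
        "metadata": {"ls_provider": "google_vertexai", "ls_model_name": "gemini-1.5-flash"}
    },
}

info = get_llm_usage_info(run_dict)
# info.provider == "google_vertexai"; info.usage carries the normalized
# prompt/completion/total keys plus the raw *_token_count keys merged in
# by token_usage.update(usage_metadata).
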
12 changes: 2 additions & 10 deletions sdks/python/src/opik/integrations/langchain/openai_run_helpers.py
@@ -1,9 +1,8 @@
-import dataclasses
import logging
from typing import Any, Dict, Optional, TYPE_CHECKING, Tuple, cast

from opik import logging_messages
-from opik.types import UsageDict
+from opik.types import LLMUsageInfo, UsageDict
from opik.validation import usage as usage_validator

if TYPE_CHECKING:
@@ -13,21 +12,14 @@
LOGGER = logging.getLogger(__name__)


-@dataclasses.dataclass
-class LLMUsageInfo:
-    provider: Optional[str] = None
-    model: Optional[str] = None
-    token_usage: Optional[UsageDict] = None
-
-
def get_llm_usage_info(run_dict: Optional[Dict[str, Any]] = None) -> LLMUsageInfo:
    if run_dict is None:
        return LLMUsageInfo()

    usage_dict = _try_get_token_usage(run_dict)
    provider, model = _get_provider_and_model(run_dict)

-    return LLMUsageInfo(provider=provider, model=model, token_usage=usage_dict)
+    return LLMUsageInfo(provider=provider, model=model, usage=usage_dict)


def _try_get_token_usage(run_dict: Dict[str, Any]) -> Optional[UsageDict]:
17 changes: 12 additions & 5 deletions sdks/python/src/opik/integrations/langchain/opik_tracer.py
@@ -1,13 +1,19 @@
import logging
from typing import Any, Dict, List, Literal, Optional, Set, TYPE_CHECKING
-from opik.types import ErrorInfoDict
+
+from opik.types import ErrorInfoDict, LLMUsageInfo

from langchain_core import language_models
from langchain_core.tracers import BaseTracer

from opik import dict_utils, opik_context
from opik.api_objects import opik_client, span, trace
-from . import base_llm_patcher, openai_run_helpers, opik_encoder_extension
+from . import (
+    base_llm_patcher,
+    google_run_helpers,
+    openai_run_helpers,
+    opik_encoder_extension,
+)
from ...api_objects import helpers

if TYPE_CHECKING:
@@ -232,15 +238,16 @@ def _process_end_span(self, run: "Run") -> None:
            # Langchain will call _persist_run for us
        else:
            span_data = self._span_data_map[run.id]
+            usage_info: LLMUsageInfo = LLMUsageInfo()

            if openai_run_helpers.is_openai_run(run):
                usage_info = openai_run_helpers.get_llm_usage_info(run_dict)
-            else:
-                usage_info = openai_run_helpers.get_llm_usage_info()
+            elif google_run_helpers.is_google_run(run):
+                usage_info = google_run_helpers.get_llm_usage_info(run_dict)

            span_data.init_end_time().update(
                output=run_dict["outputs"],
-                usage=usage_info.token_usage,
+                usage=usage_info.usage,
                provider=usage_info.provider,
                model=usage_info.model,
            )
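
With this dispatch in place, Google runs are handled the same way OpenAI runs already were. A hedged end-to-end sketch (assuming configured GCP credentials; the model name is illustrative):

from langchain_google_vertexai import ChatVertexAI
from opik.integrations.langchain import OpikTracer

tracer = OpikTracer()
llm = ChatVertexAI(model_name="gemini-1.5-flash")

# is_google_run() matches on the run's ls_provider metadata, so token usage,
# model, and provider are attached to the logged span automatically.
llm.invoke("What is the capital of France?", config={"callbacks": [tracer]})
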
@@ -1,18 +1,10 @@
-import dataclasses
from typing import Any, Dict, Optional

from llama_index.core import Settings
from llama_index.core.base.llms.types import ChatResponse
from llama_index.core.callbacks import schema as llama_index_schema

-from opik.types import UsageDict
-
-
-@dataclasses.dataclass
-class LLMUsageInfo:
-    provider: Optional[str] = None
-    model: Optional[str] = None
-    usage: Optional[UsageDict] = None
+from opik.types import LLMUsageInfo


def get_span_input_from_events(
4 changes: 4 additions & 0 deletions sdks/python/src/opik/logging_messages.py
@@ -14,6 +14,10 @@
    "Failed to extract token usage from presumably OpenAI LLM langchain run."
)

+FAILED_TO_EXTRACT_TOKEN_USAGE_FROM_PRESUMABLY_LANGCHAIN_GOOGLE_LLM_RUN = (
+    "Failed to extract token usage from presumably Google LLM langchain run."
+)
+
UNEXPECTED_EXCEPTION_ON_SPAN_CREATION_FOR_TRACKED_FUNCTION = "Unexpected exception happened when tried to create a span for function %s.\nInputs: %s\nError message: %s"

UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION = "Unexpected exception happened when tried to finalize span.\nOutput: %s\nError message: %s"
8 changes: 4 additions & 4 deletions sdks/python/src/opik/message_processing/messages.py
@@ -1,6 +1,6 @@
import dataclasses
import datetime
-from typing import Optional, Any, Dict, List
+from typing import Optional, Any, Dict, List, Union
from ..types import UsageDict, SpanType, ErrorInfoDict


@@ -61,7 +61,7 @@ class CreateSpanMessage(BaseMessage):
    metadata: Optional[Dict[str, Any]]
    tags: Optional[List[str]]
    type: SpanType
-    usage: Optional[UsageDict]
+    usage: Optional[Union[UsageDict, Dict[str, int]]]
    model: Optional[str]
    provider: Optional[str]
    error_info: Optional[ErrorInfoDict]
@@ -74,7 +74,7 @@ def as_payload_dict(self) -> Dict[str, Any]:

@dataclasses.dataclass
class UpdateSpanMessage(BaseMessage):
-    "Not recommended to use. Kept only for low level update operations in public API"
+    """Not recommended to use. Kept only for low level update operations in public API"""

    span_id: str
    parent_span_id: Optional[str]
@@ -85,7 +85,7 @@ class UpdateSpanMessage(BaseMessage):
    output: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Any]]
    tags: Optional[List[str]]
-    usage: Optional[UsageDict]
+    usage: Optional[Union[UsageDict, Dict[str, int]]]
    model: Optional[str]
    provider: Optional[str]
    error_info: Optional[ErrorInfoDict]
8 changes: 8 additions & 0 deletions sdks/python/src/opik/types.py
@@ -1,3 +1,4 @@
+import dataclasses
import sys

from typing import Literal, Optional
@@ -77,3 +78,10 @@ class ErrorInfoDict(TypedDict):

    traceback: str
    """Exception traceback"""
+
+
+@dataclasses.dataclass
+class LLMUsageInfo:
+    provider: Optional[str] = None
+    model: Optional[str] = None
+    usage: Optional[UsageDict] = None
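
This dataclass replaces the copies previously defined inside the LangChain and LlamaIndex helpers. A small sketch of how the integrations populate it (field values are illustrative):

from opik.types import LLMUsageInfo

info = LLMUsageInfo(
    provider="google_vertexai",
    model="gemini-1.5-flash",
    usage={"prompt_tokens": 11, "completion_tokens": 5, "total_tokens": 16},
)
# All fields default to None when a run carries no usage metadata.
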
17 changes: 17 additions & 0 deletions sdks/python/tests/library_integration/conftest.py
@@ -9,3 +9,20 @@ def ensure_openai_configured():

    if not ("OPENAI_API_KEY" in os.environ and "OPENAI_ORG_ID" in os.environ):
        raise Exception("OpenAI not configured!")
+
+
+@pytest.fixture
+def gcp_e2e_test_credentials():
+    gcp_credentials_file_name = "gcp_credentials.json"
+
+    gcp_credentials = os.environ["GCP_E2E_TEST_CREDENTIALS"]
+
+    with open(gcp_credentials_file_name, mode="wt") as file:
+        file.write(gcp_credentials)
+
+    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credentials_file_name
+
+    yield
+
+    del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+    os.remove(gcp_credentials_file_name)
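
A hypothetical test consuming this fixture, for illustration only (the test name and assertions are not part of this PR):

import os

def test_gcp_credentials_are_available(gcp_e2e_test_credentials):
    # The fixture has already written gcp_credentials.json and exported
    # GOOGLE_APPLICATION_CREDENTIALS before the test body runs; both are
    # cleaned up after the fixture's yield.
    assert os.environ["GOOGLE_APPLICATION_CREDENTIALS"] == "gcp_credentials.json"
    assert os.path.exists("gcp_credentials.json")
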
@@ -1,2 +1,3 @@
langchain_community
+langchain_google_vertexai
langchain_openai