From ced93bc809b6a452e78bced0761f201738cff917 Mon Sep 17 00:00:00 2001 From: sarath2496 Date: Thu, 31 Oct 2024 11:12:06 -0500 Subject: [PATCH 1/6] liteLLM bug --- agentops/llm_tracker.py | 647 +++++++++++++++++++++++++++++++++++ tests/test_llmintegration.py | 49 +++ 2 files changed, 696 insertions(+) create mode 100644 agentops/llm_tracker.py create mode 100644 tests/test_llmintegration.py diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py new file mode 100644 index 00000000..2f8ea58f --- /dev/null +++ b/agentops/llm_tracker.py @@ -0,0 +1,647 @@ +import functools +import sys +from importlib import import_module +from importlib.metadata import version +from packaging.version import Version, parse +from .log_config import logger +from .event import LLMEvent, ActionEvent, ToolEvent, ErrorEvent +from .helpers import get_ISO_time, check_call_stack_for_agent_id +import inspect +from typing import Optional +import pprint + +original_create = None +original_create_async = None + + +class LlmTracker: + SUPPORTED_APIS = { + "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, + "openai": { + "1.0.0": ("chat.completions.create",), + "0.0.0": ( + "ChatCompletion.create", + "ChatCompletion.acreate", + ), + }, + "cohere": { + "5.4.0": ("chat", "chat_stream"), + }, + } + + def __init__(self, client): + self.client = client + self.completion = "" + self.llm_event: Optional[LLMEvent] = None + self.active_providers = {} # Track active providers + self.original_methods = {} # Store original methods per provider + + def _handle_response_v0_openai(self, response, kwargs, init_timestamp): + """Handle responses for OpenAI versions v1.0.0""" + from openai import Stream, AsyncStream + from openai.types.chat import ChatCompletionChunk + from openai.resources import AsyncCompletions + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + + def handle_stream_chunk(chunk: ChatCompletionChunk): + # NOTE: prompt/completion usage not returned in response when streaming + # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if self.llm_event.returns == None: + self.llm_event.returns = chunk + + try: + accumulated_delta = self.llm_event.returns.choices[0].delta + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = chunk.model + self.llm_event.prompt = kwargs["messages"] + choice = chunk.choices[ + 0 + ] # NOTE: We assume for completion only choices[0] is relevant + + if choice.delta.content: + accumulated_delta.content += choice.delta.content + + if choice.delta.role: + accumulated_delta.role = choice.delta.role + + if choice.delta.tool_calls: + accumulated_delta.tool_calls = choice.delta.tool_calls + + if choice.delta.function_call: + accumulated_delta.function_call = choice.delta.function_call + + if choice.finish_reason: + # Streaming is done. Record LLMEvent + self.llm_event.returns.choices[0].finish_reason = ( + choice.finish_reason + ) + self.llm_event.completion = { + "role": accumulated_delta.role, + "content": accumulated_delta.content, + "function_call": accumulated_delta.function_call, + "tool_calls": accumulated_delta.tool_calls, + } + self.llm_event.end_timestamp = get_ISO_time() + + self.client.record(self.llm_event) + except Exception as e: + self.client.record( + ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + elif isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncCompletion + elif isinstance(response, AsyncCompletions): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # v1.0.0+ responses are objects + try: + self.llm_event.returns = response.model_dump() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.prompt_tokens + self.llm_event.completion = response.choices[0].message.model_dump() + self.llm_event.completion_tokens = response.usage.completion_tokens + self.llm_event.model = response.model + + self.client.record(self.llm_event) + except Exception as e: + self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _handle_response_cohere(self, response, kwargs, init_timestamp): + """Handle responses for Cohere versions >v5.4.0""" + from cohere.types.non_streamed_chat_response import NonStreamedChatResponse + from cohere.types.streamed_chat_response import ( + StreamedChatResponse, + StreamedChatResponse_CitationGeneration, + StreamedChatResponse_SearchQueriesGeneration, + StreamedChatResponse_SearchResults, + StreamedChatResponse_StreamEnd, + StreamedChatResponse_StreamStart, + StreamedChatResponse_TextGeneration, + StreamedChatResponse_ToolCallsGeneration, + ) + + # from cohere.types.chat import ChatGenerationChunk + + # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant" + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + + self.action_events = {} + + def handle_stream_chunk(chunk): + + # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if isinstance(chunk, StreamedChatResponse_StreamStart): + self.llm_event.returns = chunk + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = kwargs.get("model", "command-r-plus") + self.llm_event.prompt = kwargs["message"] + self.llm_event.completion = "" + return + + try: + if isinstance(chunk, StreamedChatResponse_StreamEnd): + # StreamedChatResponse_TextGeneration = LLMEvent + self.llm_event.completion = { + "role": "assistant", + "content": chunk.response.text, + } + self.llm_event.end_timestamp = get_ISO_time() + self.client.record(self.llm_event) + + # StreamedChatResponse_SearchResults = ActionEvent + search_results = chunk.response.search_results + for search_result in search_results: + query = search_result.search_query + if query.generation_id in self.action_events: + action_event = self.action_events[query.generation_id] + search_result_dict = search_result.dict() + del search_result_dict["search_query"] + action_event.returns = search_result_dict + 
action_event.end_timestamp = get_ISO_time() + + # StreamedChatResponse_CitationGeneration = ActionEvent + documents = {doc["id"]: doc for doc in chunk.response.documents} + citations = chunk.response.citations + for citation in citations: + citation_id = f"{citation.start}.{citation.end}" + if citation_id in self.action_events: + action_event = self.action_events[citation_id] + citation_dict = citation.dict() + # Replace document_ids with the actual documents + citation_dict["documents"] = [ + documents[doc_id] + for doc_id in citation_dict["document_ids"] + if doc_id in documents + ] + del citation_dict["document_ids"] + + action_event.returns = citation_dict + action_event.end_timestamp = get_ISO_time() + + for key, action_event in self.action_events.items(): + self.client.record(action_event) + + elif isinstance(chunk, StreamedChatResponse_TextGeneration): + self.llm_event.completion += chunk.text + elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration): + pass + elif isinstance(chunk, StreamedChatResponse_CitationGeneration): + for citation in chunk.citations: + self.action_events[f"{citation.start}.{citation.end}"] = ( + ActionEvent( + action_type="citation", + init_timestamp=get_ISO_time(), + params=citation.text, + ) + ) + elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration): + for query in chunk.search_queries: + self.action_events[query.generation_id] = ActionEvent( + action_type="search_query", + init_timestamp=get_ISO_time(), + params=query.text, + ) + elif isinstance(chunk, StreamedChatResponse_SearchResults): + pass + + except Exception as e: + self.client.record( + ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # NOTE: As of Cohere==5.x.x, async is not supported + # if the response is a generator, decorate the generator + if inspect.isasyncgen(response): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + elif inspect.isgenerator(response): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # TODO: we should record if they pass a chat.connectors, because it means they intended to call a tool + # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called + + try: + self.llm_event.returns = response.dict() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = [] + if response.chat_history: + role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"} + + for i in range(len(response.chat_history) - 1): + message = response.chat_history[i] + self.llm_event.prompt.append( + { + "role": role_map.get(message.role, message.role), + "content": message.message, + } + ) + + last_message = response.chat_history[-1] + self.llm_event.completion = { + "role": role_map.get(last_message.role, last_message.role), + "content": last_message.message, + } + self.llm_event.prompt_tokens = response.meta.tokens.input_tokens + self.llm_event.completion_tokens = response.meta.tokens.output_tokens + self.llm_event.model = kwargs.get("model", "command-r-plus") + + self.client.record(self.llm_event) + except Exception as e: + self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _store_original_method(self, provider: str, method_name: str, method): + """Store original method with provider context""" + if provider not in self.original_methods: + self.original_methods[provider] = {} + self.original_methods[provider][method_name] = method + + def _get_original_method(self, provider: str, method_name: str): + """Retrieve original method for provider""" + return self.original_methods.get(provider, {}).get(method_name) + + def override_openai_v1_completion(self): + from openai.resources.chat import completions + + # Store with provider context + self._store_original_method('openai', 'create', completions.Completions.create) + self.active_providers['openai'] = True + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('openai', 'create') + result = original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + completions.Completions.create = patched_function + + def override_litellm_completion(self): + import litellm + + # Store with provider context + self._store_original_method('litellm', 'completion', litellm.completion) + self.active_providers['litellm'] = True + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('litellm', 'completion') + result = original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + litellm.completion = patched_function + + def override_litellm_async_completion(self): + import litellm + + # Store with provider context + self._store_original_method('litellm', 'acompletion', litellm.acompletion) + self.active_providers['litellm'] = True + + async def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('litellm', 'acompletion') + result = await original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + litellm.acompletion = patched_function + + def override_cohere_chat(self): + import cohere + import cohere.types + + original_chat = cohere.Client.chat + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_chat(*args, **kwargs) + return self._handle_response_cohere(result, kwargs, init_timestamp) + + # Override the original method with the patched one + cohere.Client.chat = patched_function + + def override_cohere_chat_stream(self): + import cohere + + original_chat = cohere.Client.chat_stream + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_chat(*args, **kwargs) + return self._handle_response_cohere(result, kwargs, init_timestamp) + + # Override the original method with the patched one + cohere.Client.chat_stream = patched_function + + def _override_method(self, api, method_path, module): + def handle_response(result, kwargs, init_timestamp): + if api == "openai": + return self._handle_response_v0_openai(result, kwargs, init_timestamp) + return result + + def wrap_method(original_method): + if inspect.iscoroutinefunction(original_method): + + @functools.wraps(original_method) + async def 
async_method(*args, **kwargs): + init_timestamp = get_ISO_time() + response = await original_method(*args, **kwargs) + return handle_response(response, kwargs, init_timestamp) + + return async_method + + else: + + @functools.wraps(original_method) + def sync_method(*args, **kwargs): + init_timestamp = get_ISO_time() + response = original_method(*args, **kwargs) + return handle_response(response, kwargs, init_timestamp) + + return sync_method + + method_parts = method_path.split(".") + original_method = functools.reduce(getattr, method_parts, module) + new_method = wrap_method(original_method) + + if len(method_parts) == 1: + setattr(module, method_parts[0], new_method) + else: + parent = functools.reduce(getattr, method_parts[:-1], module) + setattr(parent, method_parts[-1], new_method) + + def override_api(self): + """ + Overrides key methods of the specified API to record events. + """ + + for api in self.SUPPORTED_APIS: + if api in sys.modules: + module = import_module(api) + if api == "litellm": + module_version = version(api) + if module_version is None: + logger.warning( + f"Cannot determine LiteLLM version. Only LiteLLM>=1.3.1 supported." + ) + + if Version(module_version) >= parse("1.3.1"): + self.override_litellm_completion() + self.override_litellm_async_completion() + else: + logger.warning( + f"Only LiteLLM>=1.3.1 supported. v{module_version} found." + ) + continue # Continue to check other APIs instead of returning + + if api == "openai": + # Patch openai v1.0.0+ methods + if hasattr(module, "__version__"): + module_version = parse(module.__version__) + if module_version >= parse("1.0.0"): + self.override_openai_v1_completion() + self.override_openai_v1_async_completion() + else: + # Patch openai =5.4.0 supported." + ) + + if Version(module_version) >= parse("5.4.0"): + self.override_cohere_chat() + self.override_cohere_chat_stream() + else: + logger.warning( + f"Only Cohere>=5.4.0 supported. v{module_version} found." 
+ ) + + def stop_instrumenting(self): + """Restore original methods for all providers""" + if 'openai' in self.active_providers: + from openai.resources.chat import completions + original = self._get_original_method('openai', 'create') + if original: + completions.Completions.create = original + original_async = self._get_original_method('openai', 'create_async') + if original_async: + completions.AsyncCompletions.create = original_async + + if 'litellm' in self.active_providers: + import litellm + original = self._get_original_method('litellm', 'completion') + if original: + litellm.completion = original + original_async = self._get_original_method('litellm', 'acompletion') + if original_async: + litellm.acompletion = original_async + + self.active_providers.clear() + self.original_methods.clear() + + def undo_override_openai_v1_completion(self): + global original_create + from openai.resources.chat import completions + + completions.Completions.create = original_create + + def undo_override_openai_v1_async_completion(self): + global original_create_async + from openai.resources.chat import completions + + completions.AsyncCompletions.create = original_create_async + + def override_openai_v1_async_completion(self): + from openai.resources.chat import completions + + # Store with provider context + self._store_original_method('openai', 'create_async', completions.AsyncCompletions.create) + self.active_providers['openai'] = True + + async def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('openai', 'create_async') + result = await original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + completions.AsyncCompletions.create = patched_function diff --git a/tests/test_llmintegration.py b/tests/test_llmintegration.py new file mode 100644 index 00000000..271157c1 --- /dev/null +++ b/tests/test_llmintegration.py @@ -0,0 +1,49 @@ +import os +import sys +from dotenv import load_dotenv +import openai +import litellm +import logging + +# Add the project root to Python path to use local agentops +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, project_root) + +import agentops + +# Load environment variables +load_dotenv() + +# Enable debug logging +agentops.logger.setLevel(logging.DEBUG) + +def test_openai_only(): + print("\n=== Testing OpenAI with local AgentOps changes ===") + + # Initialize AgentOps + agentops.init(api_key=os.getenv("AGENTOPS_API_KEY")) + + # Set API keys + openai.api_key = os.getenv("OPENAI_API_KEY") + + try: + # Make OpenAI calls + for i in range(3): + print(f"\nOpenAI Call {i+1}:") + response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{ + "role": "user", + "content": f"Say 'OpenAI Test {i+1} successful when LiteLLM is imported'" + }] + ) + print(f"Response {i+1}:", response.choices[0].message.content) + + except Exception as e: + print("\nError occurred:", str(e)) + raise e + finally: + agentops.end_session("Success") + +if __name__ == "__main__": + test_openai_only() From 9ed49bf111661b00a2ad79abdee04c8d5ba03fdb Mon Sep 17 00:00:00 2001 From: sarath2496 Date: Thu, 31 Oct 2024 11:23:06 -0500 Subject: [PATCH 2/6] changes --- agentops/llm_tracker.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py index 2f8ea58f..cf961623 100644 --- a/agentops/llm_tracker.py +++ b/agentops/llm_tracker.py @@ 
-429,21 +429,7 @@ def _get_original_method(self, provider: str, method_name: str): """Retrieve original method for provider""" return self.original_methods.get(provider, {}).get(method_name) - def override_openai_v1_completion(self): - from openai.resources.chat import completions - - # Store with provider context - self._store_original_method('openai', 'create', completions.Completions.create) - self.active_providers['openai'] = True - - def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - # Get provider-specific original method - original = self._get_original_method('openai', 'create') - result = original(*args, **kwargs) - return self._handle_response_v1_openai(result, kwargs, init_timestamp) - completions.Completions.create = patched_function def override_litellm_completion(self): import litellm From a9a4797e200c1238ffee70cead2d6878457b932a Mon Sep 17 00:00:00 2001 From: sarath2496 Date: Thu, 31 Oct 2024 11:32:51 -0500 Subject: [PATCH 3/6] resolving conflicts --- agentops/llm_tracker.py | 6 - agentops/llm_tracker.py.new | 633 ++++++++++++++++++++++++++++++++++++ 2 files changed, 633 insertions(+), 6 deletions(-) create mode 100644 agentops/llm_tracker.py.new diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py index cf961623..3eb7a806 100644 --- a/agentops/llm_tracker.py +++ b/agentops/llm_tracker.py @@ -604,12 +604,6 @@ def stop_instrumenting(self): self.active_providers.clear() self.original_methods.clear() - def undo_override_openai_v1_completion(self): - global original_create - from openai.resources.chat import completions - - completions.Completions.create = original_create - def undo_override_openai_v1_async_completion(self): global original_create_async from openai.resources.chat import completions diff --git a/agentops/llm_tracker.py.new b/agentops/llm_tracker.py.new new file mode 100644 index 00000000..cf961623 --- /dev/null +++ b/agentops/llm_tracker.py.new @@ -0,0 +1,633 @@ +import functools +import sys +from importlib import import_module +from importlib.metadata import version +from packaging.version import Version, parse +from .log_config import logger +from .event import LLMEvent, ActionEvent, ToolEvent, ErrorEvent +from .helpers import get_ISO_time, check_call_stack_for_agent_id +import inspect +from typing import Optional +import pprint + +original_create = None +original_create_async = None + + +class LlmTracker: + SUPPORTED_APIS = { + "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, + "openai": { + "1.0.0": ("chat.completions.create",), + "0.0.0": ( + "ChatCompletion.create", + "ChatCompletion.acreate", + ), + }, + "cohere": { + "5.4.0": ("chat", "chat_stream"), + }, + } + + def __init__(self, client): + self.client = client + self.completion = "" + self.llm_event: Optional[LLMEvent] = None + self.active_providers = {} # Track active providers + self.original_methods = {} # Store original methods per provider + + def _handle_response_v0_openai(self, response, kwargs, init_timestamp): + """Handle responses for OpenAI versions v1.0.0""" + from openai import Stream, AsyncStream + from openai.types.chat import ChatCompletionChunk + from openai.resources import AsyncCompletions + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + + def handle_stream_chunk(chunk: ChatCompletionChunk): + # NOTE: prompt/completion usage not returned in response when streaming + # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if 
self.llm_event.returns == None: + self.llm_event.returns = chunk + + try: + accumulated_delta = self.llm_event.returns.choices[0].delta + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = chunk.model + self.llm_event.prompt = kwargs["messages"] + choice = chunk.choices[ + 0 + ] # NOTE: We assume for completion only choices[0] is relevant + + if choice.delta.content: + accumulated_delta.content += choice.delta.content + + if choice.delta.role: + accumulated_delta.role = choice.delta.role + + if choice.delta.tool_calls: + accumulated_delta.tool_calls = choice.delta.tool_calls + + if choice.delta.function_call: + accumulated_delta.function_call = choice.delta.function_call + + if choice.finish_reason: + # Streaming is done. Record LLMEvent + self.llm_event.returns.choices[0].finish_reason = ( + choice.finish_reason + ) + self.llm_event.completion = { + "role": accumulated_delta.role, + "content": accumulated_delta.content, + "function_call": accumulated_delta.function_call, + "tool_calls": accumulated_delta.tool_calls, + } + self.llm_event.end_timestamp = get_ISO_time() + + self.client.record(self.llm_event) + except Exception as e: + self.client.record( + ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + elif isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncCompletion + elif isinstance(response, AsyncCompletions): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # v1.0.0+ responses are objects + try: + self.llm_event.returns = response.model_dump() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.prompt_tokens + self.llm_event.completion = response.choices[0].message.model_dump() + self.llm_event.completion_tokens = response.usage.completion_tokens + self.llm_event.model = response.model + + self.client.record(self.llm_event) + except Exception as e: + self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _handle_response_cohere(self, response, kwargs, init_timestamp): + """Handle responses for Cohere versions >v5.4.0""" + from cohere.types.non_streamed_chat_response import NonStreamedChatResponse + from cohere.types.streamed_chat_response import ( + StreamedChatResponse, + StreamedChatResponse_CitationGeneration, + StreamedChatResponse_SearchQueriesGeneration, + StreamedChatResponse_SearchResults, + StreamedChatResponse_StreamEnd, + StreamedChatResponse_StreamStart, + StreamedChatResponse_TextGeneration, + StreamedChatResponse_ToolCallsGeneration, + ) + + # from cohere.types.chat import ChatGenerationChunk + + # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant" + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + + self.action_events = {} + + def handle_stream_chunk(chunk): + + # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if isinstance(chunk, StreamedChatResponse_StreamStart): + self.llm_event.returns = chunk + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = kwargs.get("model", "command-r-plus") + self.llm_event.prompt = kwargs["message"] + self.llm_event.completion = "" + return + + try: + if isinstance(chunk, StreamedChatResponse_StreamEnd): + # StreamedChatResponse_TextGeneration = LLMEvent + self.llm_event.completion = { + "role": "assistant", + "content": chunk.response.text, + } + self.llm_event.end_timestamp = get_ISO_time() + self.client.record(self.llm_event) + + # StreamedChatResponse_SearchResults = ActionEvent + search_results = chunk.response.search_results + for search_result in search_results: + query = search_result.search_query + if query.generation_id in self.action_events: + action_event = self.action_events[query.generation_id] + search_result_dict = search_result.dict() + del search_result_dict["search_query"] + action_event.returns = search_result_dict + action_event.end_timestamp = get_ISO_time() + + # StreamedChatResponse_CitationGeneration = ActionEvent + documents = {doc["id"]: doc for doc in chunk.response.documents} + citations = chunk.response.citations + for citation in citations: + citation_id = f"{citation.start}.{citation.end}" + if citation_id in self.action_events: + action_event = self.action_events[citation_id] + citation_dict = citation.dict() + # Replace document_ids with the actual documents + citation_dict["documents"] = [ + documents[doc_id] + for doc_id in citation_dict["document_ids"] + if doc_id in documents + ] + del citation_dict["document_ids"] + + action_event.returns = citation_dict + action_event.end_timestamp = get_ISO_time() + + for key, action_event in self.action_events.items(): + self.client.record(action_event) + + elif isinstance(chunk, StreamedChatResponse_TextGeneration): + self.llm_event.completion += chunk.text + elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration): + pass + elif isinstance(chunk, StreamedChatResponse_CitationGeneration): + for citation in chunk.citations: + self.action_events[f"{citation.start}.{citation.end}"] = ( + ActionEvent( + action_type="citation", + init_timestamp=get_ISO_time(), + params=citation.text, + ) + ) + elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration): + for query in chunk.search_queries: + self.action_events[query.generation_id] = ActionEvent( + 
action_type="search_query", + init_timestamp=get_ISO_time(), + params=query.text, + ) + elif isinstance(chunk, StreamedChatResponse_SearchResults): + pass + + except Exception as e: + self.client.record( + ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # NOTE: As of Cohere==5.x.x, async is not supported + # if the response is a generator, decorate the generator + if inspect.isasyncgen(response): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + elif inspect.isgenerator(response): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # TODO: we should record if they pass a chat.connectors, because it means they intended to call a tool + # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called + + try: + self.llm_event.returns = response.dict() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = [] + if response.chat_history: + role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"} + + for i in range(len(response.chat_history) - 1): + message = response.chat_history[i] + self.llm_event.prompt.append( + { + "role": role_map.get(message.role, message.role), + "content": message.message, + } + ) + + last_message = response.chat_history[-1] + self.llm_event.completion = { + "role": role_map.get(last_message.role, last_message.role), + "content": last_message.message, + } + self.llm_event.prompt_tokens = response.meta.tokens.input_tokens + self.llm_event.completion_tokens = response.meta.tokens.output_tokens + self.llm_event.model = kwargs.get("model", "command-r-plus") + + self.client.record(self.llm_event) + except Exception as e: + self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _store_original_method(self, provider: str, method_name: str, method): + """Store original method with provider context""" + if provider not in self.original_methods: + self.original_methods[provider] = {} + self.original_methods[provider][method_name] = method + + def _get_original_method(self, provider: str, method_name: str): + """Retrieve original method for provider""" + return self.original_methods.get(provider, {}).get(method_name) + + + + def override_litellm_completion(self): + import litellm + + # Store with provider context + self._store_original_method('litellm', 'completion', litellm.completion) + self.active_providers['litellm'] = True + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('litellm', 'completion') + result = original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + litellm.completion = patched_function + + def override_litellm_async_completion(self): + import litellm + + # Store with provider context + self._store_original_method('litellm', 'acompletion', litellm.acompletion) + self.active_providers['litellm'] = True + + async def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('litellm', 'acompletion') + result = await original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + litellm.acompletion = patched_function + + def override_cohere_chat(self): + import cohere + import cohere.types + + original_chat = cohere.Client.chat + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_chat(*args, **kwargs) + return self._handle_response_cohere(result, kwargs, init_timestamp) + + # Override the original method with the patched one + cohere.Client.chat = patched_function + + def override_cohere_chat_stream(self): + import cohere + + original_chat = cohere.Client.chat_stream + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_chat(*args, **kwargs) + return self._handle_response_cohere(result, kwargs, init_timestamp) + + # Override the original method with the patched one + cohere.Client.chat_stream = patched_function + + def _override_method(self, api, method_path, module): + def handle_response(result, kwargs, init_timestamp): + if api == "openai": + return self._handle_response_v0_openai(result, kwargs, init_timestamp) + return result + + def wrap_method(original_method): + if inspect.iscoroutinefunction(original_method): + + @functools.wraps(original_method) + async def async_method(*args, **kwargs): + init_timestamp = get_ISO_time() + response = await original_method(*args, **kwargs) + return handle_response(response, kwargs, init_timestamp) + + return async_method + + else: + + @functools.wraps(original_method) + def sync_method(*args, **kwargs): + init_timestamp = get_ISO_time() + response = original_method(*args, **kwargs) + return handle_response(response, kwargs, init_timestamp) + + return sync_method + + method_parts = method_path.split(".") + original_method = functools.reduce(getattr, method_parts, module) + new_method = 
wrap_method(original_method) + + if len(method_parts) == 1: + setattr(module, method_parts[0], new_method) + else: + parent = functools.reduce(getattr, method_parts[:-1], module) + setattr(parent, method_parts[-1], new_method) + + def override_api(self): + """ + Overrides key methods of the specified API to record events. + """ + + for api in self.SUPPORTED_APIS: + if api in sys.modules: + module = import_module(api) + if api == "litellm": + module_version = version(api) + if module_version is None: + logger.warning( + f"Cannot determine LiteLLM version. Only LiteLLM>=1.3.1 supported." + ) + + if Version(module_version) >= parse("1.3.1"): + self.override_litellm_completion() + self.override_litellm_async_completion() + else: + logger.warning( + f"Only LiteLLM>=1.3.1 supported. v{module_version} found." + ) + continue # Continue to check other APIs instead of returning + + if api == "openai": + # Patch openai v1.0.0+ methods + if hasattr(module, "__version__"): + module_version = parse(module.__version__) + if module_version >= parse("1.0.0"): + self.override_openai_v1_completion() + self.override_openai_v1_async_completion() + else: + # Patch openai =5.4.0 supported." + ) + + if Version(module_version) >= parse("5.4.0"): + self.override_cohere_chat() + self.override_cohere_chat_stream() + else: + logger.warning( + f"Only Cohere>=5.4.0 supported. v{module_version} found." + ) + + def stop_instrumenting(self): + """Restore original methods for all providers""" + if 'openai' in self.active_providers: + from openai.resources.chat import completions + original = self._get_original_method('openai', 'create') + if original: + completions.Completions.create = original + original_async = self._get_original_method('openai', 'create_async') + if original_async: + completions.AsyncCompletions.create = original_async + + if 'litellm' in self.active_providers: + import litellm + original = self._get_original_method('litellm', 'completion') + if original: + litellm.completion = original + original_async = self._get_original_method('litellm', 'acompletion') + if original_async: + litellm.acompletion = original_async + + self.active_providers.clear() + self.original_methods.clear() + + def undo_override_openai_v1_completion(self): + global original_create + from openai.resources.chat import completions + + completions.Completions.create = original_create + + def undo_override_openai_v1_async_completion(self): + global original_create_async + from openai.resources.chat import completions + + completions.AsyncCompletions.create = original_create_async + + def override_openai_v1_async_completion(self): + from openai.resources.chat import completions + + # Store with provider context + self._store_original_method('openai', 'create_async', completions.AsyncCompletions.create) + self.active_providers['openai'] = True + + async def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Get provider-specific original method + original = self._get_original_method('openai', 'create_async') + result = await original(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + completions.AsyncCompletions.create = patched_function From aa4e514fe19e2c1e6a0b4d67bc1d766f68bbcf6f Mon Sep 17 00:00:00 2001 From: sarath2496 Date: Thu, 31 Oct 2024 11:33:30 -0500 Subject: [PATCH 4/6] resolving conflicts --- agentops/llm_tracker.py.new | 633 ------------------------------------ 1 file changed, 633 deletions(-) delete mode 100644 agentops/llm_tracker.py.new diff --git 
a/agentops/llm_tracker.py.new b/agentops/llm_tracker.py.new deleted file mode 100644 index cf961623..00000000 --- a/agentops/llm_tracker.py.new +++ /dev/null @@ -1,633 +0,0 @@ -import functools -import sys -from importlib import import_module -from importlib.metadata import version -from packaging.version import Version, parse -from .log_config import logger -from .event import LLMEvent, ActionEvent, ToolEvent, ErrorEvent -from .helpers import get_ISO_time, check_call_stack_for_agent_id -import inspect -from typing import Optional -import pprint - -original_create = None -original_create_async = None - - -class LlmTracker: - SUPPORTED_APIS = { - "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, - "openai": { - "1.0.0": ("chat.completions.create",), - "0.0.0": ( - "ChatCompletion.create", - "ChatCompletion.acreate", - ), - }, - "cohere": { - "5.4.0": ("chat", "chat_stream"), - }, - } - - def __init__(self, client): - self.client = client - self.completion = "" - self.llm_event: Optional[LLMEvent] = None - self.active_providers = {} # Track active providers - self.original_methods = {} # Store original methods per provider - - def _handle_response_v0_openai(self, response, kwargs, init_timestamp): - """Handle responses for OpenAI versions v1.0.0""" - from openai import Stream, AsyncStream - from openai.types.chat import ChatCompletionChunk - from openai.resources import AsyncCompletions - - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - - def handle_stream_chunk(chunk: ChatCompletionChunk): - # NOTE: prompt/completion usage not returned in response when streaming - # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion - if self.llm_event.returns == None: - self.llm_event.returns = chunk - - try: - accumulated_delta = self.llm_event.returns.choices[0].delta - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.model = chunk.model - self.llm_event.prompt = kwargs["messages"] - choice = chunk.choices[ - 0 - ] # NOTE: We assume for completion only choices[0] is relevant - - if choice.delta.content: - accumulated_delta.content += choice.delta.content - - if choice.delta.role: - accumulated_delta.role = choice.delta.role - - if choice.delta.tool_calls: - accumulated_delta.tool_calls = choice.delta.tool_calls - - if choice.delta.function_call: - accumulated_delta.function_call = choice.delta.function_call - - if choice.finish_reason: - # Streaming is done. Record LLMEvent - self.llm_event.returns.choices[0].finish_reason = ( - choice.finish_reason - ) - self.llm_event.completion = { - "role": accumulated_delta.role, - "content": accumulated_delta.content, - "function_call": accumulated_delta.function_call, - "tool_calls": accumulated_delta.tool_calls, - } - self.llm_event.end_timestamp = get_ISO_time() - - self.client.record(self.llm_event) - except Exception as e: - self.client.record( - ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - kwargs_str = pprint.pformat(kwargs) - chunk = pprint.pformat(chunk) - logger.warning( - f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" - f"chunk:\n {chunk}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - # if the response is a generator, decorate the generator - if isinstance(response, Stream): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - # For asynchronous AsyncStream - elif isinstance(response, AsyncStream): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # For async AsyncCompletion - elif isinstance(response, AsyncCompletions): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # v1.0.0+ responses are objects - try: - self.llm_event.returns = response.model_dump() - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = kwargs["messages"] - self.llm_event.prompt_tokens = response.usage.prompt_tokens - self.llm_event.completion = response.choices[0].message.model_dump() - self.llm_event.completion_tokens = response.usage.completion_tokens - self.llm_event.model = response.model - - self.client.record(self.llm_event) - except Exception as e: - self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) - kwargs_str = pprint.pformat(kwargs) - response = pprint.pformat(response) - logger.warning( - f"Unable to parse response for LLM call. Skipping upload to AgentOps\n" - f"response:\n {response}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - return response - - def _handle_response_cohere(self, response, kwargs, init_timestamp): - """Handle responses for Cohere versions >v5.4.0""" - from cohere.types.non_streamed_chat_response import NonStreamedChatResponse - from cohere.types.streamed_chat_response import ( - StreamedChatResponse, - StreamedChatResponse_CitationGeneration, - StreamedChatResponse_SearchQueriesGeneration, - StreamedChatResponse_SearchResults, - StreamedChatResponse_StreamEnd, - StreamedChatResponse_StreamStart, - StreamedChatResponse_TextGeneration, - StreamedChatResponse_ToolCallsGeneration, - ) - - # from cohere.types.chat import ChatGenerationChunk - - # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant" - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - - self.action_events = {} - - def handle_stream_chunk(chunk): - - # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion - if isinstance(chunk, StreamedChatResponse_StreamStart): - self.llm_event.returns = chunk - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.model = kwargs.get("model", "command-r-plus") - self.llm_event.prompt = kwargs["message"] - self.llm_event.completion = "" - return - - try: - if isinstance(chunk, StreamedChatResponse_StreamEnd): - # StreamedChatResponse_TextGeneration = LLMEvent - self.llm_event.completion = { - "role": "assistant", - "content": chunk.response.text, - } - self.llm_event.end_timestamp = get_ISO_time() - self.client.record(self.llm_event) - - # StreamedChatResponse_SearchResults = ActionEvent - search_results = chunk.response.search_results - for search_result in search_results: - query = search_result.search_query - if query.generation_id in self.action_events: - action_event = self.action_events[query.generation_id] - search_result_dict = search_result.dict() - del search_result_dict["search_query"] - action_event.returns = search_result_dict - 
action_event.end_timestamp = get_ISO_time() - - # StreamedChatResponse_CitationGeneration = ActionEvent - documents = {doc["id"]: doc for doc in chunk.response.documents} - citations = chunk.response.citations - for citation in citations: - citation_id = f"{citation.start}.{citation.end}" - if citation_id in self.action_events: - action_event = self.action_events[citation_id] - citation_dict = citation.dict() - # Replace document_ids with the actual documents - citation_dict["documents"] = [ - documents[doc_id] - for doc_id in citation_dict["document_ids"] - if doc_id in documents - ] - del citation_dict["document_ids"] - - action_event.returns = citation_dict - action_event.end_timestamp = get_ISO_time() - - for key, action_event in self.action_events.items(): - self.client.record(action_event) - - elif isinstance(chunk, StreamedChatResponse_TextGeneration): - self.llm_event.completion += chunk.text - elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration): - pass - elif isinstance(chunk, StreamedChatResponse_CitationGeneration): - for citation in chunk.citations: - self.action_events[f"{citation.start}.{citation.end}"] = ( - ActionEvent( - action_type="citation", - init_timestamp=get_ISO_time(), - params=citation.text, - ) - ) - elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration): - for query in chunk.search_queries: - self.action_events[query.generation_id] = ActionEvent( - action_type="search_query", - init_timestamp=get_ISO_time(), - params=query.text, - ) - elif isinstance(chunk, StreamedChatResponse_SearchResults): - pass - - except Exception as e: - self.client.record( - ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - kwargs_str = pprint.pformat(kwargs) - chunk = pprint.pformat(chunk) - logger.warning( - f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" - f"chunk:\n {chunk}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - # NOTE: As of Cohere==5.x.x, async is not supported - # if the response is a generator, decorate the generator - if inspect.isasyncgen(response): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - elif inspect.isgenerator(response): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - # TODO: we should record if they pass a chat.connectors, because it means they intended to call a tool - # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called - - try: - self.llm_event.returns = response.dict() - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = [] - if response.chat_history: - role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"} - - for i in range(len(response.chat_history) - 1): - message = response.chat_history[i] - self.llm_event.prompt.append( - { - "role": role_map.get(message.role, message.role), - "content": message.message, - } - ) - - last_message = response.chat_history[-1] - self.llm_event.completion = { - "role": role_map.get(last_message.role, last_message.role), - "content": last_message.message, - } - self.llm_event.prompt_tokens = response.meta.tokens.input_tokens - self.llm_event.completion_tokens = response.meta.tokens.output_tokens - self.llm_event.model = kwargs.get("model", "command-r-plus") - - self.client.record(self.llm_event) - except Exception as e: - self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e)) - kwargs_str = pprint.pformat(kwargs) - response = pprint.pformat(response) - logger.warning( - f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" - f"response:\n {response}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - return response - - def _store_original_method(self, provider: str, method_name: str, method): - """Store original method with provider context""" - if provider not in self.original_methods: - self.original_methods[provider] = {} - self.original_methods[provider][method_name] = method - - def _get_original_method(self, provider: str, method_name: str): - """Retrieve original method for provider""" - return self.original_methods.get(provider, {}).get(method_name) - - - - def override_litellm_completion(self): - import litellm - - # Store with provider context - self._store_original_method('litellm', 'completion', litellm.completion) - self.active_providers['litellm'] = True - - def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - # Get provider-specific original method - original = self._get_original_method('litellm', 'completion') - result = original(*args, **kwargs) - return self._handle_response_v1_openai(result, kwargs, init_timestamp) - - litellm.completion = patched_function - - def override_litellm_async_completion(self): - import litellm - - # Store with provider context - self._store_original_method('litellm', 'acompletion', litellm.acompletion) - self.active_providers['litellm'] = True - - async def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - # Get provider-specific original method - original = self._get_original_method('litellm', 'acompletion') - result = await original(*args, **kwargs) - return self._handle_response_v1_openai(result, kwargs, init_timestamp) - - litellm.acompletion = patched_function - - def override_cohere_chat(self): - import cohere - import cohere.types - - original_chat = cohere.Client.chat - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_chat(*args, **kwargs) - return self._handle_response_cohere(result, kwargs, init_timestamp) - - # Override the original method with the patched one - cohere.Client.chat = patched_function - - def override_cohere_chat_stream(self): - import cohere - - original_chat = cohere.Client.chat_stream - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_chat(*args, **kwargs) - return self._handle_response_cohere(result, kwargs, init_timestamp) - - # Override the original method with the patched one - cohere.Client.chat_stream = patched_function - - def _override_method(self, api, method_path, module): - def handle_response(result, kwargs, init_timestamp): - if api == "openai": - return self._handle_response_v0_openai(result, kwargs, init_timestamp) - return result - - def wrap_method(original_method): - if inspect.iscoroutinefunction(original_method): - - @functools.wraps(original_method) - async def async_method(*args, **kwargs): - init_timestamp = get_ISO_time() - response = await original_method(*args, **kwargs) - return handle_response(response, kwargs, init_timestamp) - - return async_method - - else: - - @functools.wraps(original_method) - def sync_method(*args, **kwargs): - init_timestamp = get_ISO_time() - response = original_method(*args, **kwargs) - return handle_response(response, kwargs, init_timestamp) - - return sync_method - - method_parts = method_path.split(".") - original_method = functools.reduce(getattr, method_parts, module) - new_method = 
wrap_method(original_method) - - if len(method_parts) == 1: - setattr(module, method_parts[0], new_method) - else: - parent = functools.reduce(getattr, method_parts[:-1], module) - setattr(parent, method_parts[-1], new_method) - - def override_api(self): - """ - Overrides key methods of the specified API to record events. - """ - - for api in self.SUPPORTED_APIS: - if api in sys.modules: - module = import_module(api) - if api == "litellm": - module_version = version(api) - if module_version is None: - logger.warning( - f"Cannot determine LiteLLM version. Only LiteLLM>=1.3.1 supported." - ) - - if Version(module_version) >= parse("1.3.1"): - self.override_litellm_completion() - self.override_litellm_async_completion() - else: - logger.warning( - f"Only LiteLLM>=1.3.1 supported. v{module_version} found." - ) - continue # Continue to check other APIs instead of returning - - if api == "openai": - # Patch openai v1.0.0+ methods - if hasattr(module, "__version__"): - module_version = parse(module.__version__) - if module_version >= parse("1.0.0"): - self.override_openai_v1_completion() - self.override_openai_v1_async_completion() - else: - # Patch openai =5.4.0 supported." - ) - - if Version(module_version) >= parse("5.4.0"): - self.override_cohere_chat() - self.override_cohere_chat_stream() - else: - logger.warning( - f"Only Cohere>=5.4.0 supported. v{module_version} found." - ) - - def stop_instrumenting(self): - """Restore original methods for all providers""" - if 'openai' in self.active_providers: - from openai.resources.chat import completions - original = self._get_original_method('openai', 'create') - if original: - completions.Completions.create = original - original_async = self._get_original_method('openai', 'create_async') - if original_async: - completions.AsyncCompletions.create = original_async - - if 'litellm' in self.active_providers: - import litellm - original = self._get_original_method('litellm', 'completion') - if original: - litellm.completion = original - original_async = self._get_original_method('litellm', 'acompletion') - if original_async: - litellm.acompletion = original_async - - self.active_providers.clear() - self.original_methods.clear() - - def undo_override_openai_v1_completion(self): - global original_create - from openai.resources.chat import completions - - completions.Completions.create = original_create - - def undo_override_openai_v1_async_completion(self): - global original_create_async - from openai.resources.chat import completions - - completions.AsyncCompletions.create = original_create_async - - def override_openai_v1_async_completion(self): - from openai.resources.chat import completions - - # Store with provider context - self._store_original_method('openai', 'create_async', completions.AsyncCompletions.create) - self.active_providers['openai'] = True - - async def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - # Get provider-specific original method - original = self._get_original_method('openai', 'create_async') - result = await original(*args, **kwargs) - return self._handle_response_v1_openai(result, kwargs, init_timestamp) - - completions.AsyncCompletions.create = patched_function From 7069d15a6bbfb1d7e247b2b9bdc12b3b6eb6a093 Mon Sep 17 00:00:00 2001 From: sarath2496 Date: Thu, 31 Oct 2024 11:36:15 -0500 Subject: [PATCH 5/6] changes --- agentops/llm_tracker.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py index 3eb7a806..257c5c90 100644 --- 
a/agentops/llm_tracker.py +++ b/agentops/llm_tracker.py @@ -418,6 +418,22 @@ def generator(): ) return response + + def override_openai_v1_completion(self): + from openai.resources.chat import completions + + # Store the original method + global original_create + original_create = completions.Completions.create + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + # Call the original function with its original arguments + result = original_create(*args, **kwargs) + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + # Override the original method with the patched one + completions.Completions.create = patched_function def _store_original_method(self, provider: str, method_name: str, method): """Store original method with provider context""" @@ -604,6 +620,12 @@ def stop_instrumenting(self): self.active_providers.clear() self.original_methods.clear() + def undo_override_openai_v1_completion(self): + global original_create + from openai.resources.chat import completions + + completions.Completions.create = original_create + def undo_override_openai_v1_async_completion(self): global original_create_async from openai.resources.chat import completions From dec928ed5b1f832d70867319d0d3504235e4a7df Mon Sep 17 00:00:00 2001 From: Teo Date: Wed, 6 Nov 2024 17:56:15 -0600 Subject: [PATCH 6/6] test: add VCR cassettes, change tets dynamic refactor(test_llmintegration.py): restructure tests to use fixtures for AgentOps and clients, enhancing maintainability and readability --- .../test_openai_litellm_tango.yaml | 167 ++++++++++++++++++ tests/test_llmintegration.py | 147 ++++++++++----- 2 files changed, 273 insertions(+), 41 deletions(-) create mode 100644 tests/fixtures/vcr_cassettes/test_llmintegration/test_openai_litellm_tango.yaml diff --git a/tests/fixtures/vcr_cassettes/test_llmintegration/test_openai_litellm_tango.yaml b/tests/fixtures/vcr_cassettes/test_llmintegration/test_openai_litellm_tango.yaml new file mode 100644 index 00000000..a63a10e1 --- /dev/null +++ b/tests/fixtures/vcr_cassettes/test_llmintegration/test_openai_litellm_tango.yaml @@ -0,0 +1,167 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": [{"type": "text", "text": "Write + a 3 word sentence."}]}], "temperature": 0, "max_tokens": 4096, "model": "claude-3-sonnet-20240229"}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + anthropic-version: + - '2023-06-01' + connection: + - keep-alive + content-length: + - '174' + content-type: + - application/json + host: + - api.anthropic.com + user-agent: + - REDACTED + x-api-key: + - REDACTED + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: '{"id":"msg_01XWAEFbHpPf9r4bwn1k5rP9","type":"message","role":"assistant","model":"claude-3-sonnet-20240229","content":[{"type":"text","text":"Brevity + wins today."}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":15,"output_tokens":9}}' + headers: + CF-Cache-Status: + - DYNAMIC + CF-RAY: + - 8de91c545f396b14-DFW + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Thu, 07 Nov 2024 00:12:15 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-requests-limit: + - '4000' + anthropic-ratelimit-requests-remaining: + - '3999' + anthropic-ratelimit-requests-reset: + - '2024-11-07T00:12:33Z' + anthropic-ratelimit-tokens-limit: + - '400000' + anthropic-ratelimit-tokens-remaining: + - '400000' + 
anthropic-ratelimit-tokens-reset: + - '2024-11-07T00:12:15Z' + content-length: + - '257' + request-id: + - REDACTED + via: + - 1.1 google + status: + code: 200 + message: OK +- request: + body: '{"messages": [{"role": "user", "content": "Write a 3 word sentence."}], + "model": "gpt-4", "temperature": 0}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + authorization: + - REDACTED + connection: + - keep-alive + content-length: + - '107' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - REDACTED + x-stainless-arch: + - x64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.54.3 + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-AQkQCgTJp2n4t5XB1AtMTzoLkOgPl\",\n \"object\": + \"chat.completion\",\n \"created\": 1730938336,\n \"model\": \"gpt-4-0613\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Cats are cute.\",\n \"refusal\": + null\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n + \ }\n ],\n \"usage\": {\n \"prompt_tokens\": 14,\n \"completion_tokens\": + 5,\n \"total_tokens\": 19,\n \"prompt_tokens_details\": {\n \"cached_tokens\": + 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\": + null\n}\n" + headers: + CF-Cache-Status: + - DYNAMIC + CF-RAY: + - REDACTED + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Thu, 07 Nov 2024 00:12:16 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + content-length: + - '741' + openai-organization: + - REDACTED + openai-processing-ms: + - '507' + openai-version: + - '2020-10-01' + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '1000000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '999977' + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 1ms + x-request-id: + - REDACTED + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_llmintegration.py b/tests/test_llmintegration.py index 271157c1..c37da1d8 100644 --- a/tests/test_llmintegration.py +++ b/tests/test_llmintegration.py @@ -1,49 +1,114 @@ -import os -import sys -from dotenv import load_dotenv -import openai -import litellm import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING -# Add the project root to Python path to use local agentops -project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -sys.path.insert(0, project_root) +import litellm # litellm, openai need to be imported before agentops.init +import openai +import pytest import agentops -# Load environment variables -load_dotenv() +os.environ["VCR_TURN_OFF"] = "true" + + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + +try: + import pytest_vcr # noqa: F401 +except ImportError: + raise RuntimeError("Please install pytest-vcr to run this test") -# Enable debug logging 
agentops.logger.setLevel(logging.DEBUG) -def test_openai_only(): - print("\n=== Testing OpenAI with local AgentOps changes ===") - - # Initialize AgentOps - agentops.init(api_key=os.getenv("AGENTOPS_API_KEY")) - - # Set API keys - openai.api_key = os.getenv("OPENAI_API_KEY") - - try: - # Make OpenAI calls - for i in range(3): - print(f"\nOpenAI Call {i+1}:") - response = openai.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{ - "role": "user", - "content": f"Say 'OpenAI Test {i+1} successful when LiteLLM is imported'" - }] - ) - print(f"Response {i+1}:", response.choices[0].message.content) - - except Exception as e: - print("\nError occurred:", str(e)) - raise e - finally: - agentops.end_session("Success") - -if __name__ == "__main__": - test_openai_only() + +@pytest.fixture(scope="session", autouse=True) +def agentops_init(): + agentops.init(api_key=os.getenv("AGENTOPS_API_KEY", "")) + + +@pytest.fixture +def agentops_client(): + return agentops.Client() + + +@pytest.fixture(scope="session") +def litellm_client(): + return litellm + + +@pytest.fixture(scope="session") +def openai_client(): + return openai.Client() + + +@pytest.fixture(scope="module") +def vcr(vcr): + return vcr + + +@pytest.fixture(scope="module") +def vcr_config(): + vcr_cassettes = Path(__file__).parent / "fixtures" / "vcr_cassettes" / __name__ + vcr_cassettes.mkdir(parents=True, exist_ok=True) + return { + "serializer": "yaml", + "cassette_library_dir": str(vcr_cassettes), + # Enhanced header filtering using tuples for replacement values + "filter_headers": [ + # Auth headers with REDACTED values + ("authorization", "REDACTED"), + ("api-key", "REDACTED"), + ("x-api-key", "REDACTED"), + ("X-Agentops-Api-Key", "REDACTED"), + ("openai-organization", "REDACTED"), + # User identifiers + ("user-agent", "REDACTED"), + # Cookie related - remove completely + ("cookie", "REDACTED"), # Request cookies + ("set-cookie", "REDACTED"), # Response cookies (both cases) + ("Set-Cookie", "REDACTED"), + ("Cookie", "REDACTED"), + # IP addresses and other sensitive headers + ("x-forwarded-for", "REDACTED"), + ("x-real-ip", "REDACTED"), + ("request-id", "REDACTED"), + ], + # Other settings remain the same + "match_on": ["uri", "method", "body", "query"], + "decode_compressed_response": True, + "record_on_exception": False, + "record_mode": "once", + "ignore_hosts": ["pypi.org", "files.pythonhosted.org"], + } + + +@pytest.fixture +def llm_event_spy(agentops_client, mocker: "MockerFixture"): + """Fixture that provides spies on both providers' response handling""" + from agentops.llms.litellm import LiteLLMProvider + from agentops.llms.openai import OpenAiProvider + + return { + "litellm": mocker.spy(LiteLLMProvider(agentops_client), "handle_response"), + "openai": mocker.spy(OpenAiProvider(agentops_client), "handle_response"), + } + + +@pytest.mark.vcr +def test_openai_litellm_tango(llm_event_spy, openai_client, litellm_client): + """Test that LiteLLM integration does not break OpenAI from sending events""" + message = [{"role": "user", "content": "Write a 3 word sentence."}] + + litellm_client.completion( + model="claude-3-sonnet-20240229", messages=message, temperature=0 + ) + + assert llm_event_spy["litellm"].call_count == 1 + + openai_client.chat.completions.create( + model="gpt-4", messages=message, temperature=0 + ) + + assert llm_event_spy["openai"].call_count == 1
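
Two design points in the new test are worth spelling out. First, litellm and openai are imported before agentops.init on purpose: the tracker's override_api only patches APIs it finds in sys.modules, so reordering those imports would silently skip instrumentation. Second, with "record_mode": "once" in the vcr_config fixture, the first run against live credentials writes tests/fixtures/vcr_cassettes/test_llmintegration/test_openai_litellm_tango.yaml and later runs replay it, which keeps both call_count assertions deterministic and offline. Re-recording should just be a matter of deleting that cassette and re-running with Anthropic and OpenAI keys exported, though that workflow is inferred from standard vcrpy behavior rather than spelled out in this patch.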
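
The fix the series converges on hinges on keeping one original callable per provider, which is what _store_original_method / _get_original_method do in agentops/llm_tracker.py. The standalone sketch below is a minimal illustration of that bookkeeping, assuming nothing beyond the standard library; ProviderPatchRegistry, wrap, fake_openai, and fake_litellm are invented for the example and are not part of AgentOps.

# provider_patch_sketch.py -- illustrative only, not the AgentOps implementation.
from types import SimpleNamespace
from typing import Callable, Dict, Optional, Tuple


class ProviderPatchRegistry:
    """Keeps one original callable per (provider, method) pair."""

    def __init__(self) -> None:
        self._originals: Dict[Tuple[str, str], Callable] = {}

    def store(self, provider: str, method_name: str, method: Callable) -> None:
        # Record only the first original so repeated patching stays reversible.
        self._originals.setdefault((provider, method_name), method)

    def get(self, provider: str, method_name: str) -> Optional[Callable]:
        return self._originals.get((provider, method_name))


def wrap(registry: ProviderPatchRegistry, provider: str, holder, method_name: str):
    """Replace holder.<method_name> with a wrapper that still calls the original."""
    original = getattr(holder, method_name)
    registry.store(provider, method_name, original)

    def patched(*args, **kwargs):
        # The real tracker records an LLMEvent here; the sketch only tags the call.
        return (provider, original(*args, **kwargs))

    setattr(holder, method_name, patched)


if __name__ == "__main__":
    registry = ProviderPatchRegistry()
    fake_openai = SimpleNamespace(create=lambda **kw: "openai-result")
    fake_litellm = SimpleNamespace(completion=lambda **kw: "litellm-result")

    wrap(registry, "openai", fake_openai, "create")
    wrap(registry, "litellm", fake_litellm, "completion")

    assert fake_openai.create() == ("openai", "openai-result")
    assert fake_litellm.completion() == ("litellm", "litellm-result")

    # Un-patching pulls the provider-scoped original, so restoring litellm can
    # never hand back openai's callable -- the clobbering that a single shared
    # `original_create` global invites once both libraries are instrumented.
    fake_litellm.completion = registry.get("litellm", "completion")
    assert fake_litellm.completion() == "litellm-result"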