diff --git a/agentops/client.py b/agentops/client.py index 86fe49b8..0f99a654 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -5,8 +5,8 @@ Client: Provides methods to interact with the AgentOps service. """ -import inspect import atexit +import inspect import logging import os import signal @@ -14,20 +14,20 @@ import threading import traceback from decimal import Decimal +from functools import cached_property +from typing import List, Optional, Tuple, Union from uuid import UUID, uuid4 -from typing import Optional, List, Union, Tuple + from termcolor import colored -from .event import Event, ErrorEvent -from .singleton import ( - conditional_singleton, -) -from .session import Session, active_sessions +from .config import Configuration +from .event import ErrorEvent, Event from .host_env import get_host_env +from .llms import LlmTracker from .log_config import logger from .meta_client import MetaClient -from .config import Configuration -from .llms import LlmTracker +from .session import Session, active_sessions +from .singleton import conditional_singleton @conditional_singleton @@ -39,6 +39,7 @@ def __init__(self): self._sessions: List[Session] = active_sessions self._config = Configuration() self._pre_init_queue = {"agents": []} + self._host_env = None # Cache host env data self.configure( api_key=os.environ.get("AGENTOPS_API_KEY"), @@ -111,6 +112,7 @@ def initialize(self) -> Union[Session, None]: def _initialize_autogen_logger(self) -> None: try: import autogen + from .partners.autogen_logger import AutogenLogger autogen.runtime_logging.start(logger=AutogenLogger()) @@ -224,7 +226,7 @@ def start_session( session = Session( session_id=session_id, tags=list(session_tags), - host_env=get_host_env(self._config.env_data_opt_out), + host_env=self.host_env, config=self._config, ) @@ -276,16 +278,23 @@ def create_agent( ): if agent_id is None: agent_id = str(uuid4()) + logger.debug(f"Generated new agent ID: {agent_id}") + + logger.debug(f"Creating agent '{name}' with ID 
{agent_id}") # if a session is passed in, use multi-session logic if session: + logger.debug("Using provided session for agent creation") return session.create_agent(name=name, agent_id=agent_id) else: # if no session passed, assume single session + logger.debug("No session provided - using default session") session = self._safe_get_session() if session is None: + logger.debug("No active session found - queueing agent for creation") self._pre_init_queue["agents"].append({"name": name, "agent_id": agent_id}) else: + logger.debug("Creating agent in active session") session.create_agent(name=name, agent_id=agent_id) return agent_id @@ -430,3 +439,8 @@ def api_key(self): @property def parent_key(self): return self._config.parent_key + + @cached_property + def host_env(self): + """Cache and reuse host environment data""" + return get_host_env(self._config.env_data_opt_out) diff --git a/agentops/event.py b/agentops/event.py index 70ec059c..c6200aca 100644 --- a/agentops/event.py +++ b/agentops/event.py @@ -82,6 +82,7 @@ class LLMEvent(Event): prompt_tokens: Optional[int] = None completion: Union[str, object] = None completion_tokens: Optional[int] = None + cost: Optional[float] = None model: Optional[str] = None diff --git a/agentops/http_client.py b/agentops/http_client.py index caa18b27..11c0bf49 100644 --- a/agentops/http_client.py +++ b/agentops/http_client.py @@ -1,7 +1,9 @@ from enum import Enum -from typing import Optional -from requests.adapters import Retry, HTTPAdapter +from typing import Optional, Dict, Any + import requests +from requests.adapters import HTTPAdapter, Retry +import json from .exceptions import ApiServerException @@ -54,33 +56,79 @@ def get_status(code: int) -> HttpStatus: class HttpClient: - @staticmethod + _session: Optional[requests.Session] = None + + @classmethod + def get_session(cls) -> requests.Session: + """Get or create the global session with optimized connection pooling""" + if cls._session is None: + cls._session = requests.Session() + + # Configure 
connection pooling + adapter = requests.adapters.HTTPAdapter( + pool_connections=15, # Number of connection pools + pool_maxsize=256, # Connections per pool + max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]), + ) + + # Mount adapter for both HTTP and HTTPS + cls._session.mount("http://", adapter) + cls._session.mount("https://", adapter) + + # Set default headers + cls._session.headers.update( + { + "Connection": "keep-alive", + "Keep-Alive": "timeout=10, max=1000", + "Content-Type": "application/json", + } + ) + + return cls._session + + @classmethod + def _prepare_headers( + cls, + api_key: Optional[str] = None, + parent_key: Optional[str] = None, + jwt: Optional[str] = None, + custom_headers: Optional[dict] = None, + ) -> dict: + """Prepare headers for the request""" + headers = JSON_HEADER.copy() + + if api_key is not None: + headers["X-Agentops-Api-Key"] = api_key + + if parent_key is not None: + headers["X-Agentops-Parent-Key"] = parent_key + + if jwt is not None: + headers["Authorization"] = f"Bearer {jwt}" + + if custom_headers is not None: + headers.update(custom_headers) + + return headers + + @classmethod def post( + cls, url: str, payload: bytes, api_key: Optional[str] = None, parent_key: Optional[str] = None, jwt: Optional[str] = None, - header=None, + header: Optional[Dict[str, str]] = None, ) -> Response: + """Make HTTP POST request using connection pooling""" result = Response() try: - # Create request session with retries configured - request_session = requests.Session() - request_session.mount(url, HTTPAdapter(max_retries=retry_config)) - - if api_key is not None: - JSON_HEADER["X-Agentops-Api-Key"] = api_key - - if parent_key is not None: - JSON_HEADER["X-Agentops-Parent-Key"] = parent_key - - if jwt is not None: - JSON_HEADER["Authorization"] = f"Bearer {jwt}" - - res = request_session.post(url, data=payload, headers=JSON_HEADER, timeout=20) - + headers = cls._prepare_headers(api_key, parent_key, jwt, header) 
+ session = cls.get_session() + res = session.post(url, data=payload, headers=headers, timeout=20) result.parse(res) + except requests.exceptions.Timeout: result.code = 408 result.status = HttpStatus.TIMEOUT @@ -112,28 +160,22 @@ def post( return result - @staticmethod + @classmethod def get( + cls, url: str, api_key: Optional[str] = None, jwt: Optional[str] = None, - header=None, + header: Optional[Dict[str, str]] = None, ) -> Response: + """Make HTTP GET request using connection pooling""" result = Response() try: - # Create request session with retries configured - request_session = requests.Session() - request_session.mount(url, HTTPAdapter(max_retries=retry_config)) - - if api_key is not None: - JSON_HEADER["X-Agentops-Api-Key"] = api_key - - if jwt is not None: - JSON_HEADER["Authorization"] = f"Bearer {jwt}" - - res = request_session.get(url, headers=JSON_HEADER, timeout=20) - + headers = cls._prepare_headers(api_key, None, jwt, header) + session = cls.get_session() + res = session.get(url, headers=headers, timeout=20) result.parse(res) + except requests.exceptions.Timeout: result.code = 408 result.status = HttpStatus.TIMEOUT diff --git a/agentops/partners/autogen_logger.py b/agentops/partners/autogen_logger.py index 77aca142..002a4e84 100644 --- a/agentops/partners/autogen_logger.py +++ b/agentops/partners/autogen_logger.py @@ -10,9 +10,9 @@ from openai.types.chat import ChatCompletion from autogen.logger.base_logger import BaseLogger, LLMConfig -from autogen.logger.logger_utils import get_current_ts, to_dict from agentops.enums import EndState +from agentops.helpers import get_ISO_time from agentops import LLMEvent, ToolEvent, ActionEvent from uuid import uuid4 @@ -55,17 +55,18 @@ def log_chat_completion( start_time: str, ) -> None: """Records an LLMEvent to AgentOps session""" - end_time = get_current_ts() completion = response.choices[len(response.choices) - 1] + # Note: Autogen tokens are not included in the request and function call tokens are not 
counted in the completion llm_event = LLMEvent( prompt=request["messages"], - completion=completion.message, + completion=completion.message.to_json(), model=response.model, + cost=cost, ) llm_event.init_timestamp = start_time - llm_event.end_timestamp = end_time + llm_event.end_timestamp = get_ISO_time() llm_event.agent_id = self._get_agentops_id_from_agent(str(id(agent))) agentops.record(llm_event) diff --git a/agentops/session.py b/agentops/session.py index 1213e2fd..28e4ca6a 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -1,21 +1,163 @@ -import copy +from __future__ import annotations + +import asyncio import functools import json import threading -import time +from datetime import datetime, timezone from decimal import ROUND_HALF_UP, Decimal -from termcolor import colored -from typing import Any, Optional, List, Union +from typing import Any, Dict, List, Optional, Sequence, Union from uuid import UUID, uuid4 -from datetime import datetime -from .exceptions import ApiServerException +from opentelemetry import trace +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SpanExporter, SpanExportResult +from termcolor import colored + +from .config import Configuration from .enums import EndState from .event import ErrorEvent, Event -from .log_config import logger -from .config import Configuration -from .helpers import get_ISO_time, filter_unjsonable, safe_serialize +from .exceptions import ApiServerException +from .helpers import filter_unjsonable, get_ISO_time, safe_serialize from .http_client import HttpClient, Response +from .log_config import logger + +""" +OTEL Guidelines: + + + +- Maintain a single TracerProvider for the application runtime + - Have one global TracerProvider in the Client class + +- According to the 
OpenTelemetry Python documentation, Resource should be initialized once per application and shared across all telemetry (traces, metrics, logs). +- Each Session gets its own Tracer (with session-specific context) +- Allow multiple sessions to share the provider while maintaining their own context + + + +:: Resource + + '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + Captures information about the entity producing telemetry as Attributes. + For example, a process producing telemetry that is running in a container + on Kubernetes has a process name, a pod name, a namespace, and possibly + a deployment name. All these attributes can be included in the Resource. + '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + + The key insight from the documentation is: + + - Resource represents the entity producing telemetry - in our case, that's the AgentOps SDK application itself + - Session-specific information should be attributes on the spans themselves + - A Resource is meant to identify the service/process/application1 + - Sessions are units of work within that application + - The documentation example about "process name, pod name, namespace" refers to where the code is running, not the work it's doing + +""" + + +class SessionExporter(SpanExporter): + """ + Manages publishing events for Session + """ + + def __init__(self, session: Session, **kwargs): + self.session = session + self._shutdown = threading.Event() + self._export_lock = threading.Lock() + super().__init__(**kwargs) + + @property + def endpoint(self): + return f"{self.session.config.endpoint}/v2/create_events" + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + if self._shutdown.is_set(): + return SpanExportResult.SUCCESS + + with self._export_lock: + try: + # Skip if no spans to export + if not spans: + return SpanExportResult.SUCCESS + + events = [] + for span in spans: + event_data = json.loads(span.attributes.get("event.data", 
"{}")) + + # Format event data based on event type + if span.name == "actions": + formatted_data = { + "action_type": event_data.get("action_type", event_data.get("name", "unknown_action")), + "params": event_data.get("params", {}), + "returns": event_data.get("returns"), + } + elif span.name == "tools": + formatted_data = { + "name": event_data.get("name", event_data.get("tool_name", "unknown_tool")), + "params": event_data.get("params", {}), + "returns": event_data.get("returns"), + } + else: + formatted_data = event_data + + formatted_data = {**event_data, **formatted_data} + # Get timestamps, providing defaults if missing + current_time = datetime.now(timezone.utc).isoformat() + init_timestamp = span.attributes.get("event.timestamp") + end_timestamp = span.attributes.get("event.end_timestamp") + + # Handle missing timestamps + if init_timestamp is None: + init_timestamp = current_time + if end_timestamp is None: + end_timestamp = current_time + + # Get event ID, generate new one if missing + event_id = span.attributes.get("event.id") + if event_id is None: + event_id = str(uuid4()) + + events.append( + { + "id": event_id, + "event_type": span.name, + "init_timestamp": init_timestamp, + "end_timestamp": end_timestamp, + **formatted_data, + "session_id": str(self.session.session_id), + } + ) + + # Only make HTTP request if we have events and not shutdown + if events: + try: + res = HttpClient.post( + self.endpoint, + json.dumps({"events": events}).encode("utf-8"), + api_key=self.session.config.api_key, + jwt=self.session.jwt, + ) + return SpanExportResult.SUCCESS if res.code == 200 else SpanExportResult.FAILURE + except Exception as e: + logger.error(f"Failed to send events: {e}") + return SpanExportResult.FAILURE + + return SpanExportResult.SUCCESS + + except Exception as e: + logger.error(f"Failed to export spans: {e}") + return SpanExportResult.FAILURE + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + return True + + def shutdown(self) 
-> None: + """Handle shutdown gracefully""" + self._shutdown.set() + # Don't call session.end_session() here to avoid circular dependencies class Session: @@ -67,9 +209,10 @@ def __init__( self.host_env = host_env self.config = config self.jwt = None - self.lock = threading.Lock() - self.queue: List[Any] = [] - self.token_cost = Decimal(0) + self._lock = threading.Lock() + self._end_session_lock = threading.Lock() + self.token_cost: Decimal = Decimal(0) + self._session_url: str = "" self.event_counts = { "llms": 0, "tools": 0, @@ -77,17 +220,30 @@ def __init__( "errors": 0, "apis": 0, } - self.session_url: Optional[str] = None - - self.stop_flag = threading.Event() - self.thread = threading.Thread(target=self._run) - self.thread.daemon = True - self.thread.start() + # self.session_url: Optional[str] = None + # Start session first to get JWT self.is_running = self._start_session() - if self.is_running == False: - self.stop_flag.set() - self.thread.join(timeout=1) + if not self.is_running: + return + + # Initialize OTEL components with a more controlled processor + self._tracer_provider = TracerProvider() + self._otel_tracer = self._tracer_provider.get_tracer( + f"agentops.session.{str(session_id)}", + ) + self._otel_exporter = SessionExporter(session=self) + + # Use smaller batch size and shorter delay to reduce buffering + self._span_processor = BatchSpanProcessor( + self._otel_exporter, + max_queue_size=self.config.max_queue_size, + schedule_delay_millis=self.config.max_wait_time, + max_export_batch_size=min(max(self.config.max_queue_size // 20, 1), min(self.config.max_queue_size, 32)), + export_timeout_millis=20000, + ) + + self._tracer_provider.add_span_processor(self._span_processor) def set_video(self, video: str) -> None: """ @@ -98,57 +254,99 @@ def set_video(self, video: str) -> None: """ self.video = video + def _flush_spans(self) -> bool: + """ + Flush pending spans for this specific session with timeout. 
+ Returns True if flush was successful, False otherwise. + """ + if not hasattr(self, "_span_processor"): + return True + + try: + success = self._span_processor.force_flush(timeout_millis=self.config.max_wait_time) + if not success: + logger.warning("Failed to flush all spans before session end") + return success + except Exception as e: + logger.warning(f"Error flushing spans: {e}") + return False + def end_session( self, end_state: str = "Indeterminate", end_state_reason: Optional[str] = None, video: Optional[str] = None, ) -> Union[Decimal, None]: - if not self.is_running: - return None + with self._end_session_lock: + if not self.is_running: + return None - if not any(end_state == state.value for state in EndState): - logger.warning("Invalid end_state. Please use one of the EndState enums") - return None + if not any(end_state == state.value for state in EndState): + logger.warning("Invalid end_state. Please use one of the EndState enums") + return None - self.end_timestamp = get_ISO_time() - self.end_state = end_state - self.end_state_reason = end_state_reason - if video is not None: - self.video = video - - self.stop_flag.set() - self.thread.join(timeout=1) - self._flush_queue() - analytics_stats = self.get_analytics() - - analytics = ( - f"Session Stats - " - f"{colored('Duration:', attrs=['bold'])} {analytics_stats['Duration']} | " - f"{colored('Cost:', attrs=['bold'])} ${analytics_stats['Cost']} | " - f"{colored('LLMs:', attrs=['bold'])} {analytics_stats['LLM calls']} | " - f"{colored('Tools:', attrs=['bold'])} {analytics_stats['Tool calls']} | " - f"{colored('Actions:', attrs=['bold'])} {analytics_stats['Actions']} | " - f"{colored('Errors:', attrs=['bold'])} {analytics_stats['Errors']}" - ) - logger.info(analytics) + try: + # Force flush any pending spans before ending session + if hasattr(self, "_span_processor"): + self._span_processor.force_flush(timeout_millis=5000) + + # 1. 
Set shutdown flag on exporter first + if hasattr(self, "_otel_exporter"): + self._otel_exporter.shutdown() + + # 2. Set session end state + self.end_timestamp = get_ISO_time() + self.end_state = end_state + self.end_state_reason = end_state_reason + if video is not None: + self.video = video + + # 3. Mark session as not running before cleanup + self.is_running = False + + # 4. Clean up OTEL components + if hasattr(self, "_span_processor"): + try: + # Force flush any pending spans + self._span_processor.force_flush(timeout_millis=5000) + # Shutdown the processor + self._span_processor.shutdown() + except Exception as e: + logger.warning(f"Error during span processor cleanup: {e}") + finally: + del self._span_processor + + # 5. Final session update + if not (analytics_stats := self.get_analytics()): + return None + + analytics = ( + f"Session Stats - " + f"{colored('Duration:', attrs=['bold'])} {analytics_stats['Duration']} | " + f"{colored('Cost:', attrs=['bold'])} ${analytics_stats['Cost']} | " + f"{colored('LLMs:', attrs=['bold'])} {analytics_stats['LLM calls']} | " + f"{colored('Tools:', attrs=['bold'])} {analytics_stats['Tool calls']} | " + f"{colored('Actions:', attrs=['bold'])} {analytics_stats['Actions']} | " + f"{colored('Errors:', attrs=['bold'])} {analytics_stats['Errors']}" + ) + logger.info(analytics) - logger.info( - colored( - f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", - "blue", - ) - ) - active_sessions.remove(self) + except Exception as e: + logger.exception(f"Error during session end: {e}") + finally: + active_sessions.remove(self) # First thing, get rid of the session - return self.token_cost + logger.info( + colored( + f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", + "blue", + ) + ) + return self.token_cost def add_tags(self, tags: List[str]) -> None: """ Append to session tags at runtime. - - Args: - tags (List[str]): The list of tags to append. 
""" if not self.is_running: return @@ -157,16 +355,20 @@ def add_tags(self, tags: List[str]) -> None: if isinstance(tags, str): tags = [tags] + # Initialize tags if None if self.tags is None: - self.tags = tags - else: - for tag in tags: - if tag not in self.tags: - self.tags.append(tag) + self.tags = [] + + # Add new tags that don't exist + for tag in tags: + if tag not in self.tags: + self.tags.append(tag) + # Update session state immediately self._update_session() def set_tags(self, tags): + """Set session tags, replacing any existing tags""" if not self.is_running: return @@ -174,39 +376,101 @@ def set_tags(self, tags): if isinstance(tags, str): tags = [tags] - self.tags = tags + # Set tags directly + self.tags = tags.copy() # Make a copy to avoid reference issues + + # Update session state immediately self._update_session() - def record(self, event: Union[Event, ErrorEvent]): + def record(self, event: Union[Event, ErrorEvent], flush_now=False): + """Record an event using OpenTelemetry spans""" if not self.is_running: return - if isinstance(event, Event): - if not event.end_timestamp or event.init_timestamp == event.end_timestamp: - event.end_timestamp = get_ISO_time() - elif isinstance(event, ErrorEvent): - if event.trigger_event: - if ( - not event.trigger_event.end_timestamp - or event.trigger_event.init_timestamp == event.trigger_event.end_timestamp - ): - event.trigger_event.end_timestamp = get_ISO_time() - - event.trigger_event_id = event.trigger_event.id - event.trigger_event_type = event.trigger_event.event_type - self._add_event(event.trigger_event.__dict__) - event.trigger_event = None # removes trigger_event from serialization - - self._add_event(event.__dict__) - - def _add_event(self, event: dict) -> None: - with self.lock: - self.queue.append(event) - - if len(self.queue) >= self.config.max_queue_size: - self._flush_queue() + + # Ensure event has all required base attributes + if not hasattr(event, "id"): + event.id = uuid4() + if not 
hasattr(event, "init_timestamp"): + event.init_timestamp = get_ISO_time() + if not hasattr(event, "end_timestamp") or event.end_timestamp is None: + event.end_timestamp = get_ISO_time() + + # Create session context + token = set_value("session.id", str(self.session_id)) + + try: + token = attach(token) + + # Create a copy of event data to modify + event_data = dict(filter_unjsonable(event.__dict__)) + + # Add required fields based on event type + if isinstance(event, ErrorEvent): + event_data["error_type"] = getattr(event, "error_type", event.event_type) + elif event.event_type == "actions": + # Ensure action events have action_type + if "action_type" not in event_data: + event_data["action_type"] = event_data.get("name", "unknown_action") + if "name" not in event_data: + event_data["name"] = event_data.get("action_type", "unknown_action") + elif event.event_type == "tools": + # Ensure tool events have name + if "name" not in event_data: + event_data["name"] = event_data.get("tool_name", "unknown_tool") + if "tool_name" not in event_data: + event_data["tool_name"] = event_data.get("name", "unknown_tool") + + with self._otel_tracer.start_as_current_span( + name=event.event_type, + attributes={ + "event.id": str(event.id), + "event.type": event.event_type, + "event.timestamp": event.init_timestamp or get_ISO_time(), + "event.end_timestamp": event.end_timestamp or get_ISO_time(), + "session.id": str(self.session_id), + "session.tags": ",".join(self.tags) if self.tags else "", + "event.data": json.dumps(event_data), + }, + ) as span: + if event.event_type in self.event_counts: + self.event_counts[event.event_type] += 1 + + if isinstance(event, ErrorEvent): + span.set_attribute("error", True) + if hasattr(event, "trigger_event") and event.trigger_event: + span.set_attribute("trigger_event.id", str(event.trigger_event.id)) + span.set_attribute("trigger_event.type", event.trigger_event.event_type) + + if flush_now and hasattr(self, "_span_processor"): + 
self._span_processor.force_flush() + finally: + detach(token) + + def _send_event(self, event): + """Direct event sending for testing""" + try: + payload = { + "events": [ + { + "id": str(event.id), + "event_type": event.event_type, + "init_timestamp": event.init_timestamp, + "end_timestamp": event.end_timestamp, + "data": filter_unjsonable(event.__dict__), + } + ] + } + + HttpClient.post( + f"{self.config.endpoint}/v2/create_events", + json.dumps(payload).encode("utf-8"), + jwt=self.jwt, + ) + except Exception as e: + logger.error(f"Failed to send event: {e}") def _reauthorize_jwt(self) -> Union[str, None]: - with self.lock: + with self._lock: payload = {"session_id": self.session_id} serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") res = HttpClient.post( @@ -225,8 +489,7 @@ def _reauthorize_jwt(self) -> Union[str, None]: return jwt def _start_session(self): - self.queue = [] - with self.lock: + with self._lock: payload = {"session": self.__dict__} serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") @@ -234,8 +497,8 @@ def _start_session(self): res = HttpClient.post( f"{self.config.endpoint}/v2/create_session", serialized_payload, - self.config.api_key, - self.config.parent_key, + api_key=self.config.api_key, + parent_key=self.config.parent_key, ) except ApiServerException as e: return logger.error(f"Could not start session - {e}") @@ -250,14 +513,9 @@ def _start_session(self): if jwt is None: return False - session_url = res.body.get( - "session_url", - f"https://app.agentops.ai/drilldown?session_id={self.session_id}", - ) - logger.info( colored( - f"\x1b[34mSession Replay: {session_url}\x1b[0m", + f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", "blue", ) ) @@ -265,68 +523,22 @@ def _start_session(self): return True def _update_session(self) -> None: + """Update session state on the server""" if not self.is_running: return - with self.lock: + with self._lock: # TODO: Determine whether we really need to lock 
here: are incoming calls coming from other threads? payload = {"session": self.__dict__} try: res = HttpClient.post( f"{self.config.endpoint}/v2/update_session", json.dumps(filter_unjsonable(payload)).encode("utf-8"), + # self.config.api_key, jwt=self.jwt, ) except ApiServerException as e: return logger.error(f"Could not update session - {e}") - def _flush_queue(self) -> None: - if not self.is_running: - return - with self.lock: - queue_copy = self.queue[:] # Copy the current items - self.queue = [] - - if len(queue_copy) > 0: - payload = { - "events": queue_copy, - } - - serialized_payload = safe_serialize(payload).encode("utf-8") - try: - HttpClient.post( - f"{self.config.endpoint}/v2/create_events", - serialized_payload, - jwt=self.jwt, - ) - except ApiServerException as e: - return logger.error(f"Could not post events - {e}") - - logger.debug("\n") - logger.debug(f"Session request to {self.config.endpoint}/v2/create_events") - logger.debug(serialized_payload) - logger.debug("\n") - - # Count total events created based on type - events = payload["events"] - for event in events: - event_type = event["event_type"] - if event_type == "llms": - self.event_counts["llms"] += 1 - elif event_type == "tools": - self.event_counts["tools"] += 1 - elif event_type == "actions": - self.event_counts["actions"] += 1 - elif event_type == "errors": - self.event_counts["errors"] += 1 - elif event_type == "apis": - self.event_counts["apis"] += 1 - - def _run(self) -> None: - while not self.stop_flag.is_set(): - time.sleep(self.config.max_wait_time / 1000) - if self.queue: - self._flush_queue() - def create_agent(self, name, agent_id): if not self.is_running: return @@ -343,6 +555,7 @@ def create_agent(self, name, agent_id): HttpClient.post( f"{self.config.endpoint}/v2/create_agent", serialized_payload, + api_key=self.config.api_key, jwt=self.jwt, ) except ApiServerException as e: @@ -358,8 +571,22 @@ def wrapper(*args, **kwargs): return wrapper - @staticmethod - def 
_format_duration(start_time, end_time): + def _get_response(self) -> Optional[Response]: + payload = {"session": self.__dict__} + try: + response = HttpClient.post( + f"{self.config.endpoint}/v2/update_session", + json.dumps(filter_unjsonable(payload)).encode("utf-8"), + api_key=self.config.api_key, + jwt=self.jwt, + ) + except ApiServerException as e: + return logger.error(f"Could not end session - {e}") + + logger.debug(response.body) + return response + + def _format_duration(self, start_time, end_time) -> str: start = datetime.fromisoformat(start_time.replace("Z", "+00:00")) end = datetime.fromisoformat(end_time.replace("Z", "+00:00")) duration = end - start @@ -376,53 +603,29 @@ def _format_duration(start_time, end_time): return " ".join(parts) - def _get_response(self) -> Optional[Response]: - with self.lock: - payload = {"session": self.__dict__} - try: - response = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - jwt=self.jwt, - ) - except ApiServerException as e: - logger.error(f"Could not fetch response from server - {e}") - return None - - logger.debug(response.body) - return response - def _get_token_cost(self, response: Response) -> Decimal: token_cost = response.body.get("token_cost", "unknown") if token_cost == "unknown" or token_cost is None: return Decimal(0) return Decimal(token_cost) - @staticmethod - def _format_token_cost(token_cost_d): + def _format_token_cost(self, token_cost: Decimal) -> str: return ( - "{:.2f}".format(token_cost_d) - if token_cost_d == 0 - else "{:.6f}".format(token_cost_d.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)) + "{:.2f}".format(token_cost) + if token_cost == 0 + else "{:.6f}".format(token_cost.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)) ) - def get_analytics(self) -> Optional[dict[str, Union[Decimal, str]]]: + def get_analytics(self) -> Optional[Dict[str, Any]]: if not self.end_timestamp: self.end_timestamp = 
get_ISO_time() formatted_duration = self._format_duration(self.init_timestamp, self.end_timestamp) - response = self._get_response() - if response is None: + if (response := self._get_response()) is None: return None self.token_cost = self._get_token_cost(response) - formatted_cost = self._format_token_cost(self.token_cost) - - self.session_url = response.body.get( - "session_url", - f"https://app.agentops.ai/drilldown?session_id={self.session_id}", - ) return { "LLM calls": self.event_counts["llms"], @@ -430,8 +633,18 @@ def get_analytics(self) -> Optional[dict[str, Union[Decimal, str]]]: "Actions": self.event_counts["actions"], "Errors": self.event_counts["errors"], "Duration": formatted_duration, - "Cost": formatted_cost, + "Cost": self._format_token_cost(self.token_cost), } + @property + def session_url(self) -> str: + """Returns the URL for this session in the AgentOps dashboard.""" + assert self.session_id, "Session ID is required to generate a session URL" + return f"https://app.agentops.ai/drilldown?session_id={self.session_id}" + + # @session_url.setter + # def session_url(self, url: str): + # pass + active_sessions: List[Session] = [] diff --git a/pyproject.toml b/pyproject.toml index e5129f69..74c0f791 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,10 @@ dependencies = [ "psutil==5.9.8", "packaging==23.2", "termcolor>=2.3.0", # 2.x.x tolerant - "PyYAML>=5.3,<7.0" + "PyYAML>=5.3,<7.0", + "opentelemetry-api>=1.22.0,<2.0.0", # API for interfaces + "opentelemetry-sdk>=1.22.0,<2.0.0", # SDK for implementation + "opentelemetry-exporter-otlp-proto-http>=1.22.0,<2.0.0", # For OTLPSpanExporter ] [project.optional-dependencies] dev = [ @@ -37,6 +40,7 @@ dev = [ "requests_mock==1.11.0", "ruff", "tach~=0.9", + "vcrpy>=6.0.0; python_version >= '3.8'" ] langchain = [ "langchain==0.2.14; python_version >= '3.8.1'" @@ -55,6 +59,11 @@ agentops = "agentops.cli:main" [tool.pytest.ini_options] asyncio_mode = "strict" asyncio_default_fixture_loop_scope = 
"function" +testpaths = [ + "tests", +] +addopts = "--import-mode=importlib --tb=short -p no:warnings" +pythonpath = ["."] [tool.ruff] line-length = 120 diff --git a/tests/core_manual_tests/api_server/main.py b/tests/core_manual_tests/api_server/main.py new file mode 100644 index 00000000..d4a5581a --- /dev/null +++ b/tests/core_manual_tests/api_server/main.py @@ -0,0 +1,37 @@ +import agentops +from fastapi import FastAPI +import uvicorn +from dotenv import load_dotenv +from agentops import record_tool +from openai import OpenAI +import time + +load_dotenv() + +openai = OpenAI() +agentops.init(auto_start_session=False) +app = FastAPI() + + +@app.get("/completion") +def completion(): + start_time = time.time() + + session = agentops.start_session(tags=["api-server-test"]) + + @record_tool(tool_name="foo") + def foo(x: str): + print(x) + + foo("Hello") + + session.end_session(end_state="Success") + + end_time = time.time() + execution_time = end_time - start_time + + return {"response": "Done", "execution_time_seconds": round(execution_time, 3)} + + +if __name__ == "__main__": + uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True) diff --git a/tests/core_manual_tests/api_server/readme.md b/tests/core_manual_tests/api_server/readme.md index 3f32804d..04e7dc16 100644 --- a/tests/core_manual_tests/api_server/readme.md +++ b/tests/core_manual_tests/api_server/readme.md @@ -1,9 +1,10 @@ # API server test This is a manual test with two files. It checks to make sure that the SDK works in an API environment. -## Running -1. `python server.py` -2. In different terminal, `python client.py` +## Running the FastAPI Server +You can run FastAPI with: +1. `uvicorn main:app --reload` +2. To test, run `curl http://localhost:8000/completion` in a different terminal. 
import logging

# logging.basicConfig(level=logging.DEBUG)

from datetime import datetime, timezone
from uuid import uuid4

import openai
from pyinstrument import Profiler

import agentops


def make_openai_call():
    """Issue one chat completion request and return the raw response."""
    client = openai.Client()
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a chatbot."},
            {"role": "user", "content": "What are you talking about?"},
        ],
    )


# Profile everything from SDK init through session teardown.
profiler = Profiler()
profiler.start()

try:
    # Defer session creation so it is captured inside the profiled region.
    agentops.init(auto_start_session=False)
    session = agentops.start_session()
    assert session is not None

    # A single sequential call; raise the range to stress-test throughput.
    responses = [make_openai_call() for _ in range(1)]

    # End the session properly
    session.end_session(end_state="Success")

finally:
    # Always emit the profile, even if the calls above raised.
    profiler.stop()
    # with open("profiling_reports/{}.txt".format(datetime.now(timezone.utc).isoformat()), "w") as f:
    #     f.write(profiler.output_text(unicode=True, color=False))
    print(profiler.output_text(unicode=True, color=True))
class TestSessionExporter:
    """Tests for the session's OTEL span exporter.

    Relies on the module-level ``mock_req`` fixture to intercept HTTP
    requests; assertions inspect the JSON bodies it records.
    """

    def setup_method(self):
        """Create a fresh client and session, and grab its OTEL exporter."""
        self.api_key = "11111111-1111-4111-8111-111111111111"
        # Initialize agentops first
        agentops.init(api_key=self.api_key, max_wait_time=50, auto_start_session=False)
        self.session = agentops.start_session()
        assert self.session is not None  # Verify session was created
        # NOTE(review): reaches into a private attribute — presumably the
        # exporter has no public accessor; confirm against Session.
        self.exporter = self.session._otel_exporter

    def teardown_method(self):
        """Clean up after each test"""
        if self.session:
            self.session.end_session("Success")
        agentops.end_all_sessions()
        clear_singletons()

    def create_test_span(self, name="test_span", attributes=None):
        """Helper to create a test span with required attributes"""
        if attributes is None:
            attributes = {}

        # Ensure required attributes are present
        base_attributes = {
            "event.id": str(UUID(int=1)),
            "event.type": "test_type",
            "event.timestamp": datetime.now(timezone.utc).isoformat(),
            "event.end_timestamp": datetime.now(timezone.utc).isoformat(),
            "event.data": json.dumps({"test": "data"}),
            "session.id": str(self.session.session_id),
        }
        base_attributes.update(attributes)

        # Fixed trace/span ids keep the span deterministic across runs.
        context = SpanContext(
            trace_id=0x000000000000000000000000DEADBEEF,
            span_id=0x00000000DEADBEF0,
            is_remote=False,
            trace_state=TraceState(),
        )

        return ReadableSpan(
            name=name,
            context=context,
            kind=SpanKind.INTERNAL,
            status=Status(StatusCode.OK),
            start_time=123,
            end_time=456,
            attributes=base_attributes,
            events=[],
            links=[],
            resource=self.session._tracer_provider.resource,
        )

    def test_export_basic_span(self, mock_req):
        """Test basic span export with all required fields"""
        span = self.create_test_span()
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS
        assert len(mock_req.request_history) > 0

        last_request = mock_req.last_request.json()
        assert "events" in last_request
        event = last_request["events"][0]

        # Verify required fields
        assert "id" in event
        assert "event_type" in event
        assert "init_timestamp" in event
        assert "end_timestamp" in event
        assert "session_id" in event

    def test_export_action_event(self, mock_req):
        """Test export of action event with specific formatting"""
        action_attributes = {
            "event.data": json.dumps(
                {"action_type": "test_action", "params": {"param1": "value1"}, "returns": "test_return"}
            )
        }

        # The span name ("actions") selects the event type on export.
        span = self.create_test_span(name="actions", attributes=action_attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        assert event["action_type"] == "test_action"
        assert event["params"] == {"param1": "value1"}
        assert event["returns"] == "test_return"

    def test_export_tool_event(self, mock_req):
        """Test export of tool event with specific formatting"""
        tool_attributes = {
            "event.data": json.dumps({"name": "test_tool", "params": {"param1": "value1"}, "returns": "test_return"})
        }

        span = self.create_test_span(name="tools", attributes=tool_attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        assert event["name"] == "test_tool"
        assert event["params"] == {"param1": "value1"}
        assert event["returns"] == "test_return"

    def test_export_with_missing_timestamp(self, mock_req):
        """Test handling of missing end_timestamp"""
        attributes = {"event.end_timestamp": None}  # This should be handled gracefully

        span = self.create_test_span(attributes=attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        # Verify end_timestamp is present and valid
        assert "end_timestamp" in event
        assert event["end_timestamp"] is not None

    def test_export_with_missing_timestamps_advanced(self, mock_req):
        """Test handling of missing timestamps"""
        attributes = {"event.timestamp": None, "event.end_timestamp": None}

        span = self.create_test_span(attributes=attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        # Verify timestamps are present and valid
        assert "init_timestamp" in event
        assert "end_timestamp" in event
        assert event["init_timestamp"] is not None
        assert event["end_timestamp"] is not None

        # Verify timestamps are in ISO format
        try:
            datetime.fromisoformat(event["init_timestamp"].replace("Z", "+00:00"))
            datetime.fromisoformat(event["end_timestamp"].replace("Z", "+00:00"))
        except ValueError:
            pytest.fail("Timestamps are not in valid ISO format")

    def test_export_with_shutdown(self, mock_req):
        """Test export behavior when shutdown"""
        # NOTE(review): flips a private flag directly — confirm exporter
        # exposes no public shutdown() hook for tests.
        self.exporter._shutdown.set()
        span = self.create_test_span()

        result = self.exporter.export([span])
        assert result == SpanExportResult.SUCCESS

        # Verify no request was made
        assert not any(req.url.endswith("/v2/create_events") for req in mock_req.request_history[-1:])

    def test_export_llm_event(self, mock_req):
        """Test export of LLM event with specific handling of timestamps"""
        llm_attributes = {
            "event.data": json.dumps(
                {
                    "prompt": "test prompt",
                    "completion": "test completion",
                    "model": "test-model",
                    "tokens": 100,
                    "cost": 0.002,
                }
            )
        }

        span = self.create_test_span(name="llms", attributes=llm_attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        # Verify LLM specific fields
        assert event["prompt"] == "test prompt"
        assert event["completion"] == "test completion"
        assert event["model"] == "test-model"
        assert event["tokens"] == 100
        assert event["cost"] == 0.002

        # Verify timestamps
        assert event["init_timestamp"] is not None
        assert event["end_timestamp"] is not None

    def test_export_with_missing_id(self, mock_req):
        """Test handling of missing event ID"""
        attributes = {"event.id": None}

        span = self.create_test_span(attributes=attributes)
        result = self.exporter.export([span])

        assert result == SpanExportResult.SUCCESS

        last_request = mock_req.request_history[-1].json()
        event = last_request["events"][0]

        # Verify ID is present and valid UUID
        assert "id" in event
        assert event["id"] is not None
        try:
            UUID(event["id"])
        except ValueError:
            pytest.fail("Event ID is not a valid UUID")