-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring core session management #486
base: main
Are you sure you want to change the base?
Conversation
…ration class for better control over shutdown behavior
Fixes PDB within pytest
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…e loop Added proper error handling in both thread classesModified the EventPublisherThread to check stopping condition more frequentlyAdded condition to publish events when stoppingImproved thread shutdown logicAdded shorter timeout in ChangesObserverThread to check stopping flag more frequentlyAdded proper cleanup in stop() methods Signed-off-by: Teo <[email protected]>
2ae3fbc
to
0fc5a87
Compare
Files selected (1)
Files ignored (1)
InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
Signed-off-by: Teo <[email protected]>
0fc5a87
to
ce45bc9
Compare
Files selected (1)
Files ignored (1)
InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
3df5961
to
b3e2643
Compare
Files selected (2)
Files ignored (0)InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
Files selected (2)
Files ignored (1)
InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
agentops/client.py
Outdated
|
||
# replaces the session currently stored with a specific session_id, with a new session | ||
def _update_session(self, session: Session): | ||
self._sessions[ | ||
self._sessions.index( | ||
[ | ||
sess | ||
for sess in self._sessions | ||
if sess.session_id == session.session_id | ||
][0] | ||
) | ||
] = session | ||
pass | ||
|
||
def _safe_get_session(self) -> Optional[Session]: | ||
if not self.is_initialized: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Deprecated Method
Deprecation of _update_session Method
The _update_session
method has been deprecated, which is a positive change as it removes the complexity of manually updating sessions in a list. This aligns with the new SessionsCollection
approach, which handles session management more efficiently.
agentops/session.py
Outdated
super().__init__(**kwargs) | ||
|
||
|
||
class SessionsCollection(WeakSet): | ||
""" | ||
A custom collection for managing Session objects that combines WeakSet's automatic cleanup | ||
with list-like indexing capabilities. | ||
|
||
This class is needed because: | ||
1. We want WeakSet's automatic cleanup of unreferenced sessions | ||
2. We need to access sessions by index (e.g., self._sessions[0]) for backwards compatibility | ||
3. Standard WeakSet doesn't support indexing | ||
""" | ||
|
||
def __getitem__(self, index: int) -> Session: | ||
""" | ||
Enable indexing into the collection (e.g., sessions[0]). | ||
""" | ||
# Convert to list for indexing since sets aren't ordered | ||
items = list(self) | ||
return items[index] | ||
|
||
|
||
class SessionApi: | ||
""" | ||
Solely focuses on interacting with the API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Optimize Indexing in SessionsCollection
The introduction of SessionsCollection
enhances memory management by leveraging WeakSet
for automatic cleanup of unreferenced sessions. However, converting the WeakSet
to a list for indexing can be inefficient, especially with a large number of sessions. Consider caching the list conversion or using a more efficient data structure if frequent indexing is required.
agentops/session.py
Outdated
if self.is_alive(): | ||
self.join(timeout=0.5) | ||
|
||
|
||
active_sessions = SessionsCollection() | ||
|
||
__all__ = ["Session"] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Logic Error
Verify Compatibility of SessionsCollection with Existing Operations
The replacement of active_sessions
with SessionsCollection
is a significant change. Ensure that all operations previously performed on active_sessions
are compatible with the new class, especially those relying on list-specific methods that WeakSet
does not support.
Commitable Code Suggestion:
if self.is_alive(): | |
self.join(timeout=0.5) | |
active_sessions = SessionsCollection() | |
__all__ = ["Session"] | |
+active_sessions = SessionsCollection() |
agentops/client.py
Outdated
logger.warning(message) | ||
|
||
def end_all_sessions(self): | ||
for s in self._sessions: | ||
s.end_session() | ||
|
||
self._sessions.clear() | ||
|
||
@property |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💻 Code Simplification
Simplified Session Termination
The end_all_sessions
method has been simplified by removing the loop that iterates over each session to end it individually. Instead, the clear
method of SessionsCollection
is used, which is more efficient and leverages the new collection's capabilities.
Commitable Code Suggestion:
logger.warning(message) | |
def end_all_sessions(self): | |
for s in self._sessions: | |
s.end_session() | |
self._sessions.clear() | |
@property | |
def end_all_sessions(self): | |
self._sessions.clear() |
agentops/client.py
Outdated
self._pre_init_messages: List[str] = [] | ||
self._initialized: bool = False | ||
self._llm_tracker: Optional[LlmTracker] = None | ||
self._sessions: List[Session] = active_sessions | ||
self._sessions: SessionsCollection = SessionsCollection() | ||
self._config = Configuration() | ||
self._pre_init_queue = {"agents": []} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Improved Memory Management with SessionsCollection
The refactoring to use SessionsCollection
instead of a list for session management is a significant improvement. This change enhances memory management by automatically cleaning up unreferenced sessions, which can prevent memory leaks. Additionally, it maintains backward compatibility by providing list-like indexing capabilities.
Commitable Code Suggestion:
self._pre_init_messages: List[str] = [] | |
self._initialized: bool = False | |
self._llm_tracker: Optional[LlmTracker] = None | |
self._sessions: List[Session] = active_sessions | |
self._sessions: SessionsCollection = SessionsCollection() | |
self._config = Configuration() | |
self._pre_init_queue = {"agents": []} | |
- self._sessions: List[Session] = active_sessions | |
+ self._sessions: SessionsCollection = SessionsCollection() |
b3e2643
to
908a278
Compare
Files selected (1)
Files ignored (1)
InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
agentops/session.py
Outdated
if self.is_alive(): | ||
self.join(timeout=0.5) | ||
|
||
|
||
active_sessions = SessionsCollection() | ||
|
||
__all__ = ["Session"] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Logic Error
Potential Logic Error with Session Order
The replacement of active_sessions
with SessionsCollection
may introduce unexpected behavior if the order of sessions is relied upon elsewhere in the code. Ensure that any code relying on the order of active_sessions
is reviewed and updated accordingly.
agentops/session.py
Outdated
super().__init__(**kwargs) | ||
|
||
|
||
class SessionsCollection(WeakSet): | ||
""" | ||
A custom collection for managing Session objects that combines WeakSet's automatic cleanup | ||
with list-like indexing capabilities. | ||
|
||
This class is needed because: | ||
1. We want WeakSet's automatic cleanup of unreferenced sessions | ||
2. We need to access sessions by index (e.g., self._sessions[0]) for backwards compatibility | ||
3. Standard WeakSet doesn't support indexing | ||
""" | ||
|
||
def __getitem__(self, index: int) -> Session: | ||
""" | ||
Enable indexing into the collection (e.g., sessions[0]). | ||
""" | ||
# Convert to list for indexing since sets aren't ordered | ||
items = list(self) | ||
return items[index] | ||
|
||
|
||
class SessionApi: | ||
""" | ||
Solely focuses on interacting with the API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Optimize Indexing in SessionsCollection
The SessionsCollection
class introduces list-like indexing to a WeakSet
, which inherently does not maintain order. Converting the WeakSet
to a list for indexing can be inefficient, especially with a large number of sessions. Consider maintaining an internal list alongside the WeakSet
to optimize indexing operations.
class SessionsCollection(WeakSet):
def __init__(self):
super().__init__()
self._ordered_sessions = []
def add(self, session):
super().add(session)
self._ordered_sessions.append(session)
def __getitem__(self, index: int) -> Session:
return self._ordered_sessions[index]
def discard(self, session):
super().discard(session)
self._ordered_sessions.remove(session)
Commitable Code Suggestion:
super().__init__(**kwargs) | |
class SessionsCollection(WeakSet): | |
""" | |
A custom collection for managing Session objects that combines WeakSet's automatic cleanup | |
with list-like indexing capabilities. | |
This class is needed because: | |
1. We want WeakSet's automatic cleanup of unreferenced sessions | |
2. We need to access sessions by index (e.g., self._sessions[0]) for backwards compatibility | |
3. Standard WeakSet doesn't support indexing | |
""" | |
def __getitem__(self, index: int) -> Session: | |
""" | |
Enable indexing into the collection (e.g., sessions[0]). | |
""" | |
# Convert to list for indexing since sets aren't ordered | |
items = list(self) | |
return items[index] | |
class SessionApi: | |
""" | |
Solely focuses on interacting with the API | |
class SessionsCollection(WeakSet): | |
def __init__(self): | |
super().__init__() | |
self._ordered_sessions = [] | |
def add(self, session): | |
super().add(session) | |
self._ordered_sessions.append(session) | |
def __getitem__(self, index: int) -> Session: | |
return self._ordered_sessions[index] | |
def discard(self, session): | |
super().discard(session) | |
self._ordered_sessions.remove(session) |
908a278
to
a7e527f
Compare
Files selected (1)
Files ignored (0)InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
self._pre_init_messages: List[str] = [] | ||
self._initialized: bool = False | ||
self._llm_tracker: Optional[LlmTracker] = None | ||
self._sessions: List[Session] = active_sessions | ||
self._sessions: List[Session] = list(active_sessions) | ||
self._config = Configuration() | ||
self._pre_init_queue = {"agents": []} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Ensure Explicit List Conversion for Robustness
The change from active_sessions
to list(active_sessions)
ensures that _sessions
is explicitly a list, which is crucial if active_sessions
is a mutable sequence like a set or another iterable. This prevents unintended side effects from external modifications to active_sessions
, enhancing the robustness of session management.
- self._sessions: List[Session] = active_sessions
+ self._sessions: List[Session] = list(active_sessions)
Commitable Code Suggestion:
self._pre_init_messages: List[str] = [] | |
self._initialized: bool = False | |
self._llm_tracker: Optional[LlmTracker] = None | |
self._sessions: List[Session] = active_sessions | |
self._sessions: List[Session] = list(active_sessions) | |
self._config = Configuration() | |
self._pre_init_queue = {"agents": []} | |
self._sessions: List[Session] = list(active_sessions) |
Signed-off-by: Teo <[email protected]>
Files selected (1)
Files ignored (0)InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
|
||
event.trigger_event_id = event.trigger_event.id | ||
event.trigger_event_type = event.trigger_event.event_type | ||
self._add_event(event.trigger_event.__dict__) | ||
event.trigger_event = None # removes trigger_event from serialization | ||
def _publish(self): | ||
"""Notify the ChangesObserverThread to perform the API call.""" | ||
with self.conditions["changes"]: # Acquire the lock before notifying | ||
self.conditions["changes"].notify() | ||
|
||
self._add_event(event.__dict__) | ||
def stop(self) -> None: | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Thread Safety Enhancement
Ensure Thread Safety by Acquiring Lock Before Notification
Acquiring the lock before notifying the condition in the _publish
method ensures that the notification is sent in a thread-safe manner. This prevents potential race conditions where other threads might miss the notification if it is sent without holding the lock.
Commitable Code Suggestion:
event.trigger_event_id = event.trigger_event.id | |
event.trigger_event_type = event.trigger_event.event_type | |
self._add_event(event.trigger_event.__dict__) | |
event.trigger_event = None # removes trigger_event from serialization | |
def _publish(self): | |
"""Notify the ChangesObserverThread to perform the API call.""" | |
with self.conditions["changes"]: # Acquire the lock before notifying | |
self.conditions["changes"].notify() | |
self._add_event(event.__dict__) | |
def stop(self) -> None: | |
""" | |
with self.conditions["changes"]: # Acquire the lock before notifying | |
self.conditions["changes"].notify() |
logger.debug(f"{self.__class__.__name__}: started") | ||
while not self.stopping: | ||
try: | ||
# Wait for explicit notification instead of continuous polling | ||
with self.s.conditions["changes"]: | ||
# Use wait with timeout to allow checking stopping condition | ||
self.s.conditions["changes"].wait(timeout=0.5) | ||
|
||
serialized_payload = safe_serialize(payload).encode("utf-8") | ||
try: | ||
HttpClient.post( | ||
f"{self.config.endpoint}/v2/create_agent", | ||
serialized_payload, | ||
jwt=self.jwt, | ||
) | ||
except ApiServerException as e: | ||
return logger.error(f"Could not create agent - {e}") | ||
if self.stopping: | ||
break | ||
|
||
# Only update if explicitly notified (not due to timeout) | ||
with self.s._locks["session"]: | ||
if not self.stopping: | ||
self.s.api.update_session() | ||
|
||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Optimize Thread Synchronization with Explicit Notifications
The updated run
method now waits for explicit notifications instead of continuous polling, which significantly reduces CPU usage and improves efficiency. This change ensures that the thread only wakes up when necessary, minimizing resource consumption.
Commitable Code Suggestion:
logger.debug(f"{self.__class__.__name__}: started") | |
while not self.stopping: | |
try: | |
# Wait for explicit notification instead of continuous polling | |
with self.s.conditions["changes"]: | |
# Use wait with timeout to allow checking stopping condition | |
self.s.conditions["changes"].wait(timeout=0.5) | |
serialized_payload = safe_serialize(payload).encode("utf-8") | |
try: | |
HttpClient.post( | |
f"{self.config.endpoint}/v2/create_agent", | |
serialized_payload, | |
jwt=self.jwt, | |
) | |
except ApiServerException as e: | |
return logger.error(f"Could not create agent - {e}") | |
if self.stopping: | |
break | |
# Only update if explicitly notified (not due to timeout) | |
with self.s._locks["session"]: | |
if not self.stopping: | |
self.s.api.update_session() | |
except Exception as e: | |
# Wait for explicit notification instead of continuous polling | |
with self.s.conditions["changes"]: | |
# Use wait with timeout to allow checking stopping condition | |
self.s.conditions["changes"].wait(timeout=0.5) | |
if self.stopping: | |
break | |
# Only update if explicitly notified (not due to timeout) | |
with self.s._locks["session"]: | |
if not self.stopping: | |
self.s.api.update_session() |
Abstract
The
Session
module functions as a god object, which complicates working with its varied responsibilities and behaviors. This design has led to significant issues, particularly with threading and synchronization, contributing to problems like the deadlock observed in #477.Mission
To refactor the
session
module by:Visioned design (Work in Progress)
Session Module Components
SessionStruct
: A model class that strictly defines the schema for session packets.SessionApi
: Acts as the service layer facilitating API communication.Session
: Maintains representing a live Session; acts as a manager, with targeted and clearer responsibilitiesThread Management Classes
_SessionThread
: Implementsthreading.Thread
and exposes an interfaceSession
understands.ChangesObserverThread
: Passive listener forSession
model changes; publishes to API when notified.EventPublisherThread
: Polls from aQueue
shared withSession
, aggregates payload in a "batch-and-flush" strategy with three triggers: max size, time threshold or empty queue....
TO BE CONTINUED