Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add mutual exclusion for synchronized stream access in logging handlers and CLPLoglevelTimeout (fixes #55). #59

Merged
merged 12 commits into from
Feb 28, 2025

Conversation

hoophalab
Copy link
Contributor

@hoophalab hoophalab commented Feb 25, 2025

Description

This PR fixes #55 by adding a wrapper class, _LockedStream, around ZstdCompressionWriter or IO[bytes]. It ensures thread safety by wrapping the write and flush methods with a lock.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

All unit tests have passed.

Summary by CodeRabbit

  • Refactor
    • Enhanced the logging system to deliver more reliable output during simultaneous operations, ensuring smoother and more consistent logging performance for users.

Copy link

coderabbitai bot commented Feb 25, 2025

Walkthrough

The changes enhance thread safety in the CLPLogLevelTimeout class by introducing an RLock to manage concurrent access to shared resources. The __init__ method initializes self.lock as an RLock. Modifications to the timeout and update methods ensure that the lock is utilized for thread-safe operations. A new method, get_lock, is added to provide access to the lock. Additionally, the log_fn function in the CLPSockListener._aggregator method is updated to acquire the lock before writing to the output stream, enhancing concurrency handling in the logging system.

Changes

File(s) Change Summary
src/clp_logging/handlers.py - Introduced RLock in CLPLogLevelTimeout and updated __init__ to initialize self.lock.
- Modified timeout and update methods to manage lock acquisition and release.
- Added get_lock method to CLPLogLevelTimeout.
- Updated log_fn in CLPSockListener._aggregator to acquire the lock before logging.

Sequence Diagram(s)

sequenceDiagram
    participant CLPLogLevelTimeout
    participant CLPSockListener
    participant OutputStream

    CLPLogLevelTimeout->>CLPLogLevelTimeout: acquire lock
    CLPLogLevelTimeout->>CLPLogLevelTimeout: perform timeout/update operation
    CLPLogLevelTimeout->>OutputStream: flush/write data
    CLPLogLevelTimeout->>CLPLogLevelTimeout: release lock
Loading

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0e6773 and 4985687.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/clp_logging/handlers.py

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/clp_logging/handlers.py (1)

247-247: Docstring inconsistencies about the ostream type.
The field self.ostream is now a _LockedStream, but some docstrings in this class still reference ZstdCompressionWriter or IO[bytes]. This discrepancy may confuse future maintainers.

Recommend updating docstrings to reflect the _LockedStream type throughout, ensuring clarity.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a22e821 and 820d069.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (5 hunks)
🔇 Additional comments (3)
src/clp_logging/handlers.py (3)

13-13: New concurrency imports appear suitable.
No issues identified; they align with the _LockedStream usage and the Timer usage in CLPLogLevelTimeout.


251-252: Confirm usage of _LockedStream for set_ostream.
Switching set_ostream to expect _LockedStream rather than Union[ZstdCompressionWriter, IO[bytes]] aligns with the concurrency strategy. However, confirm that no external callers still rely on the older signature.

Would you like me to generate a script to find all references to set_ostream in the codebase to ensure they properly pass _LockedStream?


726-727: Good approach for concurrency wrapping.
Instantiating _LockedStream with either ZstdCompressionWriter or a raw file object ensures concurrency and consistency for writes and flushes. This is aligned with the pull request’s goal to support multi-threaded usage safely.

Copy link
Member

@LinZhihao-723 LinZhihao-723 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High-level comment. Instead of locking the stream, shall we lock the Timeout handler instead? Reasons:

  • If a logging handler has no Timeout given, there's no need to lock the stream.
  • The failure/crash is caused by a race condition in the main thread and the background Timeout threads. Instead of making the stream locked, I prefer to directly solve the problem of the main thread and Timeout threads not synchronized.

@hoophalab
Copy link
Contributor Author

High-level comment. Instead of locking the stream, shall we lock the Timeout handler instead? Reasons:

  • If a logging handler has no Timeout given, there's no need to lock the stream.
  • The failure/crash is caused by a race condition in the main thread and the background Timeout threads. Instead of making the stream locked, I prefer to directly solve the problem of the main thread and Timeout threads not synchronized.

We need to acquire locks in both threads: if the timeout is flushing, the main thread should wait until the flush is complete; if the main thread is writing, the timeout should wait until the write finishes. Otherwise, we need a scheduler that minimizes blocking the main thread.

@@ -750,7 +762,8 @@ def _write(self, loglevel: int, msg: str) -> None:
clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
if self.loglevel_timeout:
self.loglevel_timeout.update(loglevel, self.last_timestamp_ms, self._direct_write)
self.ostream.write(clp_msg)
with self.loglevel_timeout.get_lock() if self.loglevel_timeout else nullcontext():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about creating a member function like this:

    def _mutex_context(self) -> AbstractContextManager[Any]:
        return self.loglevel_timeout.get_lock() if self.loglevel_timeout else nullcontext()

Benefits:

  • Improve readability
  • We can properly document how this context works and why we need it

Notice that you might need to check whether we can narrow down the Any I put in the type parameter above. I just put Any to silence mypy for a proof-of-concept.

After doing this, we should also consider to move self.loglevel_timeout_update into the locked context.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/clp_logging/handlers.py (2)

315-319: Consider improving function naming and documentation.

The utility function is a good abstraction, but it could benefit from better documentation explaining its purpose and behavior.

Consider adding a docstring:

def _get_mutex_context_from_loglevel_timeout(
    loglevel_timeout: Optional[CLPLogLevelTimeout],
) -> AbstractContextManager[bool | None]:
+    """
+    Returns the appropriate mutex context from a LogLevelTimeout object.
+
+    If loglevel_timeout is provided, returns its lock for thread-safe operations.
+    Otherwise, returns a nullcontext that performs no locking.
+
+    :param loglevel_timeout: The timeout object containing a lock, or None.
+    :return: A context manager that can be used in a 'with' statement.
+    """
    return loglevel_timeout.get_lock() if loglevel_timeout else nullcontext()

1-990: Consider implementing LinZhihao-723's suggestion for improved readability.

As mentioned in the past review comments, consider implementing a member function like _mutex_context() in the handler classes to improve readability and provide proper documentation for the context usage.

def _mutex_context(self) -> AbstractContextManager[Any]:
    return self.loglevel_timeout.get_lock() if self.loglevel_timeout else nullcontext()

This would make the code more readable and allow for better documentation of why the context is needed.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 299fcc5 and c96f636.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (7 hunks)
🔇 Additional comments (12)
src/clp_logging/handlers.py (12)

8-8: Good addition of required imports.

Added the necessary imports from contextlib for the mutex context implementation.


13-13: Good addition of RLock import.

Added RLock import to support the new thread safety mechanism.


179-182: Clear documentation on thread safety.

The updated documentation clearly explains the thread safety guarantees and how other handlers should interact with the locking mechanism.


234-234: RLock is well-suited for this use case.

Using RLock instead of a regular Lock is appropriate here as it allows for nested acquisitions by the same thread, which may occur when operations call each other.


239-241: Good accessor method implementation.

The get_lock() method provides a clean way to access the lock from outside the class.


248-261: Properly secured critical section with locking.

Critical operations (canceling threads, resetting timeouts, flushing streams) are now protected by the lock, preventing race conditions.


471-474: Properly synchronized stream access in aggregator.

Good use of the mutex context to protect both the update and write operations in the aggregator method.


478-485: Protected critical closing operations.

Added locking around the final write and close operations, which is important to prevent race conditions when the stream is being closed.


762-763: Thread-safe direct write implementation.

Correctly uses the mutex context to protect direct writes to the output stream.


771-774: Properly synchronized write operations.

Good implementation of thread safety for both timeout updates and stream writes in the main write method.


799-802: Protected stream closing operations.

Correctly synchronized the EOF write and stream closing operations.


1-990: Consider locking the close method in the timeout class.

While you've correctly implemented locking for the critical sections, consider also adding locking to the close method of any streams that might be used by the CLPLogLevelTimeout to ensure complete thread safety.

This was mentioned in a previous review comment and remains a valid concern to address for complete thread safety.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/clp_logging/handlers.py (1)

315-327: Well-designed helper function for lock management.

This helper function provides a convenient way to handle locks or fallback to nullcontext() when no lock is available. The function is well-documented and follows good practices for context management.

However, the return type annotation could be improved for consistency:

- ) -> AbstractContextManager[bool | None]:
+ ) -> AbstractContextManager[Optional[bool]]:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c96f636 and edf90b1.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (7 hunks)
🔇 Additional comments (12)
src/clp_logging/handlers.py (12)

8-8: Appropriate imports added for context management.

The addition of AbstractContextManager and nullcontext from the contextlib module is appropriate for the thread-safety enhancements being implemented.


13-13: RLock import added for thread safety implementation.

Adding the RLock import from threading is necessary for implementing thread-safe operations in the handler classes.


178-183: Well-documented thread safety behavior.

Good documentation that clearly explains the thread safety guarantees provided by the class and how other components should interact with it.


234-234: Proper initialization of RLock for thread-safety.

Using RLock instead of a regular Lock is a good choice as it allows the same thread to acquire the lock multiple times without deadlocking.


239-241: Good accessor method for exposing the lock.

The get_lock method provides a clean interface for other components to access and use the lock, supporting the thread-safety mechanism.


248-261: Comprehensive lock coverage for timeout operations.

The timeout method now properly acquires the lock before performing operations that could be affected by concurrent access, including thread cancellation, timestamp resets, and stream flushing.


479-482: Thread-safe update and write operations in the aggregator.

Good use of the lock context to ensure that both updating the timeout and writing to the output stream are done atomically, preventing race conditions.


486-492: Proper stream finalization with lock protection.

The EOF character writing and compression handling are now properly protected by the lock, ensuring thread-safe finalization of the stream.


770-771: Thread-safe direct write operations.

The _direct_write method now acquires the lock before writing to the output stream, preventing potential data corruption from concurrent access.


779-782: Thread-safe write and update operations.

The _write method now properly locks both the update to the timeout object and the write to the output stream, ensuring thread safety.


807-809: Thread-safe close operations.

The close method now acquires the lock before writing the EOF character and closing the output stream, preventing race conditions during handler shutdown.


315-327:

❓ Verification inconclusive

Consider additional thread safety for self.loglevel_timeout_update

As suggested in a previous review comment, consider moving self.loglevel_timeout_update into the locked context to ensure complete thread safety.


🏁 Script executed:

#!/bin/bash
# Search for loglevel_timeout_update usage to verify if it needs thread safety
rg -n "loglevel_timeout_update" src/clp_logging/handlers.py

Length of output: 59


Thread safety for self.loglevel_timeout_update: Manual Verification Required

Based on the search within src/clp_logging/handlers.py, no occurrences of self.loglevel_timeout_update were found. This raises one of two possibilities:

  • Either the attribute isn’t used in this file (and perhaps is managed elsewhere in the codebase), or
  • The reference is outdated or misnamed in this context.

Action items:

  • Please verify whether self.loglevel_timeout_update is actively used elsewhere in the module or repository where its access might occur concurrently.
  • If it is indeed used in a concurrent context, consider moving its manipulation/access inside a locked context as originally suggested.
  • If it’s not in use or thread safety is already ensured at another level, you may resolve this review comment accordingly.

Co-authored-by: Lin Zhihao <[email protected]>
LinZhihao-723
LinZhihao-723 previously approved these changes Feb 27, 2025
Copy link
Member

@LinZhihao-723 LinZhihao-723 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the PR title, how about:

feat: Add mutual exclusion for synchronized stream access in logging handlers and `CLPLoglevelTimeout` (fixes #55).

@hoophalab hoophalab changed the title feat: Add a lock to handle write and flush methods to output streams in handlers (fixes #55). feat: Add mutual exclusion for synchronized stream access in logging handlers and CLPLoglevelTimeout (fixes #55). Feb 27, 2025
@hoophalab hoophalab requested a review from a team as a code owner February 27, 2025 22:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/clp_logging/handlers.py (3)

8-8: Fix unused import.

The static analysis tool indicates that AbstractContextManager is imported but unused in line 8, although it appears in the type annotations in the docstring for _get_mutex_context_from_loglevel_timeout. Since Python 3.7+, string literal type annotations don't need the import to be available at runtime.

-from contextlib import AbstractContextManager, nullcontext
+from contextlib import nullcontext

If you're supporting Python 3.9+, you could use from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union, ContextManager and then use ContextManager instead of AbstractContextManager.

🧰 Tools
🪛 Ruff (0.8.2)

8-8: contextlib.AbstractContextManager imported but unused

Remove unused import: contextlib.AbstractContextManager

(F401)


315-328: Consider specializing the return type annotation.

The helper function returns Any for backward compatibility, but this is less type-safe. Since the PR adds thread safety, it's worth improving the type annotations as well.

-def _get_mutex_context_from_loglevel_timeout(loglevel_timeout: Optional[CLPLogLevelTimeout]) -> Any:
+def _get_mutex_context_from_loglevel_timeout(
+    loglevel_timeout: Optional[CLPLogLevelTimeout]
+) -> Union[RLock, '_nullcontext[None]']:

Note: If you're targeting Python 3.9+, you could use ContextManager[Optional[bool]] as suggested in past review comments.


315-328: Consider creating explicit mutex_context member functions in handlers.

As suggested in past review comments, it might be more maintainable to add a member function like _mutex_context() to relevant handler classes. This would improve code readability and allow for proper documentation of the locking behavior.

For example:

def _mutex_context(self) -> AbstractContextManager[Any]:
    """
    Gets a mutual exclusive context manager for IR stream access.
    
    :return: A context manager that ensures thread-safe operations on the stream.
    """
    return _get_mutex_context_from_loglevel_timeout(self.loglevel_timeout)

Then you could use with self._mutex_context(): instead of with _get_mutex_context_from_loglevel_timeout(self.loglevel_timeout): for better readability.

Also applies to: 480-493, 771-783, 808-810

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between be03c72 and f0e6773.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (7 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
src/clp_logging/handlers.py

8-8: contextlib.AbstractContextManager imported but unused

Remove unused import: contextlib.AbstractContextManager

(F401)

🔇 Additional comments (10)
src/clp_logging/handlers.py (10)

179-183: Good docstring update explaining thread safety.

The docstring update clearly explains the thread safety guarantees provided by the CLPLogLevelTimeout class. This helps users understand how to properly use the class in multi-threaded environments.


234-234: LGTM: RLock initialization.

Using RLock instead of a standard Lock is a good choice since it allows for re-entrant locking, which is important if the same thread might need to acquire the lock multiple times (e.g., from nested method calls).


239-241: Good addition of a lock accessor method.

The get_lock method provides a clean way to access the lock object, maintaining encapsulation while allowing external code to synchronize with the timeout operations.


248-261: Proper synchronization of timeout operations.

The lock is correctly acquired before performing timeout operations. This prevents race conditions when multiple threads might be trying to cancel timers, reset values, and flush the output stream.


480-483: LGTM: Thread-safe update and write operations.

The implementation correctly uses the mutex context manager to ensure that the update method and subsequent write operation are executed atomically. This prevents race conditions between the main thread and timeout threads.


487-493: LGTM: Thread-safe closing operations.

The implementation correctly uses the mutex context for writing the EOF character and closing the stream, ensuring that these operations are not interrupted by concurrent operations.


771-772: LGTM: Thread-safe direct write.

Correctly using the mutex context in the _direct_write method ensures that writes to the stream are atomic operations.


780-783: LGTM: Thread-safe write with timeout update.

The implementation correctly ensures that both the timeout update and the write operation are executed atomically, preventing race conditions.


808-810: LGTM: Thread-safe close operations.

The implementation correctly ensures that writing the EOF character and closing the output stream are atomic operations, preventing potential corruption during shutdown.


929-930:

❓ Verification inconclusive

Suggestion for future improvement.

I noticed an existing TODO comment about closing behavior. Since you're making thread safety improvements, it might be worth addressing this as well to ensure proper cleanup in multi-threaded environments.


🌐 Web query:

What is the recommended way to implement a custom close method for a stream wrapper that doesn't close the underlying stream in Python?

💡 Result:

To implement a custom stream wrapper in Python that doesn’t automatically close its underlying stream when closed:

Key Implementation Strategy

1. Override close() Method
Create a subclass of your chosen base I/O class (io.TextIOWrapper, io.BufferedIOBase, etc.) and override its close() method:

import io

class NonClosingTextIOWrapper(io.TextIOWrapper):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    
    def close(self):
        self.flush()  # Ensure data is flushed
        self.closed = True  # Mark as closed without propagating closure downstream

2. Avoid Cascading Closure
By skipping calls to super().close(), you prevent automatic closure of nested streams while maintaining proper state management[1][4][10].

Advanced Configuration

Add optional parameters for flexible control:

class ConfigurableCloseWrapper(io.TextIOWrapper):
    def __init__(self, *args, **kwargs):
        self.close_underlying = kwargs.pop('close_underlying', False)
        super().__init__(*args, **kwargs)

    def close(self):
        self.flush()
        if self.close_underlying:
            super().close()

Usage Example

with open('data.bin', 'rb') as f:
    buffered = io.BufferedReader(f)
    wrapper = NonClosingTextIOWrapper(buffered)
    
# Underlying 'buffered' and 'f' remain open here even after wrapper.close()

Critical Considerations

  • Flush Before Closure: Always flush buffers explicitly (self.flush())[5][10].
  • Resource Management: Ensure manual handling of nested streams if they require closure elsewhere[4].

This approach ensures compliance with Python’s I/O protocols while decoupling closure behavior from wrapped streams[10].

Citations:


Enhance Custom Stream Wrapper Closing Behaviour

While the current changes correctly handle thread safety improvements, I suggest revisiting the custom close method for the stream wrapper. According to Python best practices, consider overriding the close() method so it merely flushes and marks the wrapper as closed without cascading the close to the underlying stream. This approach—potentially augmented with an optional flag (e.g. close_underlying) to selectively propagate closure when necessary—will ensure proper resource cleanup while maintaining flexibility in multi-threaded environments.

  • Location: src/clp_logging/handlers.py (lines 929-930)
  • Recommendation:
    • Override close() to flush the stream and update its closed state without calling the underlying stream’s close.
    • Optionally introduce a flag for cascading closure, as outlined in the best practices, to better manage resource cleanup.

Co-authored-by: Lin Zhihao <[email protected]>
@LinZhihao-723 LinZhihao-723 merged commit 8eab490 into y-scope:main Feb 28, 2025
9 checks passed
@hoophalab hoophalab deleted the race-condition branch March 5, 2025 23:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants