diff --git a/ddtrace/profiling/collector/threading.py b/ddtrace/profiling/collector/threading.py index c49ccc5dd1a..2f136870d23 100644 --- a/ddtrace/profiling/collector/threading.py +++ b/ddtrace/profiling/collector/threading.py @@ -14,6 +14,10 @@ class _ProfiledThreadingLock(_lock._ProfiledLock): pass +class _ProfiledThreadingRLock(_lock._ProfiledLock): + pass + + class ThreadingLockCollector(_lock.LockCollector): """Record threading.Lock usage.""" @@ -29,6 +33,21 @@ def _set_patch_target( threading.Lock = value +class ThreadingRLockCollector(_lock.LockCollector): + """Record threading.RLock usage.""" + + PROFILED_LOCK_CLASS = _ProfiledThreadingRLock + + def _get_patch_target(self) -> typing.Type[threading.RLock]: + return threading.RLock + + def _set_patch_target( + self, + value: typing.Any, + ) -> None: + threading.RLock = value + + # Also patch threading.Thread so echion can track thread lifetimes def init_stack_v2() -> None: if config.stack.v2_enabled and stack_v2.is_available: diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index 57c72b52697..945718fec9b 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -29,6 +29,8 @@ from ddtrace.settings.profiling import config_str +# TODO(vlad): add type annotations + LOG = logging.getLogger(__name__) @@ -223,6 +225,7 @@ def start_collector(collector_class: Type) -> None: self._collectors_on_import = [ ("threading", lambda _: start_collector(threading.ThreadingLockCollector)), + ("threading", lambda _: start_collector(threading.ThreadingRLockCollector)), ("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)), ] diff --git a/releasenotes/notes/feat-profiling-rlock-support-e8607e68252647.yaml b/releasenotes/notes/feat-profiling-rlock-support-e8607e68252647.yaml new file mode 100644 index 00000000000..a6d685bf95b --- /dev/null +++ b/releasenotes/notes/feat-profiling-rlock-support-e8607e68252647.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + profiling: Add support for ``threading.RLock`` (reentrant lock) profiling. The Lock profiler now tracks both ``threading.Lock`` and ``threading.RLock`` usage, providing comprehensive lock contention visibility for Python applications. diff --git a/tests/profiling/collector/global_locks.py b/tests/profiling/collector/global_locks.py deleted file mode 100644 index ee7510514dd..00000000000 --- a/tests/profiling/collector/global_locks.py +++ /dev/null @@ -1,25 +0,0 @@ -import threading - -from tests.profiling.collector.lock_utils import init_linenos - - -init_linenos(__file__) -global_lock = threading.Lock() # !CREATE! global_lock - - -def foo(): - global global_lock - with global_lock: # !ACQUIRE! !RELEASE! global_lock - pass - - -class Bar: - def __init__(self): - self.bar_lock = threading.Lock() # !CREATE! bar_lock - - def bar(self): - with self.bar_lock: # !ACQUIRE! !RELEASE! bar_lock - pass - - -bar_instance = Bar() diff --git a/tests/profiling_v2/collector/test_threading.py b/tests/profiling_v2/collector/test_threading.py index 8ace8c80ecd..672a936879d 100644 --- a/tests/profiling_v2/collector/test_threading.py +++ b/tests/profiling_v2/collector/test_threading.py @@ -1,7 +1,10 @@ import glob import os -import sys import threading +from typing import Any +from typing import Optional +from typing import Type +from typing import Union import uuid import mock @@ -9,13 +12,30 @@ from ddtrace import ext from ddtrace.internal.datadog.profiling import ddup -from ddtrace.profiling.collector import threading as collector_threading +from ddtrace.profiling.collector.threading import ThreadingLockCollector +from ddtrace.profiling.collector.threading import ThreadingRLockCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector import test_collector from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos +# Type aliases for supported classes +LockClass = Union[Type[threading.Lock], Type[threading.RLock]] +CollectorClass = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]] + +# Module-level globals for testing global lock profiling +_test_global_lock: Optional[Any] = None +_test_global_bar_instance: Optional[Any] = None + +TESTING_GEVENT: Union[str, bool] = os.getenv("DD_PROFILE_TEST_GEVENT", False) + + +# Module-level globals for testing global lock profiling +_test_global_lock = None +_test_global_bar_instance = None + + TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) init_linenos(__file__) @@ -23,8 +43,8 @@ # Helper classes for testing lock collector class Foo: - def __init__(self): - self.foo_lock = threading.Lock() # !CREATE! foolock + def __init__(self, lock_class: Any): + self.foo_lock = lock_class() # !CREATE! foolock def foo(self): with self.foo_lock: # !RELEASE! !ACQUIRE! foolock @@ -32,77 +52,57 @@ def foo(self): class Bar: - def __init__(self): - self.foo = Foo() + def __init__(self, lock_class: Any): + self.foo = Foo(lock_class) def bar(self): self.foo.foo() -def test_repr(): - test_collector._test_repr( - collector_threading.ThreadingLockCollector, - "ThreadingLockCollector(status=, " - "capture_pct=1.0, nframes=64, " - "endpoint_collection_enabled=True, tracer=None)", - ) - - -def test_patch(): - lock = threading.Lock - collector = collector_threading.ThreadingLockCollector() +@pytest.mark.parametrize( + "collector_class,expected_repr", + [ + ( + ThreadingLockCollector, + "ThreadingLockCollector(status=, " + "capture_pct=1.0, nframes=64, " + "endpoint_collection_enabled=True, tracer=None)", + ), + ( + ThreadingRLockCollector, + "ThreadingRLockCollector(status=, " + "capture_pct=1.0, nframes=64, " + "endpoint_collection_enabled=True, tracer=None)", + ), + ], +) +def test_repr( + collector_class: CollectorClass, + expected_repr: str, +) -> None: + test_collector._test_repr(collector_class, expected_repr) + + +@pytest.mark.parametrize( + "lock_class,collector_class", + [ + (threading.Lock, ThreadingLockCollector), + (threading.RLock, ThreadingRLockCollector), + ], +) +def test_patch( + lock_class: LockClass, + collector_class: CollectorClass, +) -> None: + lock = lock_class + collector = collector_class() collector.start() assert lock == collector._original # wrapt makes this true - assert lock == threading.Lock + assert lock == lock_class collector.stop() - assert lock == threading.Lock - assert collector._original == threading.Lock - - -@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="only works on linux") -@pytest.mark.subprocess(err=None) -# For macOS: Could print 'Error uploading' but okay to ignore since we are checking if native_id is set -def test_user_threads_have_native_id(): - from os import getpid - from threading import Thread - from threading import _MainThread - from threading import current_thread - from time import sleep - - from ddtrace.profiling import profiler - - # DEV: We used to run this test with ddtrace_run=True passed into the - # subprocess decorator, but that caused this to be flaky for Python 3.8.x - # with gevent. When it failed for that specific venv, current_thread() - # returned a DummyThread instead of a _MainThread. - p = profiler.Profiler() - p.start() - - main = current_thread() - assert isinstance(main, _MainThread) - # We expect the current thread to have the same ID as the PID - assert main.native_id == getpid(), (main.native_id, getpid()) - - t = Thread(target=lambda: None) - t.start() - - for _ in range(10): - try: - # The TID should be higher than the PID, but not too high - assert 0 < t.native_id - getpid() < 100, (t.native_id, getpid()) - except AttributeError: - # The native_id attribute is set by the thread so we might have to - # wait a bit for it to be set. - sleep(0.1) - else: - break - else: - raise AssertionError("Thread.native_id not set") - - t.join() - - p.stop() + assert lock == lock_class + assert collector._original == lock_class @pytest.mark.subprocess( @@ -114,7 +114,7 @@ def test_wrapt_disable_extensions(): from ddtrace.internal.datadog.profiling import ddup from ddtrace.profiling.collector import _lock - from ddtrace.profiling.collector import threading as collector_threading + from ddtrace.profiling.collector.threading import ThreadingLockCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos @@ -136,7 +136,7 @@ def test_wrapt_disable_extensions(): assert os.environ.get("WRAPT_DISABLE_EXTENSIONS") assert _lock.WRAPT_C_EXT is False - with collector_threading.ThreadingLockCollector(capture_pct=100): + with ThreadingLockCollector(capture_pct=100): th_lock = threading.Lock() # !CREATE! test_wrapt_disable_extensions with th_lock: # !ACQUIRE! !RELEASE! test_wrapt_disable_extensions pass @@ -175,7 +175,7 @@ def test_wrapt_disable_extensions(): @pytest.mark.subprocess( env=dict(DD_PROFILING_FILE_PATH=__file__), ) -def test_lock_gevent_tasks(): +def test_lock_gevent_tasks() -> None: from gevent import monkey monkey.patch_all() @@ -185,7 +185,7 @@ def test_lock_gevent_tasks(): import threading from ddtrace.internal.datadog.profiling import ddup - from ddtrace.profiling.collector import threading as collector_threading + from ddtrace.profiling.collector.threading import ThreadingLockCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos @@ -201,60 +201,160 @@ def test_lock_gevent_tasks(): init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) - def play_with_lock(): + def play_with_lock() -> None: lock = threading.Lock() # !CREATE! test_lock_gevent_tasks lock.acquire() # !ACQUIRE! test_lock_gevent_tasks lock.release() # !RELEASE! test_lock_gevent_tasks - with collector_threading.ThreadingLockCollector(capture_pct=100): + def validate_and_cleanup(): + ddup.upload() + + expected_filename = "test_threading.py" + linenos = get_lock_linenos(test_name) + + profile = pprof_utils.parse_newest_profile(output_filename) + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + ) + + for f in glob.glob(pprof_prefix + ".*"): + try: + os.remove(f) + except Exception as e: + print("Error removing file: {}".format(e)) + + with ThreadingLockCollector(capture_pct=100): t = threading.Thread(name="foobar", target=play_with_lock) t.start() t.join() - ddup.upload() + validate_and_cleanup() - expected_filename = "test_threading.py" - linenos = get_lock_linenos(test_name) - profile = pprof_utils.parse_newest_profile(output_filename) - pprof_utils.assert_lock_events( - profile, - expected_acquire_events=[ - pprof_utils.LockAcquireEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - expected_release_events=[ - pprof_utils.LockReleaseEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - ) +# This test has to be run in a subprocess because it calls gevent.monkey.patch_all() +# which affects the whole process. +@pytest.mark.skipif(not TESTING_GEVENT, reason="gevent is not available") +@pytest.mark.subprocess( + env=dict(DD_PROFILING_FILE_PATH=__file__), +) +def test_rlock_gevent_tasks() -> None: + from gevent import monkey + + monkey.patch_all() + + import glob + import os + import threading + + from ddtrace.internal.datadog.profiling import ddup + from ddtrace.profiling.collector.threading import ThreadingRLockCollector + from tests.profiling.collector import pprof_utils + from tests.profiling.collector.lock_utils import get_lock_linenos + from tests.profiling.collector.lock_utils import init_linenos - for f in glob.glob(pprof_prefix + ".*"): - try: - os.remove(f) - except Exception as e: - print("Error removing file: {}".format(e)) + assert ddup.is_available, "ddup is not available" + # Set up the ddup exporter + test_name = "test_rlock_gevent_tasks" + pprof_prefix = "/tmp" + os.sep + test_name + output_filename = pprof_prefix + "." + str(os.getpid()) + ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix) + ddup.start() + + init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) + + def play_with_lock() -> None: + lock = threading.RLock() # !CREATE! test_rlock_gevent_tasks + lock.acquire() # !ACQUIRE! test_rlock_gevent_tasks + lock.release() # !RELEASE! test_rlock_gevent_tasks + + def validate_and_cleanup(): + ddup.upload() + + expected_filename = "test_threading.py" + linenos = get_lock_linenos(test_name) + + profile = pprof_utils.parse_newest_profile(output_filename) + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + ) + + for f in glob.glob(pprof_prefix + ".*"): + try: + os.remove(f) + except Exception as e: + print("Error removing file: {}".format(e)) + + with ThreadingRLockCollector(capture_pct=100): + t = threading.Thread(name="foobar", target=play_with_lock) + t.start() + t.join() + + validate_and_cleanup() + + +class BaseThreadingLockCollectorTest: + # These should be implemented by child classes + @property + def collector_class(self): + raise NotImplementedError("Child classes must implement collector_class") + + @property + def lock_class(self): + raise NotImplementedError("Child classes must implement lock_class") -class TestThreadingLockCollector: # setup_method and teardown_method which will be called before and after # each test method, respectively, part of pytest api. def setup_method(self, method): @@ -279,33 +379,30 @@ def teardown_method(self, method): os.remove(f) except Exception as e: print("Error removing file: {}".format(e)) - pass def test_wrapper(self): - collector = collector_threading.ThreadingLockCollector() + collector = self.collector_class() with collector: class Foobar(object): - lock_class = threading.Lock - - def __init__(self): - lock = self.lock_class() + def __init__(self, lock_class): + lock = lock_class() assert lock.acquire() lock.release() - lock = Foobar.lock_class() + lock = self.lock_class() assert lock.acquire() lock.release() # Try this way too - Foobar() + Foobar(self.lock_class) # Tests def test_lock_events(self): # The first argument is the recorder.Recorder which is used for the # v1 exporter. We don't need it for the v2 exporter. - with collector_threading.ThreadingLockCollector(capture_pct=100): - lock = threading.Lock() # !CREATE! test_lock_events + with self.collector_class(capture_pct=100): + lock = self.lock_class() # !CREATE! test_lock_events lock.acquire() # !ACQUIRE! test_lock_events lock.release() # !RELEASE! test_lock_events # Calling upload will trigger the exporter to write to a file @@ -334,11 +431,12 @@ def test_lock_events(self): ) def test_lock_acquire_events_class(self): - with collector_threading.ThreadingLockCollector(capture_pct=100): + with self.collector_class(capture_pct=100): + lock_class = self.lock_class # Capture for inner class class Foobar(object): def lockfunc(self): - lock = threading.Lock() # !CREATE! test_lock_acquire_events_class + lock = lock_class() # !CREATE! test_lock_acquire_events_class lock.acquire() # !ACQUIRE! test_lock_acquire_events_class Foobar().lockfunc() @@ -364,14 +462,14 @@ def test_lock_events_tracer(self, tracer): tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB - with collector_threading.ThreadingLockCollector( + with self.collector_class( tracer=tracer, capture_pct=100, ): - lock1 = threading.Lock() # !CREATE! test_lock_events_tracer_1 + lock1 = self.lock_class() # !CREATE! test_lock_events_tracer_1 lock1.acquire() # !ACQUIRE! test_lock_events_tracer_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = threading.Lock() # !CREATE! test_lock_events_tracer_2 + lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_2 lock1.release() # !RELEASE! test_lock_events_tracer_1 span_id = t.span_id @@ -425,12 +523,12 @@ def test_lock_events_tracer_non_web(self, tracer): tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.SQL - with collector_threading.ThreadingLockCollector( + with self.collector_class( tracer=tracer, capture_pct=100, ): with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = threading.Lock() # !CREATE! test_lock_events_tracer_non_web + lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_non_web lock2.acquire() # !ACQUIRE! test_lock_events_tracer_non_web span_id = t.span_id @@ -467,14 +565,14 @@ def test_lock_events_tracer_late_finish(self, tracer): tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB - with collector_threading.ThreadingLockCollector( + with self.collector_class( tracer=tracer, capture_pct=100, ): - lock1 = threading.Lock() # !CREATE! test_lock_events_tracer_late_finish_1 + lock1 = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_1 lock1.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_1 span = tracer.start_span("test", span_type=span_type) - lock2 = threading.Lock() # !CREATE! test_lock_events_tracer_late_finish_2 + lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_2 lock1.release() # !RELEASE! test_lock_events_tracer_late_finish_1 lock2.release() # !RELEASE! test_lock_events_tracer_late_finish_2 @@ -522,15 +620,15 @@ def test_resource_not_collected(self, tracer): tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB - with collector_threading.ThreadingLockCollector( + with self.collector_class( tracer=tracer, capture_pct=100, endpoint_collection_enabled=False, ): - lock1 = threading.Lock() # !CREATE! test_resource_not_collected_1 + lock1 = self.lock_class() # !CREATE! test_resource_not_collected_1 lock1.acquire() # !ACQUIRE! test_resource_not_collected_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = threading.Lock() # !CREATE! test_resource_not_collected_2 + lock2 = self.lock_class() # !CREATE! test_resource_not_collected_2 lock2.acquire() # !ACQUIRE! test_resource_not_collected_2 lock1.release() # !RELEASE! test_resource_not_collected_1 span_id = t.span_id @@ -580,8 +678,8 @@ def test_resource_not_collected(self, tracer): ) def test_lock_enter_exit_events(self): - with collector_threading.ThreadingLockCollector(capture_pct=100): - th_lock = threading.Lock() # !CREATE! test_lock_enter_exit_events + with self.collector_class(capture_pct=100): + th_lock = self.lock_class() # !CREATE! test_lock_enter_exit_events with th_lock: # !ACQUIRE! !RELEASE! test_lock_enter_exit_events pass @@ -619,10 +717,10 @@ def test_class_member_lock(self, inspect_dir_enabled): with mock.patch("ddtrace.settings.profiling.config.lock.name_inspect_dir", inspect_dir_enabled): expected_lock_name = "foo_lock" if inspect_dir_enabled else None - with collector_threading.ThreadingLockCollector(capture_pct=100): - foobar = Foo() + with self.collector_class(capture_pct=100): + foobar = Foo(self.lock_class) foobar.foo() - bar = Bar() + bar = Bar(self.lock_class) bar.bar() ddup.upload() @@ -656,15 +754,15 @@ def test_class_member_lock(self, inspect_dir_enabled): def test_private_lock(self): class Foo: - def __init__(self): - self.__lock = threading.Lock() # !CREATE! test_private_lock + def __init__(self, lock_class: Any): + self.__lock = lock_class() # !CREATE! test_private_lock def foo(self): with self.__lock: # !RELEASE! !ACQUIRE! test_private_lock pass - with collector_threading.ThreadingLockCollector(capture_pct=100): - foo = Foo() + with self.collector_class(capture_pct=100): + foo = Foo(self.lock_class) foo.foo() ddup.upload() @@ -695,15 +793,15 @@ def foo(self): def test_inner_lock(self): class Bar: - def __init__(self): - self.foo = Foo() + def __init__(self, lock_class: Any): + self.foo = Foo(lock_class) def bar(self): with self.foo.foo_lock: # !RELEASE! !ACQUIRE! test_inner_lock pass - with collector_threading.ThreadingLockCollector(capture_pct=100): - bar = Bar() + with self.collector_class(capture_pct=100): + bar = Bar(self.lock_class) bar.bar() ddup.upload() @@ -734,8 +832,8 @@ def bar(self): ) def test_anonymous_lock(self): - with collector_threading.ThreadingLockCollector(capture_pct=100): - with threading.Lock(): # !CREATE! !ACQUIRE! !RELEASE! test_anonymous_lock + with self.collector_class(capture_pct=100): + with self.lock_class(): # !CREATE! !ACQUIRE! !RELEASE! test_anonymous_lock pass ddup.upload() @@ -760,17 +858,40 @@ def test_anonymous_lock(self): ], ) - def test_global_locks(self): - with collector_threading.ThreadingLockCollector(capture_pct=100): - from tests.profiling.collector import global_locks + def test_global_locks(self) -> None: + global _test_global_lock, _test_global_bar_instance + + with self.collector_class(capture_pct=100): + # Create true module-level globals + _test_global_lock = self.lock_class() # !CREATE! _test_global_lock + + class TestBar: + def __init__(self, lock_class: LockClass) -> None: + self.bar_lock = lock_class() # !CREATE! bar_lock + + def bar(self): + with self.bar_lock: # !ACQUIRE! !RELEASE! bar_lock + pass - global_locks.foo() - global_locks.bar_instance.bar() + def foo(): + global _test_global_lock + assert _test_global_lock is not None + with _test_global_lock: # !ACQUIRE! !RELEASE! _test_global_lock + pass + + _test_global_bar_instance = TestBar(self.lock_class) + + # Use the locks + foo() + _test_global_bar_instance.bar() ddup.upload() + # Process this file to get the correct line numbers for our !CREATE! comments + init_linenos(__file__) + profile = pprof_utils.parse_newest_profile(self.output_filename) - linenos_foo = get_lock_linenos("global_lock", with_stmt=True) + linenos_global = get_lock_linenos("_test_global_lock", with_stmt=True) linenos_bar = get_lock_linenos("bar_lock", with_stmt=True) pprof_utils.assert_lock_events( @@ -778,13 +899,13 @@ def test_global_locks(self): expected_acquire_events=[ pprof_utils.LockAcquireEvent( caller_name="foo", - filename=os.path.basename(global_locks.__file__), - linenos=linenos_foo, - lock_name="global_lock", + filename=os.path.basename(__file__), + linenos=linenos_global, + lock_name="_test_global_lock", ), pprof_utils.LockAcquireEvent( caller_name="bar", - filename=os.path.basename(global_locks.__file__), + filename=os.path.basename(__file__), linenos=linenos_bar, lock_name="bar_lock", ), @@ -792,13 +913,13 @@ def test_global_locks(self): expected_release_events=[ pprof_utils.LockReleaseEvent( caller_name="foo", - filename=os.path.basename(global_locks.__file__), - linenos=linenos_foo, - lock_name="global_lock", + filename=os.path.basename(__file__), + linenos=linenos_global, + lock_name="_test_global_lock", ), pprof_utils.LockReleaseEvent( caller_name="bar", - filename=os.path.basename(global_locks.__file__), + filename=os.path.basename(__file__), linenos=linenos_bar, lock_name="bar_lock", ), @@ -809,8 +930,8 @@ def test_upload_resets_profile(self): # This test checks that the profile is cleared after each upload() call # It is added in test_threading.py as LockCollector can easily be # configured to be deterministic with capture_pct=100. - with collector_threading.ThreadingLockCollector(capture_pct=100): - with threading.Lock(): # !CREATE! !ACQUIRE! !RELEASE! test_upload_resets_profile + with self.collector_class(capture_pct=100): + with self.lock_class(): # !CREATE! !ACQUIRE! !RELEASE! test_upload_resets_profile pass ddup.upload() @@ -841,3 +962,27 @@ def test_upload_resets_profile(self): # have any samples with pytest.raises(AssertionError): pprof_utils.parse_newest_profile(self.output_filename) + + +class TestThreadingLockCollector(BaseThreadingLockCollectorTest): + """Test Lock profiling""" + + @property + def collector_class(self): + return ThreadingLockCollector + + @property + def lock_class(self): + return threading.Lock + + +class TestThreadingRLockCollector(BaseThreadingLockCollectorTest): + """Test RLock profiling""" + + @property + def collector_class(self): + return ThreadingRLockCollector + + @property + def lock_class(self): + return threading.RLock diff --git a/tests/profiling_v2/test_profiler.py b/tests/profiling_v2/test_profiler.py index e3ddf8e7dbe..85dd02c83c7 100644 --- a/tests/profiling_v2/test_profiler.py +++ b/tests/profiling_v2/test_profiler.py @@ -1,4 +1,5 @@ import logging +import sys import time import mock @@ -272,3 +273,48 @@ def test_stack_v2_failure_telemetry_logging_with_auto(): message = call_args[0][1] assert "Failed to load stack_v2 module" in message assert "mock failure message" in message + + +@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="only works on linux") +@pytest.mark.subprocess(err=None) +# For macOS: Could print 'Error uploading' but okay to ignore since we are checking if native_id is set +def test_user_threads_have_native_id(): + from os import getpid + from threading import Thread + from threading import _MainThread # pyright: ignore[reportAttributeAccessIssue] + from threading import current_thread + from time import sleep + + from ddtrace.profiling import profiler + + # DEV: We used to run this test with ddtrace_run=True passed into the + # subprocess decorator, but that caused this to be flaky for Python 3.8.x + # with gevent. When it failed for that specific venv, current_thread() + # returned a DummyThread instead of a _MainThread. + p = profiler.Profiler() + p.start() + + main = current_thread() + assert isinstance(main, _MainThread) + # We expect the current thread to have the same ID as the PID + assert main.native_id == getpid(), (main.native_id, getpid()) + + t = Thread(target=lambda: None) + t.start() + + for _ in range(10): + try: + # The TID should be higher than the PID, but not too high + assert 0 < t.native_id - getpid() < 100, (t.native_id, getpid()) + except AttributeError: + # The native_id attribute is set by the thread so we might have to + # wait a bit for it to be set. + sleep(0.1) + else: + break + else: + raise AssertionError("Thread.native_id not set") + + t.join() + + p.stop()