diff --git a/tests/profiling_v2/collector/test_threading.py b/tests/profiling_v2/collector/test_threading.py index 672a936879d..a2ea6c489c4 100644 --- a/tests/profiling_v2/collector/test_threading.py +++ b/tests/profiling_v2/collector/test_threading.py @@ -1,5 +1,6 @@ import glob import os +import sys import threading from typing import Any from typing import Optional @@ -30,85 +31,100 @@ TESTING_GEVENT: Union[str, bool] = os.getenv("DD_PROFILE_TEST_GEVENT", False) - -# Module-level globals for testing global lock profiling -_test_global_lock = None -_test_global_bar_instance = None - - -TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) - init_linenos(__file__) # Helper classes for testing lock collector class Foo: - def __init__(self, lock_class: Any): + def __init__(self, lock_class: LockClass) -> None: self.foo_lock = lock_class() # !CREATE! foolock - def foo(self): + def foo(self) -> None: with self.foo_lock: # !RELEASE! !ACQUIRE! foolock pass class Bar: - def __init__(self, lock_class: Any): + def __init__(self, lock_class: LockClass) -> None: self.foo = Foo(lock_class) - def bar(self): + def bar(self) -> None: self.foo.foo() -@pytest.mark.parametrize( - "collector_class,expected_repr", - [ - ( - ThreadingLockCollector, - "ThreadingLockCollector(status=, " - "capture_pct=1.0, nframes=64, " - "endpoint_collection_enabled=True, tracer=None)", - ), - ( - ThreadingRLockCollector, - "ThreadingRLockCollector(status=, " - "capture_pct=1.0, nframes=64, " - "endpoint_collection_enabled=True, tracer=None)", - ), - ], -) -def test_repr( - collector_class: CollectorClass, - expected_repr: str, -) -> None: - test_collector._test_repr(collector_class, expected_repr) - - -@pytest.mark.parametrize( - "lock_class,collector_class", - [ - (threading.Lock, ThreadingLockCollector), - (threading.RLock, ThreadingRLockCollector), - ], -) -def test_patch( - lock_class: LockClass, - collector_class: CollectorClass, -) -> None: - lock = lock_class - collector = collector_class() +def test_repr() -> None: + test_collector._test_repr( + ThreadingLockCollector, + "ThreadingLockCollector(status=, " + "capture_pct=1.0, nframes=64, " + "endpoint_collection_enabled=True, tracer=None)", + ) + + +def test_patch() -> None: + lock = threading.Lock + collector = ThreadingLockCollector() collector.start() assert lock == collector._original # wrapt makes this true - assert lock == lock_class + assert lock == threading.Lock collector.stop() - assert lock == lock_class - assert collector._original == lock_class + assert lock == threading.Lock + assert collector._original == threading.Lock + + +@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="only works on linux") +@pytest.mark.subprocess(err=None) +# For macOS: Could print 'Error uploading' but okay to ignore since we are checking if native_id is set +def test_user_threads_have_native_id() -> None: + from os import getpid + from threading import Thread + from threading import _MainThread + from threading import current_thread + from time import sleep + + from ddtrace.profiling import profiler + + # DEV: We used to run this test with ddtrace_run=True passed into the + # subprocess decorator, but that caused this to be flaky for Python 3.8.x + # with gevent. When it failed for that specific venv, current_thread() + # returned a DummyThread instead of a _MainThread. + p = profiler.Profiler() + p.start() + + main = current_thread() + assert isinstance(main, _MainThread) + # We expect the current thread to have the same ID as the PID + assert main.native_id == getpid(), (main.native_id, getpid()) + + t = Thread(target=lambda: None) + t.start() + + for _ in range(10): + try: + # The TID should be higher than the PID, but not too high + native_id = getattr(t, "native_id", None) + if native_id is not None: + assert 0 < native_id - getpid() < 100, (native_id, getpid()) + break + else: + raise AttributeError("native_id not set yet") + except AttributeError: + # The native_id attribute is set by the thread so we might have to + # wait a bit for it to be set. + sleep(0.1) + else: + raise AssertionError("Thread.native_id not set") + + t.join() + + p.stop() @pytest.mark.subprocess( env=dict(WRAPT_DISABLE_EXTENSIONS="True", DD_PROFILING_FILE_PATH=__file__), ) -def test_wrapt_disable_extensions(): +def test_wrapt_disable_extensions() -> None: import os import threading @@ -185,7 +201,6 @@ def test_lock_gevent_tasks() -> None: import threading from ddtrace.internal.datadog.profiling import ddup - from ddtrace.profiling.collector.threading import ThreadingLockCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos @@ -206,158 +221,71 @@ def play_with_lock() -> None: lock.acquire() # !ACQUIRE! test_lock_gevent_tasks lock.release() # !RELEASE! test_lock_gevent_tasks - def validate_and_cleanup(): - ddup.upload() - - expected_filename = "test_threading.py" - linenos = get_lock_linenos(test_name) - - profile = pprof_utils.parse_newest_profile(output_filename) - pprof_utils.assert_lock_events( - profile, - expected_acquire_events=[ - pprof_utils.LockAcquireEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - expected_release_events=[ - pprof_utils.LockReleaseEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - ) - - for f in glob.glob(pprof_prefix + ".*"): - try: - os.remove(f) - except Exception as e: - print("Error removing file: {}".format(e)) - with ThreadingLockCollector(capture_pct=100): t = threading.Thread(name="foobar", target=play_with_lock) t.start() t.join() - validate_and_cleanup() - - -# This test has to be run in a subprocess because it calls gevent.monkey.patch_all() -# which affects the whole process. -@pytest.mark.skipif(not TESTING_GEVENT, reason="gevent is not available") -@pytest.mark.subprocess( - env=dict(DD_PROFILING_FILE_PATH=__file__), -) -def test_rlock_gevent_tasks() -> None: - from gevent import monkey - - monkey.patch_all() - - import glob - import os - import threading - - from ddtrace.internal.datadog.profiling import ddup - from ddtrace.profiling.collector.threading import ThreadingRLockCollector - from tests.profiling.collector import pprof_utils - from tests.profiling.collector.lock_utils import get_lock_linenos - from tests.profiling.collector.lock_utils import init_linenos - - assert ddup.is_available, "ddup is not available" - - # Set up the ddup exporter - test_name = "test_rlock_gevent_tasks" - pprof_prefix = "/tmp" + os.sep + test_name - output_filename = pprof_prefix + "." + str(os.getpid()) - ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix) - ddup.start() - - init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) - - def play_with_lock() -> None: - lock = threading.RLock() # !CREATE! test_rlock_gevent_tasks - lock.acquire() # !ACQUIRE! test_rlock_gevent_tasks - lock.release() # !RELEASE! test_rlock_gevent_tasks - - def validate_and_cleanup(): - ddup.upload() - - expected_filename = "test_threading.py" - linenos = get_lock_linenos(test_name) - - profile = pprof_utils.parse_newest_profile(output_filename) - pprof_utils.assert_lock_events( - profile, - expected_acquire_events=[ - pprof_utils.LockAcquireEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - expected_release_events=[ - pprof_utils.LockReleaseEvent( - caller_name="play_with_lock", - filename=expected_filename, - linenos=linenos, - lock_name="lock", - # TODO: With stack_v2, the way we trace gevent greenlets has - # changed, and we'd need to expose an API to get the task_id, - # task_name, and task_frame. - # task_id=t.ident, - # task_name="foobar", - ), - ], - ) + ddup.upload() - for f in glob.glob(pprof_prefix + ".*"): - try: - os.remove(f) - except Exception as e: - print("Error removing file: {}".format(e)) + expected_filename = "test_threading.py" + linenos = get_lock_linenos(test_name) - with ThreadingRLockCollector(capture_pct=100): - t = threading.Thread(name="foobar", target=play_with_lock) - t.start() - t.join() + profile = pprof_utils.parse_newest_profile(output_filename) + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name="play_with_lock", + filename=expected_filename, + linenos=linenos, + lock_name="lock", + # TODO: With stack_v2, the way we trace gevent greenlets has + # changed, and we'd need to expose an API to get the task_id, + # task_name, and task_frame. + # task_id=t.ident, + # task_name="foobar", + ), + ], + ) - validate_and_cleanup() + for f in glob.glob(pprof_prefix + ".*"): + try: + os.remove(f) + except Exception as e: + print("Error removing file: {}".format(e)) class BaseThreadingLockCollectorTest: + test_name: str + pprof_prefix: str + output_filename: str + # These should be implemented by child classes @property - def collector_class(self): + def collector_class(self) -> CollectorClass: raise NotImplementedError("Child classes must implement collector_class") @property - def lock_class(self): + def lock_class(self) -> LockClass: raise NotImplementedError("Child classes must implement lock_class") # setup_method and teardown_method which will be called before and after # each test method, respectively, part of pytest api. - def setup_method(self, method): + def setup_method(self, method: Any) -> None: self.test_name = method.__name__ self.pprof_prefix = "/tmp" + os.sep + self.test_name # The output filename will be /tmp/method_name... @@ -370,7 +298,7 @@ def setup_method(self, method): ddup.config(env="test", service=self.test_name, version="my_version", output_filename=self.pprof_prefix) ddup.start() - def teardown_method(self, method): + def teardown_method(self, method: Any) -> None: # might be unnecessary but this will ensure that the file is removed # after each successful test, and when a test fails it's easier to # pinpoint and debug. @@ -380,25 +308,28 @@ def teardown_method(self, method): except Exception as e: print("Error removing file: {}".format(e)) - def test_wrapper(self): - collector = self.collector_class() + def test_wrapper(self) -> None: + # TODO: change to collector_class + collector = ThreadingLockCollector() with collector: class Foobar(object): - def __init__(self, lock_class): - lock = lock_class() + lock_class = threading.Lock + + def __init__(self) -> None: + lock = self.lock_class() assert lock.acquire() lock.release() - lock = self.lock_class() + lock = Foobar.lock_class() assert lock.acquire() lock.release() # Try this way too - Foobar(self.lock_class) + Foobar() # Tests - def test_lock_events(self): + def test_lock_events(self) -> None: # The first argument is the recorder.Recorder which is used for the # v1 exporter. We don't need it for the v2 exporter. with self.collector_class(capture_pct=100): @@ -430,12 +361,12 @@ def test_lock_events(self): ], ) - def test_lock_acquire_events_class(self): + def test_lock_acquire_events_class(self) -> None: with self.collector_class(capture_pct=100): lock_class = self.lock_class # Capture for inner class class Foobar(object): - def lockfunc(self): + def lockfunc(self) -> None: lock = lock_class() # !CREATE! test_lock_acquire_events_class lock.acquire() # !ACQUIRE! test_lock_acquire_events_class @@ -458,7 +389,7 @@ def lockfunc(self): ], ) - def test_lock_events_tracer(self, tracer): + def test_lock_events_tracer(self, tracer: Any) -> None: tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB @@ -519,7 +450,7 @@ def test_lock_events_tracer(self, tracer): ], ) - def test_lock_events_tracer_non_web(self, tracer): + def test_lock_events_tracer_non_web(self, tracer: Any) -> None: tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.SQL @@ -561,7 +492,7 @@ def test_lock_events_tracer_non_web(self, tracer): ], ) - def test_lock_events_tracer_late_finish(self, tracer): + def test_lock_events_tracer_late_finish(self, tracer: Any) -> None: tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB @@ -616,7 +547,7 @@ def test_lock_events_tracer_late_finish(self, tracer): ], ) - def test_resource_not_collected(self, tracer): + def test_resource_not_collected(self, tracer: Any) -> None: tracer._endpoint_call_counter_span_processor.enable() resource = str(uuid.uuid4()) span_type = ext.SpanTypes.WEB @@ -677,7 +608,7 @@ def test_resource_not_collected(self, tracer): ], ) - def test_lock_enter_exit_events(self): + def test_lock_enter_exit_events(self) -> None: with self.collector_class(capture_pct=100): th_lock = self.lock_class() # !CREATE! test_lock_enter_exit_events with th_lock: # !ACQUIRE! !RELEASE! test_lock_enter_exit_events @@ -713,7 +644,7 @@ def test_lock_enter_exit_events(self): "inspect_dir_enabled", [True, False], ) - def test_class_member_lock(self, inspect_dir_enabled): + def test_class_member_lock(self, inspect_dir_enabled: bool) -> None: with mock.patch("ddtrace.settings.profiling.config.lock.name_inspect_dir", inspect_dir_enabled): expected_lock_name = "foo_lock" if inspect_dir_enabled else None @@ -752,12 +683,12 @@ def test_class_member_lock(self, inspect_dir_enabled): ], ) - def test_private_lock(self): + def test_private_lock(self) -> None: class Foo: - def __init__(self, lock_class: Any): + def __init__(self, lock_class: LockClass) -> None: self.__lock = lock_class() # !CREATE! test_private_lock - def foo(self): + def foo(self) -> None: with self.__lock: # !RELEASE! !ACQUIRE! test_private_lock pass @@ -791,12 +722,12 @@ def foo(self): ], ) - def test_inner_lock(self): + def test_inner_lock(self) -> None: class Bar: - def __init__(self, lock_class: Any): + def __init__(self, lock_class: LockClass) -> None: self.foo = Foo(lock_class) - def bar(self): + def bar(self) -> None: with self.foo.foo_lock: # !RELEASE! !ACQUIRE! test_inner_lock pass @@ -831,7 +762,7 @@ def bar(self): ], ) - def test_anonymous_lock(self): + def test_anonymous_lock(self) -> None: with self.collector_class(capture_pct=100): with self.lock_class(): # !CREATE! !ACQUIRE! !RELEASE! test_anonymous_lock pass @@ -869,11 +800,11 @@ class TestBar: def __init__(self, lock_class: LockClass) -> None: self.bar_lock = lock_class() # !CREATE! bar_lock - def bar(self): + def bar(self) -> None: with self.bar_lock: # !ACQUIRE! !RELEASE! bar_lock pass - def foo(): + def foo() -> None: global _test_global_lock assert _test_global_lock is not None with _test_global_lock: # !ACQUIRE! !RELEASE! _test_global_lock @@ -891,8 +822,8 @@ def foo(): init_linenos(__file__) profile = pprof_utils.parse_newest_profile(self.output_filename) - linenos_global = get_lock_linenos("_test_global_lock", with_stmt=True) - linenos_bar = get_lock_linenos("bar_lock", with_stmt=True) + linenos_global = get_lock_linenos("_test_global_lock") + linenos_bar = get_lock_linenos("bar_lock") pprof_utils.assert_lock_events( profile, @@ -926,7 +857,7 @@ def foo(): ], ) - def test_upload_resets_profile(self): + def test_upload_resets_profile(self) -> None: # This test checks that the profile is cleared after each upload() call # It is added in test_threading.py as LockCollector can easily be # configured to be deterministic with capture_pct=100. @@ -965,24 +896,24 @@ def test_upload_resets_profile(self): class TestThreadingLockCollector(BaseThreadingLockCollectorTest): - """Test Lock profiling""" + """Test threading.Lock profiling""" @property - def collector_class(self): + def collector_class(self) -> Type[ThreadingLockCollector]: return ThreadingLockCollector @property - def lock_class(self): + def lock_class(self) -> Type[threading.Lock]: return threading.Lock class TestThreadingRLockCollector(BaseThreadingLockCollectorTest): - """Test RLock profiling""" + """Test threading.RLock profiling""" @property - def collector_class(self): + def collector_class(self) -> Type[ThreadingRLockCollector]: return ThreadingRLockCollector @property - def lock_class(self): + def lock_class(self) -> Type[threading.RLock]: return threading.RLock