From 9064e166efb37a83f4a64bd90d31262f66d426ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20G=C5=82owski?=
Date: Sat, 14 Oct 2023 20:15:15 +0200
Subject: [PATCH] feat: Add ray integration support (#2400)

---
Review notes (this section is ignored when the patch is applied):
 * new_remote forwards keyword options with **kwargs instead of *kwargs.
 * new_remote accepts the @ray.remote(num_cpus=...) form (no function given).
 * Classes (ray actors) are handed to ray untouched instead of being wrapped.
 * The ray version check tolerates suffixes such as "rc0" or ".dev0".
 * Docstrings were added; the blob ids on the "index" lines of ray.py and
   test_ray.py were NOT recomputed for the edited contents.

 sentry_sdk/integrations/ray.py     | 105 ++++++++++++++++++++++++++++++
 tests/integrations/ray/__init__.py |   3 +
 tests/integrations/ray/test_ray.py |  62 ++++++++++++++++++
 tox.ini                            |   7 ++
 4 files changed, 177 insertions(+)
 create mode 100644 sentry_sdk/integrations/ray.py
 create mode 100644 tests/integrations/ray/__init__.py
 create mode 100644 tests/integrations/ray/test_ray.py

diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py
new file mode 100644
index 0000000000..c20b725e94
--- /dev/null
+++ b/sentry_sdk/integrations/ray.py
@@ -0,0 +1,105 @@
+from sentry_sdk.integrations import DidNotEnable, Integration
+
+try:
+    import ray
+except ImportError:
+    raise DidNotEnable("Ray not installed.")
+import functools
+import inspect
+import re
+
+from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
+import logging
+import sentry_sdk
+from importlib.metadata import version
+
+
+def _check_sentry_initialized():
+    """Warn when the current process has no Sentry client bound to its hub."""
+    if sentry_sdk.Hub.current.client:
+        return
+    # we cannot use sentry sdk logging facilities because it wasn't initialized
+    logger = logging.getLogger("sentry_sdk.errors")
+    logger.warning(
+        "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
+    )
+
+
+def _patch_ray_remote():
+    """Replace ``ray.remote`` with a wrapper that propagates trace headers.
+
+    Functions submitted through the wrapper run inside a transaction that
+    continues the trace of the caller, and their ``.remote()`` call is
+    recorded as a ``ray.remote.send`` span.
+    """
+    old_remote = ray.remote
+
+    @functools.wraps(old_remote)
+    def new_remote(f=None, *args, **kwargs):
+        def _apply_old_remote(target):
+            # ``@ray.remote(num_cpus=1)`` calls us without a function and
+            # expects a decorator back; ``@ray.remote`` passes it directly.
+            if f is None:
+                return old_remote(*args, **kwargs)(target)
+            return old_remote(target, *args, **kwargs)
+
+        def _instrument(user_f):
+            if inspect.isclass(user_f):
+                # Actors must reach ray as classes; wrapping one in a
+                # function would turn it into a task, so leave it untraced.
+                return _apply_old_remote(user_f)
+
+            def _f(*f_args, _tracing=None, **f_kwargs):
+                _check_sentry_initialized()
+                with sentry_sdk.start_transaction(
+                    sentry_sdk.continue_trace(
+                        _tracing,
+                        op="ray.remote.receive",
+                        source=TRANSACTION_SOURCE_TASK,
+                        name="Ray worker transaction",
+                    )
+                ) as tx:
+                    result = user_f(*f_args, **f_kwargs)
+                    tx.set_status("ok")
+                    return result
+
+            remote_f = _apply_old_remote(_f)
+            old_remote_method = remote_f.remote
+
+            def _remote_method_with_header_propagation(*args, **kwargs):
+                with sentry_sdk.start_span(
+                    op="ray.remote.send", description="Sending task to ray cluster."
+                ):
+                    tracing = dict(
+                        sentry_sdk.Hub.current.iter_trace_propagation_headers()
+                    )
+                    return old_remote_method(*args, **kwargs, _tracing=tracing)
+
+            remote_f.remote = _remote_method_with_header_propagation
+
+            return remote_f
+
+        if f is None:
+            return _instrument
+        return _instrument(f)
+
+    ray.remote = new_remote
+
+
+class RayIntegration(Integration):
+    """Sentry integration that traces ray tasks across process boundaries."""
+
+    identifier = "ray"
+
+    @staticmethod
+    def setup_once():
+        """Patch ``ray.remote``; raise ``DidNotEnable`` for ray older than 2.7.0."""
+        # Only the leading numeric release is compared so that suffixes such
+        # as "rc0" or ".dev0" do not make int() fail.
+        ray_version = version("ray")
+        match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", ray_version)
+        if match is None:
+            raise DidNotEnable("Unparsable ray version: {}".format(ray_version))
+        if tuple(int(part or 0) for part in match.groups()) < (2, 7, 0):
+            raise DidNotEnable("Ray 2.7.0 or newer required")
+        _patch_ray_remote()
diff --git a/tests/integrations/ray/__init__.py b/tests/integrations/ray/__init__.py
new file mode 100644
index 0000000000..92f6d93906
--- /dev/null
+++ b/tests/integrations/ray/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("ray")
diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py
new file mode 100644
index 0000000000..97c0858b54
--- /dev/null
+++ b/tests/integrations/ray/test_ray.py
@@ -0,0 +1,62 @@
+import time
+
+import ray
+
+import sentry_sdk
+from sentry_sdk.envelope import Envelope
+from sentry_sdk.integrations.ray import RayIntegration
+from tests.conftest import TestTransport
+
+
+class RayTestTransport(TestTransport):
+    """Transport that records captured events and envelopes in lists."""
+
+    def __init__(self):
+        self.events = []
+        self.envelopes = []
+        super().__init__(self.events.append, self.envelopes.append)
+
+
+def _setup_ray_sentry():
+    """Initialise Sentry with the ray integration and the recording transport."""
+    sentry_sdk.init(
+        traces_sample_rate=1.0,
+        integrations=[RayIntegration()],
+        transport=RayTestTransport(),
+    )
+
+
+def test_ray():
+    """A ray task must report under the trace id of the submitting process."""
+    _setup_ray_sentry()
+
+    @ray.remote
+    def _task():
+        with sentry_sdk.start_span(op="task", description="example task step"):
+            time.sleep(0.1)
+        return sentry_sdk.Hub.current.client.transport.envelopes
+
+    ray.init(
+        runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./")
+    )
+
+    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
+        worker_envelopes = ray.get(_task.remote())
+
+    _assert_envelopes_are_associated_with_same_trace_id(
+        sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0]
+    )
+
+
+def _assert_envelopes_are_associated_with_same_trace_id(
+    client_side_envelope: Envelope, worker_envelope: Envelope
+):
+    """Assert both transactions and all of their spans share one trace id."""
+    client_side_envelope_dict = client_side_envelope.get_transaction_event()
+    worker_envelope_dict = worker_envelope.get_transaction_event()
+    trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"]
+    for span in client_side_envelope_dict["spans"]:
+        assert span["trace_id"] == trace_id
+    for span in worker_envelope_dict["spans"]:
+        assert span["trace_id"] == trace_id
+    assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id
diff --git a/tox.ini b/tox.ini
index d5e0d753a9..6461ab1a0b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -142,6 +142,9 @@ envlist =
     {py3.7,py3.8,py3.9,py3.10,py3.11}-quart-v{0.16,0.17,0.18}
     {py3.8,py3.9,py3.10,py3.11,py3.12}-quart-v{0.19}
 
+    # Ray
+    {py3.10,py3.11}-ray
+
     # Redis
     {py2.7,py3.7,py3.8,py3.9,py3.10,py3.11,py3.12}-redis
 
@@ -411,6 +414,9 @@ deps =
     pyramid-v1.9: pyramid>=1.9,<1.10
     pyramid-v1.10: pyramid>=1.10,<1.11
 
+    # Ray
+    ray: ray>=2.7.0
+
     # Quart
     quart: quart-auth
     quart: pytest-asyncio
@@ -564,6 +570,7 @@ setenv =
     pymongo: TESTPATH=tests/integrations/pymongo
     pyramid: TESTPATH=tests/integrations/pyramid
     quart: TESTPATH=tests/integrations/quart
+    ray: TESTPATH=tests/integrations/ray
     redis: TESTPATH=tests/integrations/redis
     rediscluster: TESTPATH=tests/integrations/rediscluster
     requests: TESTPATH=tests/integrations/requests