From a2091db3d23b11ab57641b8fce2751f7e3218eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20G=C5=82owski?= Date: Sat, 14 Oct 2023 20:15:15 +0200 Subject: [PATCH 01/13] feat(ray): Create Ray integration The integration includes performance support. Also, add tests for the integration. Closes #2400 --- .../test-integrations-data-processing.yml | 8 ++ .../split-tox-gh-actions.py | 1 + sentry_sdk/integrations/ray.py | 85 +++++++++++++++++++ tests/integrations/ray/__init__.py | 3 + tests/integrations/ray/test_ray.py | 57 +++++++++++++ tox.ini | 7 ++ 6 files changed, 161 insertions(+) create mode 100644 sentry_sdk/integrations/ray.py create mode 100644 tests/integrations/ray/__init__.py create mode 100644 tests/integrations/ray/test_ray.py diff --git a/.github/workflows/test-integrations-data-processing.yml b/.github/workflows/test-integrations-data-processing.yml index b9f1b3fdcb..fb21656ef2 100644 --- a/.github/workflows/test-integrations-data-processing.yml +++ b/.github/workflows/test-integrations-data-processing.yml @@ -74,6 +74,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh "py${{ matrix.python-version }}-huggingface_hub-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch + - name: Test ray latest + run: | + set -x # print commands that are executed + ./scripts/runtox.sh "py${{ matrix.python-version }}-ray-latest" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch - name: Test rq latest run: | set -x # print commands that are executed @@ -142,6 +146,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-huggingface_hub" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch + - name: Test ray pinned + run: | + set -x # print commands that are executed + ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-ray" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch - name: Test rq pinned run: | set -x # print commands that are executed diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index 5d5f423857..66a4835064 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -74,6 +74,7 @@ "langchain", "openai", "huggingface_hub", + "ray", "rq", ], "Databases": [ diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py new file mode 100644 index 0000000000..45d66a8c2d --- /dev/null +++ b/sentry_sdk/integrations/ray.py @@ -0,0 +1,85 @@ +from sentry_sdk.integrations import DidNotEnable, Integration + +try: + import ray # type: ignore[import-not-found] +except ImportError: + raise DidNotEnable("Ray not installed.") +import functools + +from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK +import logging +import sentry_sdk +from importlib.metadata import version + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import Any, Optional + + +def _check_sentry_initialized(): + # type: () -> None + if sentry_sdk.Hub.current.client: + return + # we cannot use sentry sdk logging facilities because it wasn't initialized + logger = logging.getLogger("sentry_sdk.errors") + logger.warning( + "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." + ) + + +def _patch_ray_remote(): + # type: () -> None + old_remote = ray.remote + + @functools.wraps(old_remote) + def new_remote(f, *args, **kwargs): + # type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any] + def _f(*f_args, _tracing=None, **f_kwargs): + # type: (Any, Optional[dict[str, Any]], Any) -> Any + _check_sentry_initialized() + transaction = None + if _tracing is not None: + transaction = sentry_sdk.continue_trace( + _tracing, + op="ray.remote.receive", + source=TRANSACTION_SOURCE_TASK, + name="Ray worker transaction", + ) + with sentry_sdk.start_transaction(transaction) as tx: + result = f(*f_args, **f_kwargs) + tx.set_status("ok") + return result + + rv = old_remote(_f, *args, *kwargs) + old_remote_method = rv.remote + + def _remote_method_with_header_propagation(*args, **kwargs): + # type: (*Any, **Any) -> Any + with sentry_sdk.start_span( + op="ray.remote.send", description="Sending task to ray cluster." + ): + tracing = { + k: v + for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers() + } + return old_remote_method(*args, **kwargs, _tracing=tracing) + + rv.remote = _remote_method_with_header_propagation + + return rv + + ray.remote = new_remote + return + + +class RayIntegration(Integration): + identifier = "ray" + + @staticmethod + def setup_once(): + # type: () -> None + if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0): + raise DidNotEnable("Ray 2.7.0 or newer required") + _patch_ray_remote() diff --git a/tests/integrations/ray/__init__.py b/tests/integrations/ray/__init__.py new file mode 100644 index 0000000000..92f6d93906 --- /dev/null +++ b/tests/integrations/ray/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("ray") diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py new file mode 100644 index 0000000000..97c0858b54 --- /dev/null +++ b/tests/integrations/ray/test_ray.py @@ -0,0 +1,57 @@ +import time + +import ray + +import sentry_sdk +from sentry_sdk.envelope import Envelope +from sentry_sdk.integrations.ray import RayIntegration +from tests.conftest import TestTransport + + +class RayTestTransport(TestTransport): + def __init__(self): + self.events = [] + self.envelopes = [] + super().__init__(self.events.append, self.envelopes.append) + + +def _setup_ray_sentry(): + sentry_sdk.init( + traces_sample_rate=1.0, + integrations=[RayIntegration()], + transport=RayTestTransport(), + ) + + +def test_ray(): + _setup_ray_sentry() + + @ray.remote + def _task(): + with sentry_sdk.start_span(op="task", description="example task step"): + time.sleep(0.1) + return sentry_sdk.Hub.current.client.transport.envelopes + + ray.init( + runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./") + ) + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + worker_envelopes = ray.get(_task.remote()) + + _assert_envelopes_are_associated_with_same_trace_id( + sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0] + ) + + +def _assert_envelopes_are_associated_with_same_trace_id( + client_side_envelope: Envelope, worker_envelope: Envelope +): + client_side_envelope_dict = client_side_envelope.get_transaction_event() + worker_envelope_dict = worker_envelope.get_transaction_event() + trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"] + for span in client_side_envelope_dict["spans"]: + assert span["trace_id"] == trace_id + for span in worker_envelope_dict["spans"]: + assert span["trace_id"] == trace_id + assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id diff --git a/tox.ini b/tox.ini index f1bc0e7a5e..1697f50ea7 100644 --- a/tox.ini +++ b/tox.ini @@ -185,6 +185,9 @@ envlist = {py3.8,py3.11,py3.12}-quart-v{0.19} {py3.8,py3.11,py3.12}-quart-latest + # Ray + {py3.10,py3.11}-ray + # Redis {py3.6,py3.8}-redis-v{3} {py3.7,py3.8,py3.11}-redis-v{4} @@ -494,6 +497,9 @@ deps = pyramid-v2.0: pyramid~=2.0.0 pyramid-latest: pyramid + # Ray + ray: ray>=2.7.0 + # Quart quart: quart-auth quart: pytest-asyncio @@ -638,6 +644,7 @@ setenv = pymongo: TESTPATH=tests/integrations/pymongo pyramid: TESTPATH=tests/integrations/pyramid quart: TESTPATH=tests/integrations/quart + ray: TESTPATH=tests/integrations/ray redis: TESTPATH=tests/integrations/redis rediscluster: TESTPATH=tests/integrations/rediscluster requests: TESTPATH=tests/integrations/requests From b36895980e206308987768f904cd1616d63ea807 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Thu, 1 Aug 2024 15:22:03 +0200 Subject: [PATCH 02/13] Removed hub --- sentry_sdk/integrations/ray.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 45d66a8c2d..ab14384c60 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -20,9 +20,10 @@ def _check_sentry_initialized(): # type: () -> None - if sentry_sdk.Hub.current.client: + if sentry_sdk.get_client().is_active(): return - # we cannot use sentry sdk logging facilities because it wasn't initialized + + # We cannot use Sentry SDK logging facilities because it wasn't initialized logger = logging.getLogger("sentry_sdk.errors") logger.warning( "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." @@ -62,7 +63,7 @@ def _remote_method_with_header_propagation(*args, **kwargs): ): tracing = { k: v - for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers() + for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() } return old_remote_method(*args, **kwargs, _tracing=tracing) From b605b5ce42713adfbd1cedcc18ad3da40b56d387 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Thu, 1 Aug 2024 15:36:19 +0200 Subject: [PATCH 03/13] Some cleanup --- sentry_sdk/consts.py | 2 ++ sentry_sdk/integrations/ray.py | 28 ++++++++++++++++++---------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 82552e4084..6b23ccc032 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -462,6 +462,8 @@ class OP: QUEUE_TASK_RQ = "queue.task.rq" QUEUE_SUBMIT_HUEY = "queue.submit.huey" QUEUE_TASK_HUEY = "queue.task.huey" + QUEUE_SUBMIT_RAY = "queue.submit.ray" + QUEUE_TASK_RAY = "queue.task.ray" SUBPROCESS = "subprocess" SUBPROCESS_WAIT = "subprocess.wait" SUBPROCESS_COMMUNICATE = "subprocess.communicate" diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index ab14384c60..65fd0f27b5 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -1,4 +1,9 @@ +import logging + +import sentry_sdk from sentry_sdk.integrations import DidNotEnable, Integration +from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK +from sentry_sdk.utils import package_version try: import ray # type: ignore[import-not-found] @@ -6,11 +11,6 @@ raise DidNotEnable("Ray not installed.") import functools -from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK -import logging -import sentry_sdk -from importlib.metadata import version - from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -22,8 +22,7 @@ def _check_sentry_initialized(): # type: () -> None if sentry_sdk.get_client().is_active(): return - - # We cannot use Sentry SDK logging facilities because it wasn't initialized + logger = logging.getLogger("sentry_sdk.errors") logger.warning( "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." @@ -40,6 +39,7 @@ def new_remote(f, *args, **kwargs): def _f(*f_args, _tracing=None, **f_kwargs): # type: (Any, Optional[dict[str, Any]], Any) -> Any _check_sentry_initialized() + transaction = None if _tracing is not None: transaction = sentry_sdk.continue_trace( @@ -48,9 +48,10 @@ def _f(*f_args, _tracing=None, **f_kwargs): source=TRANSACTION_SOURCE_TASK, name="Ray worker transaction", ) - with sentry_sdk.start_transaction(transaction) as tx: + + with sentry_sdk.start_transaction(transaction) as transaction: result = f(*f_args, **f_kwargs) - tx.set_status("ok") + transaction.set_status("ok") return result rv = old_remote(_f, *args, *kwargs) @@ -77,10 +78,17 @@ def _remote_method_with_header_propagation(*args, **kwargs): class RayIntegration(Integration): identifier = "ray" + origin = f"auto.queue.{identifier}" @staticmethod def setup_once(): # type: () -> None - if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0): + version = package_version("ray") + + if version is None: + raise DidNotEnable("Unparsable ray version: {}".format(version)) + + if version < (2, 7, 0): raise DidNotEnable("Ray 2.7.0 or newer required") + _patch_ray_remote() From f47192e3899708eb6b95688a97d9b26754eaf12b Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Thu, 1 Aug 2024 15:41:44 +0200 Subject: [PATCH 04/13] Set origin --- sentry_sdk/integrations/ray.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 65fd0f27b5..9f4541fca2 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -1,6 +1,7 @@ import logging import sentry_sdk +from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK from sentry_sdk.utils import package_version @@ -43,9 +44,10 @@ def _f(*f_args, _tracing=None, **f_kwargs): transaction = None if _tracing is not None: transaction = sentry_sdk.continue_trace( - _tracing, - op="ray.remote.receive", + environ_or_headers=_tracing, + op=OP.QUEUE_TASK_RQ, source=TRANSACTION_SOURCE_TASK, + origin=RayIntegration.origin, name="Ray worker transaction", ) @@ -60,7 +62,9 @@ def _f(*f_args, _tracing=None, **f_kwargs): def _remote_method_with_header_propagation(*args, **kwargs): # type: (*Any, **Any) -> Any with sentry_sdk.start_span( - op="ray.remote.send", description="Sending task to ray cluster." + op=OP.QUEUE_SUBMIT_RAY, + origin=RayIntegration.origin, + description="Sending task to ray cluster." ): tracing = { k: v From cddc5e83c76b49c9e43771721d167495a1b3870c Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 2 Aug 2024 08:22:42 +0200 Subject: [PATCH 05/13] Better names for transactions/spans --- sentry_sdk/integrations/ray.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 9f4541fca2..b88d74dbd6 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -1,10 +1,8 @@ -import logging - import sentry_sdk from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK -from sentry_sdk.utils import package_version +from sentry_sdk.utils import logger, package_version, qualname_from_function try: import ray # type: ignore[import-not-found] @@ -24,7 +22,6 @@ def _check_sentry_initialized(): if sentry_sdk.get_client().is_active(): return - logger = logging.getLogger("sentry_sdk.errors") logger.warning( "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." ) @@ -41,15 +38,13 @@ def _f(*f_args, _tracing=None, **f_kwargs): # type: (Any, Optional[dict[str, Any]], Any) -> Any _check_sentry_initialized() - transaction = None - if _tracing is not None: - transaction = sentry_sdk.continue_trace( - environ_or_headers=_tracing, - op=OP.QUEUE_TASK_RQ, - source=TRANSACTION_SOURCE_TASK, - origin=RayIntegration.origin, - name="Ray worker transaction", - ) + transaction = sentry_sdk.continue_trace( + _tracing or {}, + op=OP.QUEUE_TASK_RAY, + name=qualname_from_function(f), + origin=RayIntegration.origin, + source=TRANSACTION_SOURCE_TASK, + ) with sentry_sdk.start_transaction(transaction) as transaction: result = f(*f_args, **f_kwargs) @@ -63,8 +58,8 @@ def _remote_method_with_header_propagation(*args, **kwargs): # type: (*Any, **Any) -> Any with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_RAY, + description=qualname_from_function(f), origin=RayIntegration.origin, - description="Sending task to ray cluster." ): tracing = { k: v @@ -77,7 +72,6 @@ def _remote_method_with_header_propagation(*args, **kwargs): return rv ray.remote = new_remote - return class RayIntegration(Integration): From 3d7905ee47c1660a6daaeafdfed306bb7c6faecd Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 2 Aug 2024 08:23:36 +0200 Subject: [PATCH 06/13] Removed hub from tests --- tests/integrations/ray/test_ray.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 97c0858b54..b6a021a5cb 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -30,7 +30,7 @@ def test_ray(): def _task(): with sentry_sdk.start_span(op="task", description="example task step"): time.sleep(0.1) - return sentry_sdk.Hub.current.client.transport.envelopes + return sentry_sdk.get_client().transport.envelopes ray.init( runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./") @@ -40,7 +40,7 @@ def _task(): worker_envelopes = ray.get(_task.remote()) _assert_envelopes_are_associated_with_same_trace_id( - sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0] + sentry_sdk.Hub.get_client().transport.envelopes[0], worker_envelopes[0] ) From 2804490e22c4a4561ccaa43cfaa6800b26305cf2 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 2 Aug 2024 08:58:02 +0200 Subject: [PATCH 07/13] Improved tests --- tests/integrations/ray/test_ray.py | 96 ++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index b6a021a5cb..fb3c1f0cd6 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -1,4 +1,4 @@ -import time +import pytest import ray @@ -10,48 +10,96 @@ class RayTestTransport(TestTransport): def __init__(self): - self.events = [] self.envelopes = [] - super().__init__(self.events.append, self.envelopes.append) + super().__init__() + def capture_envelope(self, envelope: Envelope) -> None: + self.envelopes.append(envelope) -def _setup_ray_sentry(): + +def setup_sentry(): sentry_sdk.init( - traces_sample_rate=1.0, integrations=[RayIntegration()], transport=RayTestTransport(), + traces_sample_rate=1.0, ) -def test_ray(): - _setup_ray_sentry() +@pytest.mark.forked +def test_ray_tracing(): + setup_sentry() @ray.remote - def _task(): + def example_task(): with sentry_sdk.start_span(op="task", description="example task step"): - time.sleep(0.1) + ... + return sentry_sdk.get_client().transport.envelopes ray.init( - runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./") + runtime_env={ + "worker_process_setup_hook": setup_sentry, + "working_dir": "./", + } ) with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - worker_envelopes = ray.get(_task.remote()) + worker_envelopes = ray.get(example_task.remote()) + + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() + worker_envelope = worker_envelopes[0] + worker_transaction = worker_envelope.get_transaction_event() - _assert_envelopes_are_associated_with_same_trace_id( - sentry_sdk.Hub.get_client().transport.envelopes[0], worker_envelopes[0] + assert ( + client_transaction["contexts"]["trace"]["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] ) + for span in client_transaction["spans"]: + assert ( + span["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + ) + + for span in worker_transaction["spans"]: + assert ( + span["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + ) + + +@pytest.mark.forked +def test_ray_spans(): + setup_sentry() + + @ray.remote + def example_task(): + ... + + return sentry_sdk.get_client().transport.envelopes + + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry, + "working_dir": "./", + } + ) + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + worker_envelopes = ray.get(example_task.remote()) + + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() + worker_envelope = worker_envelopes[0] + worker_transaction = worker_envelope.get_transaction_event() + + for span in client_transaction["spans"]: + assert span["op"] == "queue.submit.ray" + assert span["origin"] == "auto.queue.ray" -def _assert_envelopes_are_associated_with_same_trace_id( - client_side_envelope: Envelope, worker_envelope: Envelope -): - client_side_envelope_dict = client_side_envelope.get_transaction_event() - worker_envelope_dict = worker_envelope.get_transaction_event() - trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"] - for span in client_side_envelope_dict["spans"]: - assert span["trace_id"] == trace_id - for span in worker_envelope_dict["spans"]: - assert span["trace_id"] == trace_id - assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id + for span in worker_transaction["spans"]: + assert span["op"] == "queue.task.ray" + assert span["origin"] == "auto.queue.ray" From 40d397acfebab245d7ec275dc93733d12a60d9a3 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 2 Aug 2024 09:16:31 +0200 Subject: [PATCH 08/13] Added error handling --- sentry_sdk/integrations/ray.py | 52 ++++++++++++++++++++++++++---- tests/integrations/ray/test_ray.py | 2 -- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index b88d74dbd6..2c60a0c819 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -1,8 +1,15 @@ +import sys import sentry_sdk -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK -from sentry_sdk.utils import logger, package_version, qualname_from_function +from sentry_sdk.utils import ( + event_from_exception, + logger, + package_version, + qualname_from_function, + reraise, +) try: import ray # type: ignore[import-not-found] @@ -15,6 +22,7 @@ if TYPE_CHECKING: from collections.abc import Callable from typing import Any, Optional + from sentry_sdk.utils import ExcInfo def _check_sentry_initialized(): @@ -47,8 +55,15 @@ def _f(*f_args, _tracing=None, **f_kwargs): ) with sentry_sdk.start_transaction(transaction) as transaction: - result = f(*f_args, **f_kwargs) - transaction.set_status("ok") + try: + result = f(*f_args, **f_kwargs) + transaction.set_status(SPANSTATUS.OK) + except Exception: + transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + return result rv = old_remote(_f, *args, *kwargs) @@ -60,12 +75,21 @@ def _remote_method_with_header_propagation(*args, **kwargs): op=OP.QUEUE_SUBMIT_RAY, description=qualname_from_function(f), origin=RayIntegration.origin, - ): + ) as span: tracing = { k: v for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() } - return old_remote_method(*args, **kwargs, _tracing=tracing) + try: + result = old_remote_method(*args, **kwargs, _tracing=tracing) + span.set_status(SPANSTATUS.OK) + except Exception: + span.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result rv.remote = _remote_method_with_header_propagation @@ -74,6 +98,22 @@ def _remote_method_with_header_propagation(*args, **kwargs): ray.remote = new_remote +def _capture_exception(exc_info, **kwargs): + # type: (ExcInfo, **Any) -> None + client = sentry_sdk.get_client() + + event, hint = event_from_exception( + exc_info, + client_options=client.options, + mechanism={ + "handled": False, + "type": RayIntegration.identifier, + }, + ) + + sentry_sdk.capture_event(event, hint=hint) + + class RayIntegration(Integration): identifier = "ray" origin = f"auto.queue.{identifier}" diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index fb3c1f0cd6..4acb61de81 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -77,8 +77,6 @@ def test_ray_spans(): @ray.remote def example_task(): - ... - return sentry_sdk.get_client().transport.envelopes ray.init( From 4c5b16db1ebdff59c80d5590b8e9abc8352355f5 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 2 Aug 2024 11:57:37 +0200 Subject: [PATCH 09/13] Improved tests --- sentry_sdk/integrations/ray.py | 8 ++- tests/integrations/ray/test_ray.py | 83 +++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 2c60a0c819..82c00bae64 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -42,8 +42,12 @@ def _patch_ray_remote(): @functools.wraps(old_remote) def new_remote(f, *args, **kwargs): # type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any] + def _f(*f_args, _tracing=None, **f_kwargs): # type: (Any, Optional[dict[str, Any]], Any) -> Any + """ + Ray Worker + """ _check_sentry_initialized() transaction = sentry_sdk.continue_trace( @@ -71,6 +75,9 @@ def _f(*f_args, _tracing=None, **f_kwargs): def _remote_method_with_header_propagation(*args, **kwargs): # type: (*Any, **Any) -> Any + """ + Ray Client + """ with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_RAY, description=qualname_from_function(f), @@ -110,7 +117,6 @@ def _capture_exception(exc_info, **kwargs): "type": RayIntegration.identifier, }, ) - sentry_sdk.capture_event(event, hint=hint) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 4acb61de81..ebccbe012f 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -1,3 +1,5 @@ +import json +import os import pytest import ray @@ -17,10 +19,22 @@ def capture_envelope(self, envelope: Envelope) -> None: self.envelopes.append(envelope) -def setup_sentry(): +class RayLoggingTransport(TestTransport): + def __init__(self): + super().__init__() + + def capture_envelope(self, envelope: Envelope) -> None: + print(envelope.serialize().decode("utf-8", "replace")) + + +def setup_sentry_with_logging_transport(): + setup_sentry(transport=RayLoggingTransport()) + + +def setup_sentry(transport=None): sentry_sdk.init( integrations=[RayIntegration()], - transport=RayTestTransport(), + transport=RayTestTransport() if transport is None else transport, traces_sample_rate=1.0, ) @@ -29,13 +43,6 @@ def setup_sentry(): def test_ray_tracing(): setup_sentry() - @ray.remote - def example_task(): - with sentry_sdk.start_span(op="task", description="example task step"): - ... - - return sentry_sdk.get_client().transport.envelopes - ray.init( runtime_env={ "worker_process_setup_hook": setup_sentry, @@ -43,6 +50,13 @@ def example_task(): } ) + @ray.remote + def example_task(): + with sentry_sdk.start_span(op="task", description="example task step"): + ... + + return sentry_sdk.get_client().transport.envelopes + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): worker_envelopes = ray.get(example_task.remote()) @@ -75,10 +89,6 @@ def example_task(): def test_ray_spans(): setup_sentry() - @ray.remote - def example_task(): - return sentry_sdk.get_client().transport.envelopes - ray.init( runtime_env={ "worker_process_setup_hook": setup_sentry, @@ -86,6 +96,10 @@ def example_task(): } ) + @ray.remote + def example_task(): + return sentry_sdk.get_client().transport.envelopes + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): worker_envelopes = ray.get(example_task.remote()) @@ -101,3 +115,46 @@ def example_task(): for span in worker_transaction["spans"]: assert span["op"] == "queue.task.ray" assert span["origin"] == "auto.queue.ray" + + +@pytest.mark.forked +def test_ray_errors(): + setup_sentry_with_logging_transport() + + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport, + "working_dir": "./", + } + ) + + @ray.remote + def example_task(): + 1 / 0 + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + future = example_task.remote() + ray.get(future) + + job_id = future.job_id().hex() + + # Read the worker log output containing the error + log_dir = "/tmp/ray/session_latest/logs/" + log_file = [ + f + for f in os.listdir(log_dir) + if "worker" in f and job_id in f and f.endswith(".out") + ][0] + with open(os.path.join(log_dir, log_file), "r") as file: + lines = file.readlines() + # parse error object from log line + error = json.loads(lines[4][:-1]) + + assert error["level"] == "error" + assert ( + error["transaction"] + == "tests.integrations.ray.test_ray.test_ray_errors..example_task" + ) # its in the worker, not the client thus not "ray test transaction" + assert error["exception"]["values"][0]["mechanism"]["type"] == "ray" + assert not error["exception"]["values"][0]["mechanism"]["handled"] From 1d37b52a33e2621c980e59ba0c6d5539bd83c3e3 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Wed, 7 Aug 2024 14:02:18 +0200 Subject: [PATCH 10/13] Prevent breaking user code when Ray actors are used --- sentry_sdk/integrations/ray.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 82c00bae64..25478aa203 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -1,4 +1,6 @@ +import inspect import sys + import sentry_sdk from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration @@ -42,6 +44,12 @@ def _patch_ray_remote(): @functools.wraps(old_remote) def new_remote(f, *args, **kwargs): # type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any] + if inspect.isclass(f): + # Ray Actors + # (https://docs.ray.io/en/latest/ray-core/actors.html) + # are not supported + # (Only Ray Tasks are supported) + return old_remote(f, *args, *kwargs) def _f(*f_args, _tracing=None, **f_kwargs): # type: (Any, Optional[dict[str, Any]], Any) -> Any From f8d9d98c4abec9446efe9dc198d534b863c34064 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Wed, 7 Aug 2024 14:16:28 +0200 Subject: [PATCH 11/13] Updated tests --- tests/integrations/ray/test_ray.py | 45 ++++++++++++++++++++++++++++++ tox.ini | 6 ++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index ebccbe012f..6a4127d8fa 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -158,3 +158,48 @@ def example_task(): ) # its in the worker, not the client thus not "ray test transaction" assert error["exception"]["values"][0]["mechanism"]["type"] == "ray" assert not error["exception"]["values"][0]["mechanism"]["handled"] + + +# @pytest.mark.forked +def test_ray_actor(): + setup_sentry() + + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry, + "working_dir": "./", + } + ) + + @ray.remote + class Counter(object): + def __init__(self): + self.n = 0 + + def increment(self): + with sentry_sdk.start_span(op="task", description="example task step"): + self.n += 1 + + return sentry_sdk.get_client().transport.envelopes + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + counter = Counter.remote() + worker_envelopes = ray.get(counter.increment.remote()) + + # Currently no transactions/spans are captured in actors + assert worker_envelopes == [] + + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() + + assert ( + client_transaction["contexts"]["trace"]["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + ) + + for span in client_transaction["spans"]: + assert ( + span["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + == client_transaction["contexts"]["trace"]["trace_id"] + ) diff --git a/tox.ini b/tox.ini index 3a87ca7c53..6f697bc05a 100644 --- a/tox.ini +++ b/tox.ini @@ -205,7 +205,8 @@ envlist = {py3.8,py3.11,py3.12}-quart-latest # Ray - {py3.10,py3.11}-ray + {py3.10,py3.11}-ray-v{2.34} + {py3.10,py3.11}-ray-latest # Redis {py3.6,py3.8}-redis-v{3} @@ -547,7 +548,8 @@ deps = pyramid-latest: pyramid # Ray - ray: ray>=2.7.0 + ray-v2.34: ray~=2.34.0 + ray-latest: ray # Quart quart: quart-auth From 3055ff21a6fdac9fa28a487b094f4d1046b06efb Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Thu, 8 Aug 2024 15:25:20 +0200 Subject: [PATCH 12/13] Cleanup --- tests/integrations/ray/test_ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 6a4127d8fa..83d8b04b67 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -160,7 +160,7 @@ def example_task(): assert not error["exception"]["values"][0]["mechanism"]["handled"] -# @pytest.mark.forked +@pytest.mark.forked def test_ray_actor(): setup_sentry() From 182e7aa03569e398e67af7d15677fff5969683c6 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 13 Aug 2024 13:59:06 +0200 Subject: [PATCH 13/13] Update sentry_sdk/integrations/ray.py Co-authored-by: Ivana Kellyer --- sentry_sdk/integrations/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 25478aa203..bafd42c8d6 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -32,7 +32,7 @@ def _check_sentry_initialized(): if sentry_sdk.get_client().is_active(): return - logger.warning( + logger.debug( "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." )