Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ray integration support (#2400) #2444

Merged
merged 21 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ jobs:
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-huey-latest"
- name: Test ray latest
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-ray-latest"
- name: Test rq latest
run: |
set -x # print commands that are executed
Expand Down Expand Up @@ -139,6 +143,10 @@ jobs:
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-huey"
- name: Test ray pinned
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-ray"
- name: Test rq pinned
run: |
set -x # print commands that are executed
Expand Down
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"celery",
"dramatiq",
"huey",
"ray",
"rq",
"spark",
],
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ class OP:
QUEUE_TASK_RQ = "queue.task.rq"
QUEUE_SUBMIT_HUEY = "queue.submit.huey"
QUEUE_TASK_HUEY = "queue.task.huey"
QUEUE_SUBMIT_RAY = "queue.submit.ray"
QUEUE_TASK_RAY = "queue.task.ray"
SUBPROCESS = "subprocess"
SUBPROCESS_WAIT = "subprocess.wait"
SUBPROCESS_COMMUNICATE = "subprocess.communicate"
Expand Down
146 changes: 146 additions & 0 deletions sentry_sdk/integrations/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import inspect
import sys

import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
event_from_exception,
logger,
package_version,
qualname_from_function,
reraise,
)

try:
import ray # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("Ray not installed.")
import functools

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional
from sentry_sdk.utils import ExcInfo


def _check_sentry_initialized():
    # type: () -> None
    """
    Log a debug message when the current Sentry client is not active.

    Nothing is logged (and nothing else happens) when the client is active.
    """
    if not sentry_sdk.get_client().is_active():
        logger.debug(
            "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
        )


def _instrument_ray_task(f, make_remote):
    # type: (Callable[..., Any], Callable[..., Any]) -> Any
    """
    Wrap the Ray Task ``f`` with Sentry tracing and register it with Ray.

    :param f: The user's task function.
    :param make_remote: Callable that takes the worker-side wrapper and
        returns the object produced by the original ``ray.remote``.
    :returns: The object returned by ``make_remote`` with its ``remote``
        attribute replaced by a version that propagates trace headers.
    """

    def _f(*f_args, _tracing=None, **f_kwargs):
        # type: (Any, Optional[dict[str, Any]], Any) -> Any
        """
        Ray Worker
        """
        _check_sentry_initialized()

        # ``_tracing`` carries the propagation headers collected on the
        # submitting side; it defaults to None so a call that did not go
        # through the patched ``remote`` still starts a fresh trace.
        transaction = sentry_sdk.continue_trace(
            _tracing or {},
            op=OP.QUEUE_TASK_RAY,
            name=qualname_from_function(f),
            origin=RayIntegration.origin,
            source=TRANSACTION_SOURCE_TASK,
        )

        with sentry_sdk.start_transaction(transaction) as transaction:
            try:
                result = f(*f_args, **f_kwargs)
                transaction.set_status(SPANSTATUS.OK)
            except Exception:
                transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
                exc_info = sys.exc_info()
                _capture_exception(exc_info)
                reraise(*exc_info)

            return result

    rv = make_remote(_f)
    old_remote_method = rv.remote

    def _remote_method_with_header_propagation(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        """
        Ray Client
        """
        with sentry_sdk.start_span(
            op=OP.QUEUE_SUBMIT_RAY,
            description=qualname_from_function(f),
            origin=RayIntegration.origin,
        ) as span:
            # Snapshot the propagation headers of the current scope so the
            # worker can continue this trace.
            tracing = dict(
                sentry_sdk.get_current_scope().iter_trace_propagation_headers()
            )
            try:
                result = old_remote_method(*args, **kwargs, _tracing=tracing)
                span.set_status(SPANSTATUS.OK)
            except Exception:
                span.set_status(SPANSTATUS.INTERNAL_ERROR)
                exc_info = sys.exc_info()
                _capture_exception(exc_info)
                reraise(*exc_info)

            return result

    rv.remote = _remote_method_with_header_propagation

    return rv


def _patch_ray_remote():
    # type: () -> None
    """
    Replace ``ray.remote`` with a wrapper that instruments Ray Tasks.

    Both decorator forms are handled:

    * ``@ray.remote`` - the decorated object is passed as ``f``.
    * ``@ray.remote(**options)`` - ``f`` is omitted and a decorator is
      returned, mirroring what the original ``ray.remote`` is called with.

    Classes (Ray Actors) are passed to the original ``ray.remote`` untouched.
    """
    old_remote = ray.remote

    @functools.wraps(old_remote)
    def new_remote(f=None, *args, **kwargs):
        # type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]
        if f is None:
            # Options form. Call the original right away so that any error
            # it raises for the given options surfaces at the same point
            # as it would without the patch.
            make_remote = old_remote(*args, **kwargs)

            def decorator(target):
                # type: (Callable[..., Any]) -> Any
                if inspect.isclass(target):
                    # Actors are not instrumented.
                    return make_remote(target)
                return _instrument_ray_task(target, make_remote)

            return decorator

        if inspect.isclass(f):
            # Ray Actors
            # (https://docs.ray.io/en/latest/ray-core/actors.html)
            # are not supported
            # (Only Ray Tasks are supported)
            return old_remote(f, *args, **kwargs)

        return _instrument_ray_task(
            f, lambda worker: old_remote(worker, *args, **kwargs)
        )

    ray.remote = new_remote


def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """
    Build a Sentry event from ``exc_info`` and capture it.

    The event is marked as unhandled and its mechanism type is the
    integration identifier. Extra keyword arguments are accepted but unused.
    """
    options = sentry_sdk.get_client().options
    mechanism = {
        "handled": False,
        "type": RayIntegration.identifier,
    }

    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    sentry_sdk.capture_event(event, hint=hint)


class RayIntegration(Integration):
    """
    Sentry integration that patches ``ray.remote`` to trace Ray Tasks.
    """

    identifier = "ray"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """
        Check the installed ray version and patch ``ray.remote``.

        Raises ``DidNotEnable`` when ``package_version("ray")`` returns None
        or a version lower than 2.7.0.
        """
        ray_version = package_version("ray")

        if ray_version is None:
            raise DidNotEnable("Unparsable ray version: {}".format(ray_version))
        elif ray_version < (2, 7, 0):
            raise DidNotEnable("Ray 2.7.0 or newer required")

        _patch_ray_remote()
3 changes: 3 additions & 0 deletions tests/integrations/ray/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("ray")
205 changes: 205 additions & 0 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import json
import os
import pytest

import ray

import sentry_sdk
from sentry_sdk.envelope import Envelope
from sentry_sdk.integrations.ray import RayIntegration
from tests.conftest import TestTransport


class RayTestTransport(TestTransport):
    """Test transport that keeps every captured envelope in memory."""

    def __init__(self):
        # Envelopes are appended in the order they are captured; tests read
        # this list directly off the client's transport.
        self.envelopes = []
        super().__init__()

    def capture_envelope(self, envelope: Envelope) -> None:
        """Store ``envelope`` instead of sending it."""
        self.envelopes.append(envelope)


class RayLoggingTransport(TestTransport):
    """
    Test transport that prints every captured envelope to stdout.

    Construction is inherited unchanged from ``TestTransport``.
    """

    def capture_envelope(self, envelope: Envelope) -> None:
        """Print the serialized envelope, decoded as UTF-8 with replacement."""
        serialized = envelope.serialize()
        print(serialized.decode("utf-8", "replace"))


def setup_sentry_with_logging_transport():
    """Initialize Sentry for the tests with a ``RayLoggingTransport``."""
    logging_transport = RayLoggingTransport()
    setup_sentry(transport=logging_transport)


def setup_sentry(transport=None):
    """
    Initialize Sentry with the Ray integration and full trace sampling.

    :param transport: Transport to use; a new ``RayTestTransport`` is
        created when None is given.
    """
    if transport is None:
        transport = RayTestTransport()

    sentry_sdk.init(
        integrations=[RayIntegration()],
        transport=transport,
        traces_sample_rate=1.0,
    )


@pytest.mark.forked
def test_ray_tracing():
    """
    The worker transaction and all spans must share the client's trace id.
    """
    setup_sentry()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry,
            "working_dir": "./",
        }
    )

    @ray.remote
    def example_task():
        with sentry_sdk.start_span(op="task", description="example task step"):
            ...

        # Hand the worker-side envelopes back to the client for inspection.
        return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        worker_envelopes = ray.get(example_task.remote())

    client_envelope = sentry_sdk.get_client().transport.envelopes[0]
    client_transaction = client_envelope.get_transaction_event()
    worker_envelope = worker_envelopes[0]
    worker_transaction = worker_envelope.get_transaction_event()

    client_trace_id = client_transaction["contexts"]["trace"]["trace_id"]
    worker_trace_id = worker_transaction["contexts"]["trace"]["trace_id"]

    # Compare client against worker; comparing the client transaction with
    # itself would pass even if no trace headers were propagated.
    assert client_trace_id == worker_trace_id

    for span in client_transaction["spans"]:
        assert span["trace_id"] == client_trace_id == worker_trace_id

    for span in worker_transaction["spans"]:
        assert span["trace_id"] == client_trace_id == worker_trace_id


@pytest.mark.forked
def test_ray_spans():
    """Check the op and origin of spans recorded on the client and worker."""
    setup_sentry()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry,
            "working_dir": "./",
        }
    )

    @ray.remote
    def example_task():
        # Hand the worker-side envelopes back to the client for inspection.
        return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        worker_envelopes = ray.get(example_task.remote())

    client_envelope = sentry_sdk.get_client().transport.envelopes[0]
    client_transaction = client_envelope.get_transaction_event()
    worker_envelope = worker_envelopes[0]
    worker_transaction = worker_envelope.get_transaction_event()

    for span in client_transaction["spans"]:
        assert span["op"] == "queue.submit.ray"
        assert span["origin"] == "auto.queue.ray"

    # NOTE(review): example_task opens no spans of its own, so this loop may
    # not iterate at all - confirm whether the worker transaction's own
    # op/origin should be asserted instead.
    for span in worker_transaction["spans"]:
        assert span["op"] == "queue.task.ray"
        assert span["origin"] == "auto.queue.ray"


@pytest.mark.forked
def test_ray_errors():
    """
    An exception raised in a task is reported by the worker as an unhandled
    error event with mechanism type "ray".
    """
    setup_sentry_with_logging_transport()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry_with_logging_transport,
            "working_dir": "./",
        }
    )

    @ray.remote
    def example_task():
        1 / 0

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        with pytest.raises(ZeroDivisionError):
            future = example_task.remote()
            ray.get(future)

    job_id = future.job_id().hex()

    # Read the worker log output containing the error
    # (the logging transport prints envelopes to stdout; the test expects
    # them in the worker's ".out" file for this job).
    log_dir = "/tmp/ray/session_latest/logs/"
    log_file = [
        f
        for f in os.listdir(log_dir)
        if "worker" in f and job_id in f and f.endswith(".out")
    ][0]
    with open(os.path.join(log_dir, log_file), "r") as file:
        lines = file.readlines()
        # parse error object from log line
        # NOTE(review): the fixed index 4 assumes a particular layout of the
        # worker output - confirm against the ray version under test.
        error = json.loads(lines[4][:-1])

    assert error["level"] == "error"
    assert (
        error["transaction"]
        == "tests.integrations.ray.test_ray.test_ray_errors.<locals>.example_task"
    )  # its in the worker, not the client thus not "ray test transaction"
    assert error["exception"]["values"][0]["mechanism"]["type"] == "ray"
    assert not error["exception"]["values"][0]["mechanism"]["handled"]


@pytest.mark.forked
def test_ray_actor():
    """
    Actors are not instrumented: the worker captures nothing, while the
    client transaction is still recorded with a consistent trace id.
    """
    setup_sentry()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry,
            "working_dir": "./",
        }
    )

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.n = 0

        def increment(self):
            with sentry_sdk.start_span(op="task", description="example task step"):
                self.n += 1

            # Hand the worker-side envelopes back to the client for inspection.
            return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        counter = Counter.remote()
        worker_envelopes = ray.get(counter.increment.remote())

    # Currently no transactions/spans are captured in actors
    assert worker_envelopes == []

    client_envelope = sentry_sdk.get_client().transport.envelopes[0]
    client_transaction = client_envelope.get_transaction_event()
    client_trace_id = client_transaction["contexts"]["trace"]["trace_id"]

    # There is no worker transaction to compare against here, so the only
    # meaningful check is that every client span belongs to the client trace.
    for span in client_transaction["spans"]:
        assert span["trace_id"] == client_trace_id
Loading
Loading