diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 02c08c2a9e..95ab4ad0fa 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -39,8 +39,27 @@ def setup_sentry(transport=None): ) +def read_error_from_log(job_id): + log_dir = "/tmp/ray/session_latest/logs/" + log_file = [ + f + for f in os.listdir(log_dir) + if "worker" in f and job_id in f and f.endswith(".out") + ][0] + with open(os.path.join(log_dir, log_file), "r") as file: + lines = file.readlines() + + try: + # parse error object from log line + error = json.loads(lines[4][:-1]) + except IndexError: + error = None + + return error + + @pytest.mark.forked -def test_ray_tracing(): +def test_tracing_in_ray_tasks(): setup_sentry() ray.init( @@ -50,6 +69,7 @@ def test_ray_tracing(): } ) + # Setup ray task @ray.remote def example_task(): with sentry_sdk.start_span(op="task", name="example task step"): @@ -62,63 +82,42 @@ def example_task(): client_envelope = sentry_sdk.get_client().transport.envelopes[0] client_transaction = client_envelope.get_transaction_event() + assert client_transaction["transaction"] == "ray test transaction" + assert client_transaction["transaction_info"] == {"source": "custom"} + worker_envelope = worker_envelopes[0] worker_transaction = worker_envelope.get_transaction_event() - assert ( - client_transaction["contexts"]["trace"]["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] + worker_transaction["transaction"] + == "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks..example_task" ) + assert worker_transaction["transaction_info"] == {"source": "task"} - for span in client_transaction["spans"]: - assert ( - span["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - ) - - for span in worker_transaction["spans"]: - assert ( - span["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - ) - - -@pytest.mark.forked -def test_ray_spans(): - setup_sentry() - - ray.init( - runtime_env={ - "worker_process_setup_hook": setup_sentry, - "working_dir": "./", - } + (span,) = client_transaction["spans"] + assert span["op"] == "queue.submit.ray" + assert span["origin"] == "auto.queue.ray" + assert ( + span["description"] + == "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks..example_task" ) + assert span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"] + assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"] - @ray.remote - def example_task(): - return sentry_sdk.get_client().transport.envelopes + (span,) = worker_transaction["spans"] + assert span["op"] == "task" + assert span["origin"] == "manual" + assert span["description"] == "example task step" + assert span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"] + assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"] - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - worker_envelopes = ray.get(example_task.remote()) - - client_envelope = sentry_sdk.get_client().transport.envelopes[0] - client_transaction = client_envelope.get_transaction_event() - worker_envelope = worker_envelopes[0] - worker_transaction = worker_envelope.get_transaction_event() - - for span in client_transaction["spans"]: - assert span["op"] == "queue.submit.ray" - assert span["origin"] == "auto.queue.ray" - - for span in worker_transaction["spans"]: - assert span["op"] == "queue.task.ray" - assert span["origin"] == "auto.queue.ray" + assert ( + client_transaction["contexts"]["trace"]["trace_id"] + == worker_transaction["contexts"]["trace"]["trace_id"] + ) @pytest.mark.forked -def test_ray_errors(): +def test_errors_in_ray_tasks(): setup_sentry_with_logging_transport() ray.init( @@ -128,6 +127,7 @@ def test_ray_errors(): } ) + # Setup ray task @ray.remote def example_task(): 1 / 0 @@ -138,30 +138,19 @@ def example_task(): ray.get(future) job_id = future.job_id().hex() - - # Read the worker log output containing the error - log_dir = "/tmp/ray/session_latest/logs/" - log_file = [ - f - for f in os.listdir(log_dir) - if "worker" in f and job_id in f and f.endswith(".out") - ][0] - with open(os.path.join(log_dir, log_file), "r") as file: - lines = file.readlines() - # parse error object from log line - error = json.loads(lines[4][:-1]) + error = read_error_from_log(job_id) assert error["level"] == "error" assert ( error["transaction"] - == "tests.integrations.ray.test_ray.test_ray_errors..example_task" - ) # its in the worker, not the client thus not "ray test transaction" + == "tests.integrations.ray.test_ray.test_errors_in_ray_tasks..example_task" + ) assert error["exception"]["values"][0]["mechanism"]["type"] == "ray" assert not error["exception"]["values"][0]["mechanism"]["handled"] @pytest.mark.forked -def test_ray_actor(): +def test_tracing_in_ray_actors(): setup_sentry() ray.init( @@ -171,13 +160,14 @@ def test_ray_actor(): } ) + # Setup ray actor @ray.remote class Counter: def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example task step"): + with sentry_sdk.start_span(op="task", name="example actor execution"): self.n += 1 return sentry_sdk.get_client().transport.envelopes @@ -186,20 +176,47 @@ def increment(self): counter = Counter.remote() worker_envelopes = ray.get(counter.increment.remote()) - # Currently no transactions/spans are captured in actors - assert worker_envelopes == [] - client_envelope = sentry_sdk.get_client().transport.envelopes[0] client_transaction = client_envelope.get_transaction_event() - assert ( - client_transaction["contexts"]["trace"]["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] + # Spans for submitting the actor task are not created (actors are not supported yet) + assert client_transaction["spans"] == [] + + # Transaction are not yet created when executing ray actors (actors are not supported yet) + assert worker_envelopes == [] + + +@pytest.mark.forked +def test_errors_in_ray_actors(): + setup_sentry_with_logging_transport() + + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport, + "working_dir": "./", + } ) - for span in client_transaction["spans"]: - assert ( - span["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - == client_transaction["contexts"]["trace"]["trace_id"] - ) + # Setup ray actor + @ray.remote + class Counter: + def __init__(self): + self.n = 0 + + def increment(self): + with sentry_sdk.start_span(op="task", name="example actor execution"): + 1 / 0 + + return sentry_sdk.get_client().transport.envelopes + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) + + job_id = future.job_id().hex() + error = read_error_from_log(job_id) + + # We do not capture errors in ray actors yet + assert error is None