Skip to content

Commit

Permalink
Support SparkIntegration activation after SparkContext created (#3411)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Anton Pirker <[email protected]>
  • Loading branch information
seyoon-lim and antonpirker authored Dec 20, 2024
1 parent 54aede3 commit 6e4cc36
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 135 deletions.
121 changes: 74 additions & 47 deletions sentry_sdk/integrations/spark/spark_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Optional

from sentry_sdk._types import Event, Hint
from pyspark import SparkContext


class SparkIntegration(Integration):
Expand All @@ -17,7 +18,7 @@ class SparkIntegration(Integration):
@staticmethod
def setup_once():
    # type: () -> None
    """Activate the Spark integration.

    Works whether or not a SparkContext already exists:
    _setup_sentry_tracing() instruments an already-active context
    immediately, or patches SparkContext._do_init so a future
    context gets instrumented on creation.
    """
    # NOTE: the pre-#3411 entry point patch_spark_context_init() is gone;
    # only the unified setup call remains.
    _setup_sentry_tracing()


def _set_app_properties():
Expand All @@ -37,7 +38,7 @@ def _set_app_properties():


def _start_sentry_listener(sc):
# type: (Any) -> None
# type: (SparkContext) -> None
"""
Start java gateway server to add custom `SparkListener`
"""
Expand All @@ -49,7 +50,51 @@ def _start_sentry_listener(sc):
sc._jsc.sc().addSparkListener(listener)


def patch_spark_context_init():
def _add_event_processor(sc):
    # type: (SparkContext) -> None
    """Register an event processor on the isolation scope that enriches
    outgoing Sentry events with metadata from the given SparkContext
    (user, executor/driver info, Spark version, app identifiers, web UI URL).
    """
    isolation_scope = sentry_sdk.get_isolation_scope()

    def process_event(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            # Bail out untouched if the integration is inactive or the
            # context has already been stopped.
            client = sentry_sdk.get_client()
            if (
                client.get_integration(SparkIntegration) is None
                or sc._active_spark_context is None
            ):
                return event

            user = event.setdefault("user", {})
            user.setdefault("id", sc.sparkUser())

            # Only fill in values the event does not already carry.
            tags = event.setdefault("tags", {})
            tags.setdefault("executor.id", sc._conf.get("spark.executor.id"))
            tags.setdefault(
                "spark-submit.deployMode",
                sc._conf.get("spark.submit.deployMode"),
            )
            tags.setdefault("driver.host", sc._conf.get("spark.driver.host"))
            tags.setdefault("driver.port", sc._conf.get("spark.driver.port"))
            tags.setdefault("spark_version", sc.version)
            tags.setdefault("app_name", sc.appName)
            tags.setdefault("application_id", sc.applicationId)
            tags.setdefault("master", sc.master)
            tags.setdefault("spark_home", sc.sparkHome)

            event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)

        return event

    isolation_scope.add_event_processor(process_event)


def _activate_integration(sc):
    # type: (SparkContext) -> None
    """Instrument an existing SparkContext for Sentry.

    Installs the Java-side Sentry listener, propagates the app
    properties, then registers the event processor — in that order.
    """
    _start_sentry_listener(sc)
    _set_app_properties()
    _add_event_processor(sc)


def _patch_spark_context_init():
# type: () -> None
from pyspark import SparkContext

Expand All @@ -59,51 +104,22 @@ def patch_spark_context_init():
def _sentry_patched_spark_context_init(self, *args, **kwargs):
# type: (SparkContext, *Any, **Any) -> Optional[Any]
rv = spark_context_init(self, *args, **kwargs)
_start_sentry_listener(self)
_set_app_properties()

scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
return event

if self._active_spark_context is None:
return event

event.setdefault("user", {}).setdefault("id", self.sparkUser())

event.setdefault("tags", {}).setdefault(
"executor.id", self._conf.get("spark.executor.id")
)
event["tags"].setdefault(
"spark-submit.deployMode",
self._conf.get("spark.submit.deployMode"),
)
event["tags"].setdefault(
"driver.host", self._conf.get("spark.driver.host")
)
event["tags"].setdefault(
"driver.port", self._conf.get("spark.driver.port")
)
event["tags"].setdefault("spark_version", self.version)
event["tags"].setdefault("app_name", self.appName)
event["tags"].setdefault("application_id", self.applicationId)
event["tags"].setdefault("master", self.master)
event["tags"].setdefault("spark_home", self.sparkHome)

event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

return event

_activate_integration(self)
return rv

SparkContext._do_init = _sentry_patched_spark_context_init


def _setup_sentry_tracing():
    # type: () -> None
    """Hook Sentry into Spark regardless of SparkContext lifecycle state.

    If a SparkContext is already active, instrument it right away;
    otherwise patch SparkContext._do_init so the instrumentation runs
    when a context is eventually created.
    """
    from pyspark import SparkContext

    active_context = SparkContext._active_spark_context
    if active_context is None:
        _patch_spark_context_init()
    else:
        _activate_integration(active_context)


class SparkListener:
def onApplicationEnd(self, applicationEnd): # noqa: N802,N803
# type: (Any) -> None
Expand Down Expand Up @@ -208,10 +224,21 @@ class Java:


class SentryListener(SparkListener):
def _add_breadcrumb(
    self,
    level,  # type: str
    message,  # type: str
    data=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    """Record a breadcrumb on Sentry's global scope.

    Shared helper for the listener callbacks; forwards level,
    message and optional structured data unchanged.
    """
    global_scope = sentry_sdk.get_global_scope()
    global_scope.add_breadcrumb(level=level, message=message, data=data)

def onJobStart(self, jobStart):  # noqa: N802,N803
    # type: (Any) -> None
    """Leave an info breadcrumb when a Spark job starts and refresh
    the Sentry app properties on the active context."""
    message = "Job {} Started".format(jobStart.jobId())
    # Breadcrumbs go through the helper (global scope), not the
    # top-level sentry_sdk.add_breadcrumb — the stale pre-#3411 call
    # left over in the rendered diff is dropped.
    self._add_breadcrumb(level="info", message=message)
    _set_app_properties()

def onJobEnd(self, jobEnd): # noqa: N802,N803
Expand All @@ -227,14 +254,14 @@ def onJobEnd(self, jobEnd): # noqa: N802,N803
level = "warning"
message = "Job {} Failed".format(jobEnd.jobId())

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)

def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
    # type: (Any) -> None
    """Leave an info breadcrumb with stage metadata (attempt id, name)
    when a stage is submitted, and refresh the Sentry app properties."""
    stage_info = stageSubmitted.stageInfo()
    message = "Stage {} Submitted".format(stage_info.stageId())
    data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
    # Single breadcrumb via the helper; the superseded
    # sentry_sdk.add_breadcrumb call from the old diff side is removed.
    self._add_breadcrumb(level="info", message=message, data=data)
    _set_app_properties()

def onStageCompleted(self, stageCompleted): # noqa: N802,N803
Expand All @@ -255,4 +282,4 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
message = "Stage {} Completed".format(stage_info.stageId())
level = "info"

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)
1 change: 0 additions & 1 deletion tests/integrations/asgi/test_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ async def app(scope, receive, send):

@pytest.fixture
def asgi3_custom_transaction_app():

async def app(scope, receive, send):
sentry_sdk.get_current_scope().set_transaction_name("foobar", source="custom")
await send(
Expand Down
Loading

0 comments on commit 6e4cc36

Please sign in to comment.