Support SparkIntegration activation after SparkContext created #3411
Codecov Report
Attention: Patch coverage is
✅ All tests successful. No failed tests found.
Additional details and impacted files
@@ Coverage Diff @@
## master #3411 +/- ##
==========================================
- Coverage 79.97% 79.91% -0.06%
==========================================
Files 139 139
Lines 15445 15458 +13
Branches 2624 2625 +1
==========================================
+ Hits 12352 12354 +2
- Misses 2219 2232 +13
+ Partials 874 872 -2
😎
scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
    # type: (Event, Hint) -> Optional[Event]
    with capture_internal_exceptions():
        if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
            return event

        if self._active_spark_context is None:
            return event

        event.setdefault("user", {}).setdefault("id", self.sparkUser())

        event.setdefault("tags", {}).setdefault(
            "executor.id", self._conf.get("spark.executor.id")
        )
        event["tags"].setdefault(
            "spark-submit.deployMode",
            self._conf.get("spark.submit.deployMode"),
        )
        event["tags"].setdefault(
            "driver.host", self._conf.get("spark.driver.host")
        )
        event["tags"].setdefault(
            "driver.port", self._conf.get("spark.driver.port")
        )
        event["tags"].setdefault("spark_version", self.version)
        event["tags"].setdefault("app_name", self.appName)
        event["tags"].setdefault("application_id", self.applicationId)
        event["tags"].setdefault("master", self.master)
        event["tags"].setdefault("spark_home", self.sparkHome)

        event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

    return event
I have separated this part into a distinct function.
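As a side note on the hunk above, the `setdefault` chain only fills in keys that are missing, so anything already set on the event takes precedence over the Spark-derived values. The following is a minimal sketch of that behavior, not the integration's code: `FakeSparkContext` and `add_spark_fields` are hypothetical names, and the attribute values are made up for illustration.

```python
from typing import Any, Dict


class FakeSparkContext:
    """Hypothetical stand-in exposing a few attributes the processor reads."""

    version = "3.5.0"
    appName = "Simple Example"
    master = "local[*]"

    def sparkUser(self) -> str:
        return "spark-user"


def add_spark_fields(event: Dict[str, Any], sc: FakeSparkContext) -> Dict[str, Any]:
    # setdefault only writes a key when it is absent, so values that are
    # already on the event take precedence over the Spark-derived ones.
    event.setdefault("user", {}).setdefault("id", sc.sparkUser())
    tags = event.setdefault("tags", {})
    tags.setdefault("spark_version", sc.version)
    tags.setdefault("app_name", sc.appName)
    tags.setdefault("master", sc.master)
    return event


# An event that already carries a user id and an app_name tag.
event = {"user": {"id": "alice"}, "tags": {"app_name": "custom"}}
enriched = add_spark_fields(event, FakeSparkContext())
```

Here the pre-existing `user.id` and `app_name` are left alone, while the missing `spark_version` and `master` tags are added.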
def _activate_integration(sc):
    # type: (SparkContext) -> None
    from pyspark import SparkContext

    _start_sentry_listener(sc)
    _set_app_properties()
    _add_event_processor(sc)
I have bundled the functions that need to run when the integration is activated.
if SparkContext._active_spark_context is not None:
    _activate_integration(SparkContext._active_spark_context)
    return
_patch_spark_context_init()
When the Spark context already exists, _activate_integration
is called instead of applying the patch.
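The "activate now, otherwise patch the constructor" flow described here can be sketched without PySpark. This is an illustration under stated assumptions, not the PR's implementation: `FakeContext`, `activate`, `patch_init`, and `setup_once` are hypothetical stand-ins for `SparkContext`, `_activate_integration`, `_patch_spark_context_init`, and the integration's setup hook.

```python
from typing import Callable, List, Optional


class FakeContext:
    """Hypothetical stand-in for SparkContext with a class-level active instance."""

    _active: Optional["FakeContext"] = None

    def __init__(self, name: str) -> None:
        self.name = name
        FakeContext._active = self


activated: List[str] = []


def activate(ctx: FakeContext) -> None:
    # In the PR this step starts the listener, sets app properties,
    # and registers the event processor.
    activated.append(ctx.name)


def patch_init() -> None:
    # Wrap __init__ so that contexts created later are activated on creation.
    original_init: Callable[..., None] = FakeContext.__init__

    def patched_init(self: FakeContext, *args, **kwargs) -> None:
        original_init(self, *args, **kwargs)
        activate(self)

    FakeContext.__init__ = patched_init  # type: ignore[method-assign]


def setup_once() -> None:
    if FakeContext._active is not None:
        # A context already exists: activate it directly, no patching needed.
        activate(FakeContext._active)
        return
    patch_init()


# The context is created first, then the setup runs (the "init after" case).
existing = FakeContext("created-before-setup")
setup_once()
```

With a context already present, `setup_once()` takes the early-return branch and activates that context directly.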
I have tested this change with both Spark master local and YARN, and there is currently an issue when Sentry is initialized after the session is created: the breadcrumbs are not being sent. I think the issue might be related to the scope. Unlike when it works properly, the scope gets set to None at one point and then its address changes. The listener is working correctly. I would appreciate any help you can provide!
master local
🟢 sentry_init before create spark session
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration

if __name__ == "__main__":
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="{{ my_dsn }}",
    )
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)
    result = result_rdd.collect()
    print(result)
    spark.read.csv("/path/deos/not/exist/error/raise")
🔴 sentry_init after create spark session
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="{{ my_dsn }}",
    )
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)
    result = result_rdd.collect()
    print(result)
    spark.read.csv("/path/deos/not/exist/error/raise")
master yarn
🟢 sentry_init before create spark session
🔴 sentry_init after create spark session
The cause seems to be that the scope used when
🟢 sentry init before create session
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = None
********** breadcrumb = None
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
24/08/10 22:09:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============== _activate_integration(sc) <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 6147485696
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26803, thread ident: 6164312064
========== onJobStart branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onStageSubmitted branch feature/support-exists-context
[1, 4, 9, 16, 25]
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onStageCompleted branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onJobEnd branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
Traceback (most recent call last):
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py", line 23, in <module>
spark.read.csv("/path/deos/not/exist/error/raise")
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.
🔴 sentry init after create session
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = None
********** breadcrumb = None
--------- pid: 26871, thread ident: 4335519104
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/10 22:09:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
============== _activate_integration(sc) <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = None
********** breadcrumb = None
--------- pid: 26871, thread ident: 6185103360
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 6201929728
========== onJobStart branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onStageSubmitted branch feature/support-exists-context
[1, 4, 9, 16, 25]
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 535910, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onStageCompleted branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 535910, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 51, 349502, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onJobEnd branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() = <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb = deque([])
--------- pid: 26871, thread ident: 4335519104
Traceback (most recent call last):
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py", line 24, in <module>
spark.read.csv("/path/deos/not/exist/error/raise")
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.
This is my temporary fix logic. I tested it, and it works well with these changes (master |
    data=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    Scope.set_isolation_scope(Scope.get_global_scope())
We should not be setting the isolation scope to the global scope.
So that I can suggest a better alternative, what are you trying to accomplish here?
@szokeasaurusrex
First, thank you for the review!
- Summary: When sentry_init is called after the SparkContext has been created, the breadcrumbs are not transmitted (if sentry_init is called before the SparkContext is created, it works fine). To resolve this, I set the isolation scope to the global scope and confirmed that the breadcrumbs are then transmitted properly.
- Issue: If sentry_init is invoked after the SparkContext has been created, the breadcrumbs in the thread where the error is raised contain no data.
- Suspected Reason: When add_breadcrumb is called within the SparkListener, it seems to store the breadcrumb in a scope that is not the same scope that handles exceptions (id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs) is different).
- Verification: Inserted print statements at the relevant points of the code.
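One plausible mechanism for the differing ids can be sketched with the standard library alone. It assumes the isolation scope is held in a `contextvars.ContextVar` and created lazily when unset, which the `_isolation_scope.get() = None` debug lines suggest but which is not confirmed here. `FakeScope`, `get_scope`, and `listener_callback` are hypothetical names. On a default CPython build a new thread starts with an empty context, so it does not see the value set on the main thread.

```python
import contextvars
import threading
from typing import List


class FakeScope:
    """Hypothetical stand-in for a scope that collects breadcrumbs."""

    def __init__(self) -> None:
        self.breadcrumbs: List[str] = []


# Assumption: the scope is stored per context and defaults to None.
_scope_var = contextvars.ContextVar("scope", default=None)


def get_scope() -> FakeScope:
    # Lazily create a scope for the current context if none is set.
    scope = _scope_var.get()
    if scope is None:
        scope = FakeScope()
        _scope_var.set(scope)
    return scope


# The main thread gets (and thereby creates) its scope first.
main_scope = get_scope()
seen_in_thread: List[FakeScope] = []


def listener_callback() -> None:
    # Runs on another thread, like the listener callbacks in the logs.
    scope = get_scope()
    scope.breadcrumbs.append("Job 0 Started")
    seen_in_thread.append(scope)


worker = threading.Thread(target=listener_callback)
worker.start()
worker.join()
```

In this sketch the breadcrumb lands on the worker thread's scope, while the main thread's scope stays empty, which mirrors the `deque([])` seen on the main thread in the failing case.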
print insert
class SentryListener(SparkListener):
    def _add_breadcrumb(
        self,
        level,  # type: str
        message,  # type: str
        data=None,  # type: Optional[dict[str, Any]]
    ):
        # type: (...) -> None
        # Scope.set_isolation_scope(Scope.get_global_scope())
        print(f"* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: {os.getpid()}, current thread: {threading.get_ident()}")
        print(f"** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
        print(f"*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
        sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
def capture_sql_exception(f: Callable[..., Any]) -> Callable[..., Any]:
    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            import sentry_sdk
            import os
            import threading
            print(f"- pyspark/errors/exceptions/captuted.py current pid: {os.getpid()}, current thread: {threading.get_ident()}")
            print(f"-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
            print(f"--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
                raise converted from None
            else:
                raise
    return deco
test
- When sentry_init is called after SparkContext has been created
code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)
    result = result_rdd.collect()
    print(result)
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")
output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:54:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
====== main() pid: 19639, current thread: 4306142592
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
[1, 4, 9, 16, 25]
====== main() pid: 19639, current thread: 4306142592
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 6232780800
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 29, 173172, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 4306142592
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4372017600
Traceback (most recent call last):
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py", line 28, in <module>
spark.read.csv("/path/deos/not/exist/error/raise")
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.
Process finished with exit code 1
- When sentry_init is called before SparkContext has been created
code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading

if __name__ == "__main__":
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)
    result = result_rdd.collect()
    print(result)
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")
output
====== main() pid: 19741, current thread: 4370892160
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
[1, 4, 9, 16, 25]
====== main() pid: 19741, current thread: 4370892160
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 6166802432
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 4370892160
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 721299, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py", line 28, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.
Process finished with exit code 1
If you have any questions, please feel free to let me know!
Thank you.
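For anyone reproducing this comparison, the annotated log lines above (pid, thread id, breadcrumb contents, and id() of the breadcrumb deque) could be produced with a small helper along these lines. This is only a sketch: the name describe_breadcrumbs is hypothetical, and it takes the breadcrumb container as an argument instead of reading it from sentry_sdk, so it runs without the SDK installed.

```python
import os
import threading
from collections import deque


def describe_breadcrumbs(label, breadcrumbs):
    """Return three log lines: process/thread, contents, and object identity."""
    return [
        f"{label} current pid: {os.getpid()}, current thread: {threading.get_ident()}",
        f"{label} breadcrumbs: {breadcrumbs!r}",
        f"{label} id(breadcrumbs): {id(breadcrumbs)}",
    ]


# Stand-in container (assumption); in the runs above this was the
# isolation scope's breadcrumb deque.
crumbs = deque()
for line in describe_breadcrumbs("listener", crumbs):
    print(line)
```

The id() line is the useful one: in the first run the listener and the exception site report different ids (4378114848 vs 4372017600), while in the second run every call site reports 4394201536, which means they share one breadcrumb container.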
I see. We need to find a different solution here though, because we cannot set the global scope to the isolation scope. Doing so will likely break isolation elsewhere and cause unrelated data to be sent along with other events.
Maybe we need to fork the isolation or current scope somewhere in the Spark integration? I can also try to take a look at this later if you are struggling to figure out how to avoid setting the global scope to the isolation scope.
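To illustrate the concern, here is a toy model of a shared global scope and per-context isolation scopes, built only on the standard library's contextvars. It is a sketch under stated assumptions: ToyScope, scopes, get_isolation_scope, build_event, and handle are hypothetical names and are not the sentry_sdk API.

```python
import contextvars


class ToyScope:
    """Hypothetical stand-in for a scope that only holds breadcrumbs."""

    def __init__(self):
        self.breadcrumbs = []


# One global scope shared by everything, plus one isolation scope per context.
scopes = {"global": ToyScope()}
_isolation_scope = contextvars.ContextVar("isolation_scope")


def get_isolation_scope():
    """Return this context's isolation scope, creating it on first use."""
    try:
        return _isolation_scope.get()
    except LookupError:
        scope = ToyScope()
        _isolation_scope.set(scope)
        return scope


def build_event():
    """Merge global and isolation breadcrumbs, as an outgoing event would."""
    return scopes["global"].breadcrumbs + get_isolation_scope().breadcrumbs


def handle(name):
    get_isolation_scope().breadcrumbs.append(f"{name}: started")
    return build_event()


def alias_global_and_handle(name):
    # The pattern under review: the global scope now IS this context's scope.
    scopes["global"] = get_isolation_scope()
    return handle(name)


# Each call runs in a fresh, empty context, so isolation scopes are separate.
isolated_b = contextvars.Context().run(handle, "request B")
leaky_spark = contextvars.Context().run(alias_global_and_handle, "spark job")
leaky_b = contextvars.Context().run(handle, "request B")

print(isolated_b)  # only request B's own breadcrumb
print(leaky_b)     # now also carries the spark job's breadcrumb
```

In this model, once the global scope is aliased to one context's isolation scope, every later event in any other context picks up that context's breadcrumbs. Working on a forked copy of the scope instead of aliasing it would avoid that sharing.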
Oh, I see.
I will look into this further and work on fixing the issue.
I will update you after conducting some more tests.
Besides the item I mentioned above, this PR looks mostly good. Once it is addressed, I will re-review, and hopefully we can merge the PR then. Please also fix the failing linter CI action. Thanks for the contribution!
f8defa0 to 1828149
This issue has gone three weeks without activity. In another week, I will close it. But! If you comment or otherwise update it, I will reset the clock, and if you remove the label "A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀
@szokeasaurusrex resetting the clock on this one, let's try to get it in if it's okay now
@sl0thentr0py this comment has still not been addressed, the PR still sets the global scope as the isolation scope, which as far as I am aware is a big no-no. @seyoon-lim are you still interested in working on this? If not, it might be time for our team to take over work on this PR
@sl0thentr0py @szokeasaurusrex Hey, firstly, thanks for reminding me about this PR! In my use case, I haven't noticed any major issues with the current state of the PR, so I haven't looked into it further! It would be great if this issue could be taken over and the above functionality could be used in an official version of Sentry someday; that would be very, very welcome! Thanks!
Hey! I have picked this up and have a small suggestion that probably fixes the remaining problem.
Hey @seyoon-lim,
@seyoon-lim could you give me permission to push to your branch? Then I can push my fixes for the failing tests. Thanks.
…ters/sentry-python into pr/seyoon-lim/3411
The scopes thing is fixed, so I guess this is good now.
@@ -128,7 +128,6 @@ async def app(scope, receive, send):


@pytest.fixture
def asgi3_custom_transaction_app():
[nit] this whitespace change is out of scope for the PR (guessing it was accidentally committed)
lgtm
Hey @seyoon-lim thanks again for the contribution! It took a while to merge it, but we want to make sure everything we merge is of good quality.
@antonpirker, @szokeasaurusrex
I created an issue: #3410
Thank you for contributing to sentry-python! Please add tests to validate your changes, and lint your code using tox -e linters.
Running the test suite on your PR might require maintainer approval. The AWS Lambda tests additionally require a maintainer to add a special label, and they will fail until this label is added.