Skip to content

Commit

Permalink
Fix clickhouse breadcrumbs (#3687)
Browse files Browse the repository at this point in the history
  • Loading branch information
sentrivana authored Nov 20, 2024
1 parent cae5a32 commit 83871a0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 56 deletions.
85 changes: 42 additions & 43 deletions sentry_sdk/integrations/clickhouse_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
ensure_integration_enabled,
)

from typing import TYPE_CHECKING, TypeVar
from typing import TYPE_CHECKING, Any, Dict, TypeVar

# Hack to get new Python features working in older versions
# without introducing a hard dependency on `typing_extensions`
Expand Down Expand Up @@ -94,17 +94,17 @@ def _inner(*args: P.args, **kwargs: P.kwargs) -> T:

connection._sentry_span = span # type: ignore[attr-defined]

_set_db_data(span, connection)

if should_send_default_pii():
span.set_attribute("db.query.text", query)
data = _get_db_data(connection)
data["db.query.text"] = query

if query_id:
span.set_attribute("db.query_id", query_id)
data["db.query_id"] = query_id

if params and should_send_default_pii():
connection._sentry_db_params = params
span.set_attribute("db.params", _serialize_span_attribute(params))
data["db.params"] = params

connection._sentry_db_data = data # type: ignore[attr-defined]
_set_on_span(span, data)

# run the original code
ret = f(*args, **kwargs)
Expand All @@ -117,69 +117,68 @@ def _inner(*args: P.args, **kwargs: P.kwargs) -> T:
def _wrap_end(f: Callable[P, T]) -> Callable[P, T]:
def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T:
res = f(*args, **kwargs)
instance = args[0]
span = getattr(instance.connection, "_sentry_span", None) # type: ignore[attr-defined]
connection = args[0].connection

span = getattr(connection, "_sentry_span", None) # type: ignore[attr-defined]
if span is not None:
data = getattr(connection, "_sentry_db_data", {})

if res is not None and should_send_default_pii():
data["db.result"] = res
span.set_attribute("db.result", _serialize_span_attribute(res))

with capture_internal_exceptions():
query = span.get_attribute("db.query.text")
query = data.pop("db.query.text", None)
if query:
data = {}
for attr in (
"db.params",
"db.result",
SPANDATA.DB_SYSTEM,
SPANDATA.DB_USER,
SPANDATA.SERVER_ADDRESS,
SPANDATA.SERVER_PORT,
):
if span.get_attribute(attr):
data[attr] = span.get_attribute(attr)

sentry_sdk.add_breadcrumb(
message=query, category="query", data=data
)

span.finish()

try:
del connection._sentry_db_data
del connection._sentry_span
except AttributeError:
pass

return res

return _inner_end


def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]:
def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T:
instance = args[0] # type: clickhouse_driver.client.Client
data = args[2]
span = getattr(instance.connection, "_sentry_span", None)
connection = args[0].connection
db_params_data = args[2]
span = getattr(connection, "_sentry_span", None)

if span is not None:
_set_db_data(span, instance.connection)
data = _get_db_data(connection)
_set_on_span(span, data)

if should_send_default_pii():
db_params = (
getattr(instance.connection, "_sentry_db_params", None) or []
)
db_params.extend(data)
saved_db_data = getattr(connection, "_sentry_db_data", {})
db_params = saved_db_data.get("db.params") or []
db_params.extend(db_params_data)
saved_db_data["db.params"] = db_params
span.set_attribute("db.params", _serialize_span_attribute(db_params))
try:
del instance.connection._sentry_db_params
except AttributeError:
pass

return f(*args, **kwargs)

return _inner_send_data


def _set_db_data(
span: Span, connection: clickhouse_driver.connection.Connection
) -> None:
span.set_attribute(SPANDATA.DB_SYSTEM, "clickhouse")
span.set_attribute(SPANDATA.SERVER_ADDRESS, connection.host)
span.set_attribute(SPANDATA.SERVER_PORT, connection.port)
span.set_attribute(SPANDATA.DB_NAME, connection.database)
span.set_attribute(SPANDATA.DB_USER, connection.user)
def _get_db_data(connection: clickhouse_driver.connection.Connection) -> Dict[str, str]:
return {
SPANDATA.DB_SYSTEM: "clickhouse",
SPANDATA.SERVER_ADDRESS: connection.host,
SPANDATA.SERVER_PORT: connection.port,
SPANDATA.DB_NAME: connection.database,
SPANDATA.DB_USER: connection.user,
}


def _set_on_span(span: Span, data: Dict[str, Any]):
for key, value in data.items():
span.set_attribute(key, _serialize_span_attribute(value))
24 changes: 11 additions & 13 deletions tests/integrations/clickhouse_driver/test_clickhouse_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def test_clickhouse_client_breadcrumbs_with_pii(sentry_init, capture_events) ->
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.params": '[{"x": 100}]',
"db.params": [{"x": 100}],
},
"message": "INSERT INTO test (x) VALUES",
"type": "default",
Expand All @@ -181,7 +181,7 @@ def test_clickhouse_client_breadcrumbs_with_pii(sentry_init, capture_events) ->
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.params": "[[170], [200]]",
"db.params": [[170], [200]],
},
"message": "INSERT INTO test (x) VALUES",
"type": "default",
Expand All @@ -194,8 +194,8 @@ def test_clickhouse_client_breadcrumbs_with_pii(sentry_init, capture_events) ->
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.result": "[[370]]",
"db.params": '{"minv": 150}',
"db.result": [[370]],
"db.params": {"minv": 150},
},
"message": "SELECT sum(x) FROM test WHERE x > 150",
"type": "default",
Expand Down Expand Up @@ -348,9 +348,7 @@ def test_clickhouse_client_spans(
assert event["spans"] == expected_spans


def test_clickhouse_client_spans_with_pii(
sentry_init, capture_events, capture_envelopes
) -> None:
def test_clickhouse_client_spans_with_pii(sentry_init, capture_events) -> None:
sentry_init(
integrations=[ClickhouseDriverIntegration()],
_experiments={"record_sql_params": True},
Expand Down Expand Up @@ -620,7 +618,7 @@ def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> N
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.result": "[[], []]",
"db.result": [[], []],
},
"message": "DROP TABLE IF EXISTS test",
"type": "default",
Expand All @@ -633,7 +631,7 @@ def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> N
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.result": "[[], []]",
"db.result": [[], []],
},
"message": "CREATE TABLE test (x Int32) ENGINE = Memory",
"type": "default",
Expand All @@ -646,7 +644,7 @@ def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> N
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.params": '[{"x": 100}]',
"db.params": [{"x": 100}],
},
"message": "INSERT INTO test (x) VALUES",
"type": "default",
Expand All @@ -659,7 +657,7 @@ def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> N
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.params": "[[170], [200]]",
"db.params": [[170], [200]],
},
"message": "INSERT INTO test (x) VALUES",
"type": "default",
Expand All @@ -672,8 +670,8 @@ def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> N
"db.user": "default",
"server.address": "localhost",
"server.port": 9000,
"db.params": '{"minv": 150}',
"db.result": '[[["370"]], [["\'sum(x)\'", "\'Int64\'"]]]',
"db.params": {"minv": 150},
"db.result": [[["370"]], [["'sum(x)'", "'Int64'"]]],
},
"message": "SELECT sum(x) FROM test WHERE x > 150",
"type": "default",
Expand Down

0 comments on commit 83871a0

Please sign in to comment.