diff --git a/sentry_sdk/integrations/redis/__init__.py b/sentry_sdk/integrations/redis/__init__.py index 07e08ccd7a..e09f9ccea4 100644 --- a/sentry_sdk/integrations/redis/__init__.py +++ b/sentry_sdk/integrations/redis/__init__.py @@ -13,7 +13,13 @@ ) if TYPE_CHECKING: + from collections.abc import Callable from typing import Any, Dict, Sequence + from redis import Redis, RedisCluster + from redis.asyncio.cluster import ( + RedisCluster as AsyncRedisCluster, + ClusterPipeline as AsyncClusterPipeline, + ) from sentry_sdk.tracing import Span _SINGLE_KEY_COMMANDS = frozenset( @@ -83,8 +89,7 @@ def _set_pipeline_data( ): # type: (Span, bool, Any, bool, Sequence[Any]) -> None span.set_tag("redis.is_cluster", is_cluster) - transaction = is_transaction if not is_cluster else False - span.set_tag("redis.transaction", transaction) + span.set_tag("redis.transaction", is_transaction) commands = [] for i, arg in enumerate(command_stack): @@ -118,7 +123,7 @@ def _set_client_data(span, is_cluster, name, *args): span.set_tag("redis.key", args[0]) -def _set_db_data(span, connection_params): +def _set_db_data_on_span(span, connection_params): # type: (Span, Dict[str, Any]) -> None span.set_data(SPANDATA.DB_SYSTEM, "redis") @@ -135,8 +140,43 @@ def _set_db_data(span, connection_params): span.set_data(SPANDATA.SERVER_PORT, port) -def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn): - # type: (Any, bool, Any) -> None +def _set_db_data(span, redis_instance): + # type: (Span, Redis[Any]) -> None + try: + _set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs) + except AttributeError: + pass # connections_kwargs may be missing in some cases + + +def _set_cluster_db_data(span, redis_cluster_instance): + # type: (Span, RedisCluster[Any]) -> None + default_node = redis_cluster_instance.get_default_node() + if default_node is not None: + _set_db_data_on_span( + span, {"host": default_node.host, "port": default_node.port} + ) + + +def _set_async_cluster_db_data(span, async_redis_cluster_instance): + # type: (Span, AsyncRedisCluster[Any]) -> None + default_node = async_redis_cluster_instance.get_default_node() + if default_node is not None and default_node.connection_kwargs is not None: + _set_db_data_on_span(span, default_node.connection_kwargs) + + +def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance): + # type: (Span, AsyncClusterPipeline[Any]) -> None + with capture_internal_exceptions(): + _set_async_cluster_db_data( + span, + # the AsyncClusterPipeline has always had a `_client` attr but it is private so potentially problematic and mypy + # does not recognize it - see https://github.com/redis/redis-py/blame/v5.0.0/redis/asyncio/cluster.py#L1386 + async_redis_cluster_pipeline_instance._client, # type: ignore[attr-defined] + ) + + +def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn): + # type: (Any, bool, Any, Callable[[Span, Any], None]) -> None old_execute = pipeline_cls.execute def sentry_patched_execute(self, *args, **kwargs): @@ -150,12 +190,12 @@ def sentry_patched_execute(self, *args, **kwargs): op=OP.DB_REDIS, description="redis.pipeline.execute" ) as span: with capture_internal_exceptions(): - _set_db_data(span, self.connection_pool.connection_kwargs) + set_db_data_fn(span, self) _set_pipeline_data( span, is_cluster, get_command_args_fn, - self.transaction, + False if is_cluster else self.transaction, self.command_stack, ) @@ -164,8 +204,8 @@ def sentry_patched_execute(self, *args, **kwargs): pipeline_cls.execute = sentry_patched_execute -def patch_redis_client(cls, is_cluster): - # type: (Any, bool) -> None +def patch_redis_client(cls, is_cluster, set_db_data_fn): + # type: (Any, bool, Callable[[Span, Any], None]) -> None """ This function can be used to instrument custom redis client classes or subclasses. @@ -189,11 +229,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs): description = description[: integration.max_data_size - len("...")] + "..." with hub.start_span(op=OP.DB_REDIS, description=description) as span: - try: - _set_db_data(span, self.connection_pool.connection_kwargs) - except AttributeError: - pass # connections_kwargs may be missing in some cases - + set_db_data_fn(span, self) _set_client_data(span, is_cluster, name, *args) return old_execute_command(self, name, *args, **kwargs) @@ -203,14 +239,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs): def _patch_redis(StrictRedis, client): # noqa: N803 # type: (Any, Any) -> None - patch_redis_client(StrictRedis, is_cluster=False) - patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args) + patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data) + patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data) try: strict_pipeline = client.StrictPipeline except AttributeError: pass else: - patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args) + patch_redis_pipeline( + strict_pipeline, False, _get_redis_command_args, _set_db_data + ) try: import redis.asyncio @@ -222,8 +260,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803 patch_redis_async_pipeline, ) - patch_redis_async_client(redis.asyncio.client.StrictRedis) - patch_redis_async_pipeline(redis.asyncio.client.Pipeline) + patch_redis_async_client( + redis.asyncio.client.StrictRedis, + is_cluster=False, + set_db_data_fn=_set_db_data, + ) + patch_redis_async_pipeline( + redis.asyncio.client.Pipeline, + False, + _get_redis_command_args, + set_db_data_fn=_set_db_data, + ) + + +def _patch_redis_cluster(): + # type: () -> None + """Patches the cluster module on redis SDK (as opposed to rediscluster library)""" + try: + from redis import RedisCluster, cluster + except ImportError: + pass + else: + patch_redis_client(RedisCluster, True, _set_cluster_db_data) + patch_redis_pipeline( + cluster.ClusterPipeline, + True, + _parse_rediscluster_command, + _set_cluster_db_data, + ) + + try: + from redis.asyncio import cluster as async_cluster + except ImportError: + pass + else: + from sentry_sdk.integrations.redis.asyncio import ( + patch_redis_async_client, + patch_redis_async_pipeline, + ) + + patch_redis_async_client( + async_cluster.RedisCluster, + is_cluster=True, + set_db_data_fn=_set_async_cluster_db_data, + ) + patch_redis_async_pipeline( + async_cluster.ClusterPipeline, + True, + _parse_rediscluster_command, + set_db_data_fn=_set_async_cluster_pipeline_db_data, + ) def _patch_rb(): @@ -233,9 +319,15 @@ def _patch_rb(): except ImportError: pass else: - patch_redis_client(rb.clients.FanoutClient, is_cluster=False) - patch_redis_client(rb.clients.MappingClient, is_cluster=False) - patch_redis_client(rb.clients.RoutingClient, is_cluster=False) + patch_redis_client( + rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data + ) + patch_redis_client( + rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data + ) + patch_redis_client( + rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data + ) def _patch_rediscluster(): @@ -245,7 +337,9 @@ def _patch_rediscluster(): except ImportError: return - patch_redis_client(rediscluster.RedisCluster, is_cluster=True) + patch_redis_client( + rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data + ) # up to v1.3.6, __version__ attribute is a tuple # from v2.0.0, __version__ is a string and VERSION a tuple @@ -255,11 +349,17 @@ def _patch_rediscluster(): # https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst if (0, 2, 0) < version < (2, 0, 0): pipeline_cls = rediscluster.pipeline.StrictClusterPipeline - patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True) + patch_redis_client( + rediscluster.StrictRedisCluster, + is_cluster=True, + set_db_data_fn=_set_db_data, + ) else: pipeline_cls = rediscluster.pipeline.ClusterPipeline - patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command) + patch_redis_pipeline( + pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data + ) class RedisIntegration(Integration): @@ -278,6 +378,7 @@ def setup_once(): raise DidNotEnable("Redis client not installed") _patch_redis(StrictRedis, client) + _patch_redis_cluster() _patch_rb() try: diff --git a/sentry_sdk/integrations/redis/asyncio.py b/sentry_sdk/integrations/redis/asyncio.py index 70decdcbd4..09fad3426a 100644 --- a/sentry_sdk/integrations/redis/asyncio.py +++ b/sentry_sdk/integrations/redis/asyncio.py @@ -4,21 +4,25 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations.redis import ( RedisIntegration, - _get_redis_command_args, _get_span_description, _set_client_data, - _set_db_data, _set_pipeline_data, ) from sentry_sdk._types import TYPE_CHECKING +from sentry_sdk.tracing import Span from sentry_sdk.utils import capture_internal_exceptions if TYPE_CHECKING: - from typing import Any + from collections.abc import Callable + from typing import Any, Union + from redis.asyncio.client import Pipeline, StrictRedis + from redis.asyncio.cluster import ClusterPipeline, RedisCluster -def patch_redis_async_pipeline(pipeline_cls): - # type: (Any) -> None +def patch_redis_async_pipeline( + pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn +): + # type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None]) -> None old_execute = pipeline_cls.execute async def _sentry_execute(self, *args, **kwargs): @@ -32,22 +36,22 @@ async def _sentry_execute(self, *args, **kwargs): op=OP.DB_REDIS, description="redis.pipeline.execute" ) as span: with capture_internal_exceptions(): - _set_db_data(span, self.connection_pool.connection_kwargs) + set_db_data_fn(span, self) _set_pipeline_data( span, - False, - _get_redis_command_args, - self.is_transaction, - self.command_stack, + is_cluster, + get_command_args_fn, + False if is_cluster else self.is_transaction, + self._command_stack if is_cluster else self.command_stack, ) return await old_execute(self, *args, **kwargs) - pipeline_cls.execute = _sentry_execute + pipeline_cls.execute = _sentry_execute # type: ignore[method-assign] -def patch_redis_async_client(cls): - # type: (Any) -> None +def patch_redis_async_client(cls, is_cluster, set_db_data_fn): + # type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None]) -> None old_execute_command = cls.execute_command async def _sentry_execute_command(self, name, *args, **kwargs): @@ -60,9 +64,9 @@ async def _sentry_execute_command(self, name, *args, **kwargs): description = _get_span_description(name, *args) with hub.start_span(op=OP.DB_REDIS, description=description) as span: - _set_db_data(span, self.connection_pool.connection_kwargs) - _set_client_data(span, False, name, *args) + set_db_data_fn(span, self) + _set_client_data(span, is_cluster, name, *args) return await old_execute_command(self, name, *args, **kwargs) - cls.execute_command = _sentry_execute_command + cls.execute_command = _sentry_execute_command # type: ignore[method-assign] diff --git a/tests/integrations/redis/cluster/__init__.py b/tests/integrations/redis/cluster/__init__.py new file mode 100644 index 0000000000..008b24295f --- /dev/null +++ b/tests/integrations/redis/cluster/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.cluster") diff --git a/tests/integrations/redis/cluster/test_redis_cluster.py b/tests/integrations/redis/cluster/test_redis_cluster.py new file mode 100644 index 0000000000..1e1e59e254 --- /dev/null +++ b/tests/integrations/redis/cluster/test_redis_cluster.py @@ -0,0 +1,141 @@ +import pytest +from sentry_sdk import capture_message +from sentry_sdk.consts import SPANDATA +from sentry_sdk.api import start_transaction +from sentry_sdk.integrations.redis import RedisIntegration + +import redis + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_class(reset_integrations): + pipeline_cls = redis.cluster.ClusterPipeline + redis.cluster.NodesManager.initialize = lambda *_, **__: None + redis.RedisCluster.command = lambda *_: [] + redis.RedisCluster.pipeline = lambda *_, **__: pipeline_cls(None, None) + redis.RedisCluster.get_default_node = lambda *_, **__: redis.cluster.ClusterNode( + "localhost", 6379 + ) + pipeline_cls.execute = lambda *_, **__: None + redis.RedisCluster.execute_command = lambda *_, **__: [] + + +def test_rediscluster_breadcrumb(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + rc.get("foobar") + capture_message("hi") + + (event,) = events + crumbs = event["breadcrumbs"]["values"] + + # on initializing a RedisCluster, a COMMAND call is made - this is not important for the test + # but must be accounted for + assert len(crumbs) in (1, 2) + assert len(crumbs) == 1 or crumbs[0]["message"] == "COMMAND" + + crumb = crumbs[-1] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, description", + [ + (False, "SET 'bar' [Filtered]"), + (True, "SET 'bar' 1"), + ], +) +def test_rediscluster_basic(sentry_init, capture_events, send_default_pii, description): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + with start_transaction(): + rc = redis.RedisCluster(host="localhost", port=6379) + rc.set("bar", 1) + + (event,) = events + spans = event["spans"] + + # on initializing a RedisCluster, a COMMAND call is made - this is not important for the test + # but must be accounted for + assert len(spans) in (1, 2) + assert len(spans) == 1 or spans[0]["description"] == "COMMAND" + + span = spans[-1] + assert span["op"] == "db.redis" + assert span["description"] == description + assert span["data"] == { + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "db.operation": "SET", + "redis.command": "SET", + "redis.is_cluster": True, + "redis.key": "bar", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +def test_rediscluster_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = rc.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + }, + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.transaction": False, # For Cluster, this is always False + "redis.is_cluster": True, + } diff --git a/tests/integrations/redis/cluster_asyncio/__init__.py b/tests/integrations/redis/cluster_asyncio/__init__.py new file mode 100644 index 0000000000..663979a4e2 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.asyncio.cluster") diff --git a/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py new file mode 100644 index 0000000000..ad78b79e27 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py @@ -0,0 +1,142 @@ +import pytest + +from sentry_sdk import capture_message, start_transaction +from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.redis import RedisIntegration + +from redis.asyncio import cluster + + +async def fake_initialize(*_, **__): + return None + + +async def fake_execute_command(*_, **__): + return [] + + +async def fake_execute(*_, **__): + return None + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_asyncio_class(reset_integrations): + pipeline_cls = cluster.ClusterPipeline + cluster.NodesManager.initialize = fake_initialize + cluster.RedisCluster.get_default_node = lambda *_, **__: cluster.ClusterNode( + "localhost", 6379 + ) + cluster.RedisCluster.pipeline = lambda self, *_, **__: pipeline_cls(self) + pipeline_cls.execute = fake_execute + cluster.RedisCluster.execute_command = fake_execute_command + + +@pytest.mark.asyncio +async def test_async_breadcrumb(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + + await connection.get("foobar") + capture_message("hi") + + (event,) = events + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, description", + [ + (False, "SET 'bar' [Filtered]"), + (True, "SET 'bar' 1"), + ], +) +@pytest.mark.asyncio +async def test_async_basic(sentry_init, capture_events, send_default_pii, description): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + with start_transaction(): + await connection.set("bar", 1) + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == description + assert span["data"] == { + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.is_cluster": True, + "db.operation": "SET", + "redis.command": "SET", + "redis.key": "bar", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +@pytest.mark.asyncio +async def test_async_redis_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = connection.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + await pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + }, + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.transaction": False, + "redis.is_cluster": True, + }