Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): add support for cluster clients from redis sdk #2394

Merged
merged 13 commits into from
Dec 7, 2023
134 changes: 112 additions & 22 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)

if TYPE_CHECKING:
from typing import Any, Dict, Sequence
from typing import Any, Dict, Sequence, Callable

Check warning on line 16 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L16

Added line #L16 was not covered by tests
md384 marked this conversation as resolved.
Show resolved Hide resolved
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
Expand Down Expand Up @@ -83,8 +83,7 @@
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -118,7 +117,7 @@
span.set_tag("redis.key", args[0])


def _set_db_data(span, connection_params):
def _set_db_data_on_span(span, connection_params):
# type: (Span, Dict[str, Any]) -> None
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -135,8 +134,34 @@
span.set_data(SPANDATA.SERVER_PORT, port)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
def _set_db_data(span, redis_instance):
# type: (Span, Any) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)


def _set_cluster_db_data(span, redis_cluster_instance):
# type: (Span, Any) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
default_node = redis_cluster_instance.get_default_node()
if default_node:
md384 marked this conversation as resolved.
Show resolved Hide resolved
_set_db_data_on_span(
span, {"host": default_node.host, "port": default_node.port}
szokeasaurusrex marked this conversation as resolved.
Show resolved Hide resolved
)


def _set_async_cluster_db_data(span, async_redis_cluster_instance):
# type: (Span, Any) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
default_node = async_redis_cluster_instance.get_default_node()
if default_node and default_node.connection_kwargs:
_set_db_data_on_span(span, default_node.connection_kwargs)


def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
# type: (Span, Any) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
_set_async_cluster_db_data(span, async_redis_cluster_pipeline_instance._client)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

def sentry_patched_execute(self, *args, **kwargs):
Expand All @@ -150,12 +175,12 @@
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)

Expand All @@ -164,8 +189,8 @@
pipeline_cls.execute = sentry_patched_execute


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
def patch_redis_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
"""
This function can be used to instrument custom redis client classes or
subclasses.
Expand All @@ -189,7 +214,7 @@
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
szokeasaurusrex marked this conversation as resolved.
Show resolved Hide resolved
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
Expand All @@ -199,14 +224,16 @@

def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
patch_redis_pipeline(
strict_pipeline, False, _get_redis_command_args, _set_db_data
)

try:
import redis.asyncio
Expand All @@ -218,8 +245,56 @@
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(
redis.asyncio.client.StrictRedis,
is_cluster=False,
set_db_data_fn=_set_db_data,
)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline,
False,
_get_redis_command_args,
set_db_data_fn=_set_db_data,
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass
else:
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
patch_redis_pipeline(
cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
_set_cluster_db_data,
)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(
async_cluster.RedisCluster,
is_cluster=True,
set_db_data_fn=_set_async_cluster_db_data,
)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
set_db_data_fn=_set_async_cluster_pipeline_db_data,
)


def _patch_rb():
Expand All @@ -229,9 +304,15 @@
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
patch_redis_client(

Check warning on line 307 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L307

Added line #L307 was not covered by tests
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 310 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L310

Added line #L310 was not covered by tests
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(

Check warning on line 313 in sentry_sdk/integrations/redis/__init__.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/__init__.py#L313

Added line #L313 was not covered by tests
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
)


def _patch_rediscluster():
Expand All @@ -241,7 +322,9 @@
except ImportError:
return

patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
)

# up to v1.3.6, __version__ attribute is a tuple
# from v2.0.0, __version__ is a string and VERSION a tuple
Expand All @@ -251,11 +334,17 @@
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
if (0, 2, 0) < version < (2, 0, 0):
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.StrictRedisCluster,
is_cluster=True,
set_db_data_fn=_set_db_data,
)
else:
pipeline_cls = rediscluster.pipeline.ClusterPipeline

patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
patch_redis_pipeline(
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
)


class RedisIntegration(Integration):
Expand All @@ -274,6 +363,7 @@
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
29 changes: 15 additions & 14 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_db_data,
_set_pipeline_data,
)
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions

if TYPE_CHECKING:
from typing import Any
from typing import Any, Callable

Check warning on line 16 in sentry_sdk/integrations/redis/asyncio.py

View check run for this annotation

Codecov / codecov/patch

sentry_sdk/integrations/redis/asyncio.py#L16

Added line #L16 was not covered by tests
md384 marked this conversation as resolved.
Show resolved Hide resolved


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -32,22 +33,22 @@
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
md384 marked this conversation as resolved.
Show resolved Hide resolved
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,8 +61,8 @@
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
_set_client_data(span, False, name, *args)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

Expand Down
6 changes: 5 additions & 1 deletion tests/integrations/redis/asyncio/test_redis_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@

@pytest.mark.asyncio
async def test_async_basic(sentry_init, capture_events):
sentry_init(integrations=[RedisIntegration()])
sentry_init(
integrations=[RedisIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
md384 marked this conversation as resolved.
Show resolved Hide resolved
)
events = capture_events()

connection = FakeRedis()
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")
Loading
Loading