Skip to content

Commit

Permalink
Add redis.asyncio.Connection instrumentation (#919)
Browse files Browse the repository at this point in the history
* Add async connection instrumentation

* Remove unsupported flask tests

* Remove old instrumentation from coverage analysis

* [Mega-Linter] Apply linters fixes

* Trigger tests

---------

Co-authored-by: lrafeei <[email protected]>
  • Loading branch information
lrafeei and lrafeei authored Sep 12, 2023
1 parent b1be563 commit e371b02
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
29 changes: 15 additions & 14 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2760,20 +2760,6 @@ def _process_module_builtin_defaults():
"aioredis.connection", "newrelic.hooks.datastore_aioredis", "instrument_aioredis_connection"
)

# Redis v4.2+
_process_module_definition(
"redis.asyncio.client", "newrelic.hooks.datastore_redis", "instrument_asyncio_redis_client"
)

# Redis v4.2+
_process_module_definition(
"redis.asyncio.commands", "newrelic.hooks.datastore_redis", "instrument_asyncio_redis_client"
)

_process_module_definition(
"redis.asyncio.connection", "newrelic.hooks.datastore_aioredis", "instrument_aioredis_connection"
)

# v7 and below
_process_module_definition(
"elasticsearch.client",
Expand Down Expand Up @@ -2930,6 +2916,21 @@ def _process_module_builtin_defaults():
"instrument_pymongo_collection",
)

# Redis v4.2+
_process_module_definition(
"redis.asyncio.client", "newrelic.hooks.datastore_redis", "instrument_asyncio_redis_client"
)

# Redis v4.2+
_process_module_definition(
"redis.asyncio.commands", "newrelic.hooks.datastore_redis", "instrument_asyncio_redis_client"
)

# Redis v4.2+
_process_module_definition(
"redis.asyncio.connection", "newrelic.hooks.datastore_redis", "instrument_asyncio_redis_connection"
)

_process_module_definition(
"redis.connection",
"newrelic.hooks.datastore_redis",
Expand Down
66 changes: 64 additions & 2 deletions newrelic/hooks/datastore_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import re

from newrelic.api.datastore_trace import DatastoreTrace
from newrelic.api.time_trace import current_trace
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper


_redis_client_sync_methods = {
"acl_dryrun",
"auth",
Expand Down Expand Up @@ -545,6 +545,59 @@ def _nr_wrapper_asyncio_Redis_method_(wrapped, instance, args, kwargs):
wrap_function_wrapper(module, name, _nr_wrapper_asyncio_Redis_method_)


async def wrap_async_Connection_send_command(wrapped, instance, args, kwargs):
transaction = current_transaction()
if not transaction:
return await wrapped(*args, **kwargs)

host, port_path_or_id, db = (None, None, None)

try:
dt = transaction.settings.datastore_tracer
if dt.instance_reporting.enabled or dt.database_name_reporting.enabled:
conn_kwargs = _conn_attrs_to_dict(instance)
host, port_path_or_id, db = _instance_info(conn_kwargs)
except Exception:
pass

# Older Redis clients would when sending multi part commands pass
# them in as separate arguments to send_command(). Need to therefore
# detect those and grab the next argument from the set of arguments.

operation = args[0].strip().lower()

# If it's not a multi part command, there's no need to trace it, so
# we can return early.

if (
operation.split()[0] not in _redis_multipart_commands
): # Set the datastore info on the DatastoreTrace containing this function call.
trace = current_trace()

# Find DatastoreTrace no matter how many other traces are inbetween
while trace is not None and not isinstance(trace, DatastoreTrace):
trace = getattr(trace, "parent", None)

if trace is not None:
trace.host = host
trace.port_path_or_id = port_path_or_id
trace.database_name = db

return await wrapped(*args, **kwargs)

# Convert multi args to single arg string

if operation in _redis_multipart_commands and len(args) > 1:
operation = "%s %s" % (operation, args[1].strip().lower())

operation = _redis_operation_re.sub("_", operation)

with DatastoreTrace(
product="Redis", target=None, operation=operation, host=host, port_path_or_id=port_path_or_id, database_name=db
):
return await wrapped(*args, **kwargs)


def _nr_Connection_send_command_wrapper_(wrapped, instance, args, kwargs):
transaction = current_transaction()

Expand Down Expand Up @@ -613,6 +666,7 @@ def instrument_asyncio_redis_client(module):
if hasattr(class_, operation):
_wrap_asyncio_Redis_method_wrapper(module, "Redis", operation)


def instrument_redis_commands_core(module):
_instrument_redis_commands_module(module, "CoreCommands")

Expand Down Expand Up @@ -658,4 +712,12 @@ def _instrument_redis_commands_module(module, class_name):


def instrument_redis_connection(module):
wrap_function_wrapper(module, "Connection.send_command", _nr_Connection_send_command_wrapper_)
if hasattr(module, "Connection"):
if hasattr(module.Connection, "send_command"):
wrap_function_wrapper(module, "Connection.send_command", _nr_Connection_send_command_wrapper_)


def instrument_asyncio_redis_connection(module):
if hasattr(module, "Connection"):
if hasattr(module.Connection, "send_command"):
wrap_function_wrapper(module, "Connection.send_command", wrap_async_Connection_send_command)
4 changes: 1 addition & 3 deletions newrelic/hooks/framework_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _nr_wrapper_error_handler_(wrapped, instance, args, kwargs):
return FunctionTraceWrapper(wrapped, name=name)(*args, **kwargs)


def _nr_wrapper_Flask__register_error_handler_(wrapped, instance, args, kwargs):
def _nr_wrapper_Flask__register_error_handler_(wrapped, instance, args, kwargs): # pragma: no cover
def _bind_params(key, code_or_exception, f):
return key, code_or_exception, f

Expand All @@ -189,7 +189,6 @@ def _bind_params(code_or_exception, f):


def _nr_wrapper_Flask_try_trigger_before_first_request_functions_(wrapped, instance, args, kwargs):

transaction = current_transaction()

if transaction is None:
Expand Down Expand Up @@ -355,7 +354,6 @@ def _nr_wrapper_Blueprint_endpoint_(wrapped, instance, args, kwargs):

@function_wrapper
def _nr_wrapper_Blueprint_before_request_wrapped_(wrapped, instance, args, kwargs):

transaction = current_transaction()

if transaction is None:
Expand Down
67 changes: 60 additions & 7 deletions tests/datastore_redis/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,53 @@
DB_SETTINGS = redis_settings()[0]
REDIS_PY_VERSION = get_package_version_tuple("redis")

# Metrics
# Metrics for publish test

datastore_all_metric_count = 5 if REDIS_PY_VERSION >= (5, 0) else 3

_base_scoped_metrics = [("Datastore/operation/Redis/publish", 3)]

if REDIS_PY_VERSION >= (5, 0):
_base_scoped_metrics.append(('Datastore/operation/Redis/client_setinfo', 2),)

datastore_all_metric_count = 5 if REDIS_PY_VERSION >= (5, 0) else 3
_base_scoped_metrics.append(
("Datastore/operation/Redis/client_setinfo", 2),
)

_base_rollup_metrics = [
("Datastore/all", datastore_all_metric_count),
("Datastore/allOther", datastore_all_metric_count),
("Datastore/Redis/all", datastore_all_metric_count),
("Datastore/Redis/allOther", datastore_all_metric_count),
("Datastore/operation/Redis/publish", 3),
("Datastore/instance/Redis/%s/%s" % (instance_hostname(DB_SETTINGS["host"]), DB_SETTINGS["port"]), datastore_all_metric_count),
(
"Datastore/instance/Redis/%s/%s" % (instance_hostname(DB_SETTINGS["host"]), DB_SETTINGS["port"]),
datastore_all_metric_count,
),
]
if REDIS_PY_VERSION >= (5, 0):
_base_rollup_metrics.append(('Datastore/operation/Redis/client_setinfo', 2),)
_base_rollup_metrics.append(
("Datastore/operation/Redis/client_setinfo", 2),
)


# Metrics for connection pool test

_base_pool_scoped_metrics = [
("Datastore/operation/Redis/get", 1),
("Datastore/operation/Redis/set", 1),
("Datastore/operation/Redis/client_list", 1),
]

_base_pool_rollup_metrics = [
("Datastore/all", 3),
("Datastore/allOther", 3),
("Datastore/Redis/all", 3),
("Datastore/Redis/allOther", 3),
("Datastore/operation/Redis/get", 1),
("Datastore/operation/Redis/set", 1),
("Datastore/operation/Redis/client_list", 1),
("Datastore/instance/Redis/%s/%s" % (instance_hostname(DB_SETTINGS["host"]), DB_SETTINGS["port"]), 3),
]


# Tests

Expand All @@ -60,6 +88,31 @@ def client(loop): # noqa
return loop.run_until_complete(redis.asyncio.Redis(host=DB_SETTINGS["host"], port=DB_SETTINGS["port"], db=0))


@pytest.fixture()
def client_pool(loop): # noqa
import redis.asyncio

connection_pool = redis.asyncio.ConnectionPool(host=DB_SETTINGS["host"], port=DB_SETTINGS["port"], db=0)
return loop.run_until_complete(redis.asyncio.Redis(connection_pool=connection_pool))


@pytest.mark.skipif(REDIS_PY_VERSION < (4, 2), reason="This functionality exists in Redis 4.2+")
@validate_transaction_metrics(
"test_asyncio:test_async_connection_pool",
scoped_metrics=_base_pool_scoped_metrics,
rollup_metrics=_base_pool_rollup_metrics,
background_task=True,
)
@background_task()
def test_async_connection_pool(client_pool, loop): # noqa
async def _test_async_pool(client_pool):
await client_pool.set("key1", "value1")
await client_pool.get("key1")
await client_pool.execute_command("CLIENT", "LIST")

loop.run_until_complete(_test_async_pool(client_pool))


@pytest.mark.skipif(REDIS_PY_VERSION < (4, 2), reason="This functionality exists in Redis 4.2+")
@validate_transaction_metrics("test_asyncio:test_async_pipeline", background_task=True)
@background_task()
Expand Down Expand Up @@ -104,4 +157,4 @@ async def _test_pubsub():
await future

loop.run_until_complete(_test_pubsub())
assert messages_received == ["Hello", "World", "NOPE"]
assert messages_received == ["Hello", "World", "NOPE"]
4 changes: 0 additions & 4 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ envlist =
# Falcon master branch failing on 3.11 currently.
python-framework_falcon-py311-falcon0200,
python-framework_fastapi-{py37,py38,py39,py310,py311},
python-framework_flask-{pypy27,py27}-flask0012,
python-framework_flask-{pypy27,py27,py37,py38,py39,py310,py311,pypy38}-flask0101,
; temporarily disabling flaskmaster tests
python-framework_flask-{py37,py38,py39,py310,py311,pypy38}-flasklatest,
python-framework_graphene-{py37,py38,py39,py310,py311}-graphenelatest,
Expand Down Expand Up @@ -302,8 +300,6 @@ deps =
framework_flask: markupsafe<2.1
framework_flask: jinja2<3.1
framework_flask: Flask-Compress
framework_flask-flask0012: flask<0.13
framework_flask-flask0101: flask<1.2
framework_flask-flasklatest: flask[async]
framework_flask-flaskmaster: https://github.com/pallets/werkzeug/archive/main.zip
framework_flask-flaskmaster: https://github.com/pallets/flask/archive/main.zip#egg=flask[async]
Expand Down

0 comments on commit e371b02

Please sign in to comment.