From 73f319740252b4cdd1d51b8b8e741b98fc6a80b2 Mon Sep 17 00:00:00 2001 From: Timothy Pansino <11214426+TimPansino@users.noreply.github.com> Date: Mon, 10 Jun 2024 13:20:17 -0700 Subject: [PATCH] Psycopg3 Instrumentation (#1155) * Sort tox file * Update pytest * Add pscyopg3 test suite Remove psycopg 3.0 tests Fix tox for psycopg-binary * Add new error message to slow sql validator * Add DBAPI2 async instrumentation * Add psycopg3 instrumentation * Upgrade postgres in CI * Add separate postgres 16 and 9 testing versions * Format and lint * Add more __enter__ comments * Don't needlessly append rollup metrics * [Mega-Linter] Apply linters fixes * Bump tests * Rename dsn to conninfo --------- Co-authored-by: TimPansino Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .github/workflows/tests.yml | 71 ++- newrelic/config.py | 3 + newrelic/hooks/database_dbapi2.py | 101 ++-- newrelic/hooks/database_dbapi2_async.py | 160 ++++++ newrelic/hooks/database_psycopg.py | 514 ++++++++++++++++++ tests/datastore_psycopg/conftest.py | 116 ++++ tests/datastore_psycopg/test_as_string.py | 112 ++++ tests/datastore_psycopg/test_connection.py | 221 ++++++++ tests/datastore_psycopg/test_cursor.py | 233 ++++++++ .../test_database_instance_info.py | 226 ++++++++ tests/datastore_psycopg/test_explain_plans.py | 201 +++++++ .../datastore_psycopg/test_forward_compat.py | 41 ++ tests/datastore_psycopg/test_multiple_dbs.py | 138 +++++ tests/datastore_psycopg/test_obfuscation.py | 139 +++++ tests/datastore_psycopg/test_register.py | 85 +++ tests/datastore_psycopg/test_rollback.py | 104 ++++ tests/datastore_psycopg/test_slow_sql.py | 135 +++++ tests/datastore_psycopg/test_span_event.py | 136 +++++ tests/datastore_psycopg/test_trace_node.py | 104 ++++ tests/testing_support/db_settings.py | 9 +- .../validate_transaction_slow_sql_count.py | 6 +- tox.ini | 39 +- 22 files changed, 2831 insertions(+), 63 deletions(-) create mode 100644 newrelic/hooks/database_dbapi2_async.py create mode 100644 newrelic/hooks/database_psycopg.py create mode 100644 tests/datastore_psycopg/conftest.py create mode 100644 tests/datastore_psycopg/test_as_string.py create mode 100644 tests/datastore_psycopg/test_connection.py create mode 100644 tests/datastore_psycopg/test_cursor.py create mode 100644 tests/datastore_psycopg/test_database_instance_info.py create mode 100644 tests/datastore_psycopg/test_explain_plans.py create mode 100644 tests/datastore_psycopg/test_forward_compat.py create mode 100644 tests/datastore_psycopg/test_multiple_dbs.py create mode 100644 tests/datastore_psycopg/test_obfuscation.py create mode 100644 tests/datastore_psycopg/test_register.py create mode 100644 tests/datastore_psycopg/test_rollback.py create mode 100644 tests/datastore_psycopg/test_slow_sql.py create mode 100644 tests/datastore_psycopg/test_span_event.py create mode 100644 tests/datastore_psycopg/test_trace_node.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7e0e4a3ab..c54e56b5e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -43,7 +43,8 @@ jobs: - mongodb - mssql - mysql - - postgres + - postgres16 + - postgres9 - rabbitmq - redis - rediscluster @@ -207,7 +208,7 @@ jobs: path: ./**/.coverage.* retention-days: 1 - postgres: + postgres16: env: TOTAL_GROUPS: 2 @@ -223,7 +224,71 @@ jobs: --add-host=host.docker.internal:host-gateway timeout-minutes: 30 services: - postgres: + postgres16: + image: postgres:16 + env: + POSTGRES_PASSWORD: postgres + ports: + - 8080:5432 + - 8081:5432 + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # 4.1.1 + + - name: Fetch git tags + run: | + git config --global --add safe.directory "$GITHUB_WORKSPACE" + git fetch --tags origin + + - name: Configure pip cache + run: | + mkdir -p /github/home/.cache/pip + chown -R $(whoami) /github/home/.cache/pip + + - name: Get Environments + id: get-envs + run: | + echo "envs=$(tox -l | grep '^${{ github.job }}\-' | ./.github/workflows/get-envs.py)" >> $GITHUB_OUTPUT + env: + GROUP_NUMBER: ${{ matrix.group-number }} + + - name: Test + run: | + tox -vv -e ${{ steps.get-envs.outputs.envs }} -p auto + env: + TOX_PARALLEL_NO_SPINNER: 1 + PY_COLORS: 0 + + - name: Upload Coverage Artifacts + uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # 4.3.1 + with: + name: coverage-${{ github.job }}-${{ strategy.job-index }} + path: ./**/.coverage.* + retention-days: 1 + + postgres9: + env: + TOTAL_GROUPS: 1 + + strategy: + fail-fast: false + matrix: + group-number: [1] + + runs-on: ubuntu-20.04 + container: + image: ghcr.io/newrelic/newrelic-python-agent-ci:latest + options: >- + --add-host=host.docker.internal:host-gateway + timeout-minutes: 30 + services: + postgres9: image: postgres:9 env: POSTGRES_PASSWORD: postgres diff --git a/newrelic/config.py b/newrelic/config.py index 87449cfed..6c4d4b27f 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -3100,6 +3100,9 @@ def _process_module_builtin_defaults(): _process_module_definition("pymssql", "newrelic.hooks.database_pymssql", "instrument_pymssql") + _process_module_definition("psycopg", "newrelic.hooks.database_psycopg", "instrument_psycopg") + _process_module_definition("psycopg.sql", "newrelic.hooks.database_psycopg", "instrument_psycopg_sql") + _process_module_definition("psycopg2", "newrelic.hooks.database_psycopg2", "instrument_psycopg2") _process_module_definition( "psycopg2._psycopg2", diff --git a/newrelic/hooks/database_dbapi2.py b/newrelic/hooks/database_dbapi2.py index 9506b0be4..3621cac3b 100644 --- a/newrelic/hooks/database_dbapi2.py +++ b/newrelic/hooks/database_dbapi2.py @@ -14,13 +14,12 @@ from newrelic.api.database_trace import DatabaseTrace, register_database_client from newrelic.api.function_trace import FunctionTrace -from newrelic.api.transaction import current_transaction from newrelic.common.object_names import callable_name -from newrelic.common.object_wrapper import wrap_object, ObjectProxy -from newrelic.core.config import global_settings +from newrelic.common.object_wrapper import ObjectProxy, wrap_object DEFAULT = object() + class CursorWrapper(ObjectProxy): def __init__(self, cursor, dbapi2_module, connect_params, cursor_params): @@ -31,15 +30,25 @@ def __init__(self, cursor, dbapi2_module, connect_params, cursor_params): def execute(self, sql, parameters=DEFAULT, *args, **kwargs): if parameters is not DEFAULT: - with DatabaseTrace(sql, self._nr_dbapi2_module, - self._nr_connect_params, self._nr_cursor_params, - parameters, (args, kwargs), source=self.__wrapped__.execute): - return self.__wrapped__.execute(sql, parameters, - *args, **kwargs) + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + sql_parameters=parameters, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + return self.__wrapped__.execute(sql, parameters, *args, **kwargs) else: - with DatabaseTrace(sql, self._nr_dbapi2_module, - self._nr_connect_params, self._nr_cursor_params, - None, (args, kwargs), source=self.__wrapped__.execute): + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): return self.__wrapped__.execute(sql, **kwargs) def executemany(self, sql, seq_of_parameters): @@ -49,23 +58,38 @@ def executemany(self, sql, seq_of_parameters): except (TypeError, IndexError): parameters = DEFAULT if parameters is not DEFAULT: - with DatabaseTrace(sql, self._nr_dbapi2_module, - self._nr_connect_params, self._nr_cursor_params, - parameters, source=self.__wrapped__.executemany): + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + sql_parameters=parameters, + source=self.__wrapped__.executemany, + ): return self.__wrapped__.executemany(sql, seq_of_parameters) else: - with DatabaseTrace(sql, self._nr_dbapi2_module, - self._nr_connect_params, self._nr_cursor_params, source=self.__wrapped__.executemany): + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + source=self.__wrapped__.executemany, + ): return self.__wrapped__.executemany(sql, seq_of_parameters) def callproc(self, procname, parameters=DEFAULT): - with DatabaseTrace('CALL %s' % procname, - self._nr_dbapi2_module, self._nr_connect_params, source=self.__wrapped__.callproc): + with DatabaseTrace( + sql="CALL %s" % procname, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.callproc, + ): if parameters is not DEFAULT: return self.__wrapped__.callproc(procname, parameters) else: return self.__wrapped__.callproc(procname) + class ConnectionWrapper(ObjectProxy): __cursor_wrapper__ = CursorWrapper @@ -76,20 +100,29 @@ def __init__(self, connection, dbapi2_module, connect_params): self._nr_connect_params = connect_params def cursor(self, *args, **kwargs): - return self.__cursor_wrapper__(self.__wrapped__.cursor( - *args, **kwargs), self._nr_dbapi2_module, - self._nr_connect_params, (args, kwargs)) + return self.__cursor_wrapper__( + self.__wrapped__.cursor(*args, **kwargs), self._nr_dbapi2_module, self._nr_connect_params, (args, kwargs) + ) def commit(self): - with DatabaseTrace('COMMIT', self._nr_dbapi2_module, - self._nr_connect_params, source=self.__wrapped__.commit): + with DatabaseTrace( + sql="COMMIT", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.commit, + ): return self.__wrapped__.commit() def rollback(self): - with DatabaseTrace('ROLLBACK', self._nr_dbapi2_module, - self._nr_connect_params, source=self.__wrapped__.rollback): + with DatabaseTrace( + sql="ROLLBACK", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.rollback, + ): return self.__wrapped__.rollback() + class ConnectionFactory(ObjectProxy): __connection_wrapper__ = ConnectionWrapper @@ -99,17 +132,15 @@ def __init__(self, connect, dbapi2_module): self._nr_dbapi2_module = dbapi2_module def __call__(self, *args, **kwargs): - rollup = [] - rollup.append('Datastore/all') - rollup.append('Datastore/%s/all' % - self._nr_dbapi2_module._nr_database_product) + rollup = ["Datastore/all", "Datastore/%s/all" % self._nr_dbapi2_module._nr_database_product] + + with FunctionTrace(name=callable_name(self.__wrapped__), terminal=True, rollup=rollup, source=self.__wrapped__): + return self.__connection_wrapper__( + self.__wrapped__(*args, **kwargs), self._nr_dbapi2_module, (args, kwargs) + ) - with FunctionTrace(callable_name(self.__wrapped__), - terminal=True, rollup=rollup, source=self.__wrapped__): - return self.__connection_wrapper__(self.__wrapped__( - *args, **kwargs), self._nr_dbapi2_module, (args, kwargs)) def instrument(module): - register_database_client(module, 'DBAPI2', 'single') + register_database_client(module, "DBAPI2", "single") - wrap_object(module, 'connect', ConnectionFactory, (module,)) + wrap_object(module, "connect", ConnectionFactory, (module,)) diff --git a/newrelic/hooks/database_dbapi2_async.py b/newrelic/hooks/database_dbapi2_async.py new file mode 100644 index 000000000..fa777feb8 --- /dev/null +++ b/newrelic/hooks/database_dbapi2_async.py @@ -0,0 +1,160 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from newrelic.api.database_trace import DatabaseTrace, register_database_client +from newrelic.api.function_trace import FunctionTrace +from newrelic.common.object_names import callable_name +from newrelic.common.object_wrapper import ObjectProxy, wrap_object +from newrelic.hooks.database_dbapi2 import DEFAULT + + +class AsyncCursorWrapper(ObjectProxy): + def __init__(self, cursor, dbapi2_module, connect_params, cursor_params): + super(AsyncCursorWrapper, self).__init__(cursor) + self._nr_dbapi2_module = dbapi2_module + self._nr_connect_params = connect_params + self._nr_cursor_params = cursor_params + + async def execute(self, sql, parameters=DEFAULT, *args, **kwargs): + if parameters is not DEFAULT: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + sql_parameters=parameters, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + return await self.__wrapped__.execute(sql, parameters, *args, **kwargs) + else: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + return await self.__wrapped__.execute(sql, **kwargs) + + async def executemany(self, sql, seq_of_parameters): + try: + seq_of_parameters = list(seq_of_parameters) + parameters = seq_of_parameters[0] + except (TypeError, IndexError): + parameters = DEFAULT + if parameters is not DEFAULT: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + sql_parameters=parameters, + source=self.__wrapped__.executemany, + ): + return await self.__wrapped__.executemany(sql, seq_of_parameters) + else: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=self._nr_cursor_params, + source=self.__wrapped__.executemany, + ): + return await self.__wrapped__.executemany(sql, seq_of_parameters) + + async def callproc(self, procname, parameters=DEFAULT): + with DatabaseTrace( + sql="CALL %s" % procname, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.callproc, + ): + if parameters is not DEFAULT: + return await self.__wrapped__.callproc(procname, parameters) + else: + return await self.__wrapped__.callproc(procname) + + def __aiter__(self): + return self.__wrapped__.__aiter__() + + async def __aenter__(self): + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, exc=None, val=None, tb=None): + return await self.__wrapped__.__aexit__(exc, val, tb) + + +class AsyncConnectionWrapper(ObjectProxy): + + __cursor_wrapper__ = AsyncCursorWrapper + + def __init__(self, connection, dbapi2_module, connect_params): + super(AsyncConnectionWrapper, self).__init__(connection) + self._nr_dbapi2_module = dbapi2_module + self._nr_connect_params = connect_params + + def cursor(self, *args, **kwargs): + return self.__cursor_wrapper__( + self.__wrapped__.cursor(*args, **kwargs), self._nr_dbapi2_module, self._nr_connect_params, (args, kwargs) + ) + + async def commit(self): + with DatabaseTrace( + sql="COMMIT", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.commit, + ): + return await self.__wrapped__.commit() + + async def rollback(self): + with DatabaseTrace( + sql="ROLLBACK", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.rollback, + ): + return await self.__wrapped__.rollback() + + async def __aenter__(self): + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, exc=None, val=None, tb=None): + return await self.__wrapped__.__aexit__(exc, val, tb) + + +class AsyncConnectionFactory(ObjectProxy): + + __connection_wrapper__ = AsyncConnectionWrapper + + def __init__(self, connect, dbapi2_module): + super(AsyncConnectionFactory, self).__init__(connect) + self._nr_dbapi2_module = dbapi2_module + + async def __call__(self, *args, **kwargs): + rollup = ["Datastore/all", "Datastore/%s/all" % self._nr_dbapi2_module._nr_database_product] + + with FunctionTrace(name=callable_name(self.__wrapped__), terminal=True, rollup=rollup, source=self.__wrapped__): + connection = await self.__wrapped__(*args, **kwargs) + return self.__connection_wrapper__(connection, self._nr_dbapi2_module, (args, kwargs)) + + +def instrument(module): + register_database_client(module, "DBAPI2", "single") + + wrap_object(module, "connect", AsyncConnectionFactory, (module,)) diff --git a/newrelic/hooks/database_psycopg.py b/newrelic/hooks/database_psycopg.py new file mode 100644 index 000000000..a392ff7e1 --- /dev/null +++ b/newrelic/hooks/database_psycopg.py @@ -0,0 +1,514 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +import os + +from newrelic.api.database_trace import DatabaseTrace, register_database_client +from newrelic.api.function_trace import FunctionTrace +from newrelic.common.object_names import callable_name +from newrelic.common.object_wrapper import ( + ObjectProxy, + wrap_function_wrapper, + wrap_object, +) +from newrelic.hooks.database_dbapi2 import DEFAULT +from newrelic.hooks.database_dbapi2 import ConnectionFactory as DBAPI2ConnectionFactory +from newrelic.hooks.database_dbapi2 import ConnectionWrapper as DBAPI2ConnectionWrapper +from newrelic.hooks.database_dbapi2 import CursorWrapper as DBAPI2CursorWrapper +from newrelic.hooks.database_dbapi2_async import ( + AsyncConnectionFactory as DBAPI2AsyncConnectionFactory, +) +from newrelic.hooks.database_dbapi2_async import ( + AsyncConnectionWrapper as DBAPI2AsyncConnectionWrapper, +) +from newrelic.hooks.database_dbapi2_async import ( + AsyncCursorWrapper as DBAPI2AsyncCursorWrapper, +) + +try: + from urllib import unquote +except ImportError: + from urllib.parse import unquote +try: + from urlparse import parse_qsl +except ImportError: + from urllib.parse import parse_qsl + +from newrelic.packages.urllib3 import util as ul3_util + +# These functions return True if a non-default connection or cursor class is +# used. If the default connection and cursor are used without any unknown +# arguments, we can safely drop all cursor parameters to generate explain +# plans. Explain plans do not work with named cursors, so dropping the name +# allows explain plans to continue to function. +PsycopgConnection = None +PsycopgAsyncConnection = None + + +def should_preserve_connection_args(self, conninfo="", cursor_factory=None, **kwargs): + try: + if cursor_factory: + return True + + return self._nr_last_object.__self__ not in (PsycopgConnection, PsycopgAsyncConnection) + except Exception: + pass + + return False + + +def should_preserve_cursor_args( + name=None, binary=False, row_factory=None, scrollable=None, withhold=False, *args, **kwargs +): + return bool(args or kwargs) + + +class CursorWrapper(DBAPI2CursorWrapper): + def __enter__(self): + self.__wrapped__.__enter__() + + # Must return a reference to self as otherwise will be + # returning the inner cursor object. If 'as' is used + # with the 'with' statement this will mean no longer + # using the wrapped cursor object and nothing will be + # tracked. + + return self + + def execute(self, sql, parameters=DEFAULT, *args, **kwargs): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + return super(CursorWrapper, self).execute(sql, parameters, *args, **kwargs) + + def executemany(self, sql, seq_of_parameters): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + return super(CursorWrapper, self).executemany(sql, seq_of_parameters) + + +class ConnectionSaveParamsWrapper(DBAPI2ConnectionWrapper): + + __cursor_wrapper__ = CursorWrapper + + def execute(self, sql, parameters=DEFAULT, *args, **kwargs): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + if parameters is not DEFAULT: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=None, + sql_parameters=parameters, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + cursor = self.__wrapped__.execute(sql, parameters, *args, **kwargs) + else: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=None, + sql_parameters=None, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + cursor = self.__wrapped__.execute(sql, **kwargs) + + return self.__cursor_wrapper__(cursor, self._nr_dbapi2_module, self._nr_connect_params, (args, kwargs)) + + def __enter__(self): + name = callable_name(self.__wrapped__.__enter__) + with FunctionTrace(name, source=self.__wrapped__.__enter__): + self.__wrapped__.__enter__() + + # Must return a reference to self as otherwise will be + # returning the inner connection object. If 'as' is used + # with the 'with' statement this will mean no longer + # using the wrapped connection object and nothing will be + # tracked. + + return self + + def __exit__(self, exc, value, tb): + name = callable_name(self.__wrapped__.__exit__) + with FunctionTrace(name, source=self.__wrapped__.__exit__): + if exc is None: + with DatabaseTrace( + sql="COMMIT", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.__exit__, + ): + return self.__wrapped__.__exit__(exc, value, tb) + else: + with DatabaseTrace( + sql="ROLLBACK", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.__exit__, + ): + return self.__wrapped__.__exit__(exc, value, tb) + + +# This connection wrapper does not save cursor parameters for explain plans. It +# is only used for the default connection class. +class ConnectionWrapper(ConnectionSaveParamsWrapper): + def cursor(self, *args, **kwargs): + # If any unknown cursor params are detected or a cursor factory is + # used, store params for explain plans later. + if should_preserve_cursor_args(*args, **kwargs): + cursor_params = (args, kwargs) + else: + cursor_params = None + + return self.__cursor_wrapper__( + self.__wrapped__.cursor(*args, **kwargs), self._nr_dbapi2_module, self._nr_connect_params, cursor_params + ) + + +class ConnectionFactory(DBAPI2ConnectionFactory): + + __connection_wrapper__ = ConnectionWrapper + + def __call__(self, *args, **kwargs): + if should_preserve_connection_args(self, *args, **kwargs): + self.__connection_wrapper__ = ConnectionSaveParamsWrapper + + return super(ConnectionFactory, self).__call__(*args, **kwargs) + + +# Due to our explain plan feature requiring the use of synchronous DBAPI2 compliant modules, we can't support the use +# of AsyncConnection or any derivative to retrieve explain plans. There's also no longer a connection_factory argument +# on psycopg.connect() that was present in psycopg2, which affects the logic that attempted to use the same Connection +# class for explain plans. This is only relevant for users subclassing psycopg.Connection or psycopg.AsyncConnection. +# With no easy way to preserve or use the same class, and given that using the AsyncConnection class would never +# function with our explain plan feature, we always attempt to use the DBAPI2 compliant method of instantiating a new +# Connection, that being psycopg.connect(). That function is an alias of psycopg.Connection.connect(), and returns an +# instance of psycopg.Connection. +# +# Additionally, care is taken to preserve the cursor_factory argument and to use custom cursor classes. However, with +# AsyncConnection the compatible cursors will be async, which will not be compatible with the explain plan's +# synchronous Connection instance. To avoid this issue, we refuse to preserve the cursor_factory arument for +# AsyncConnection and instead fall back to using the default psycopg.Cursor class. +# +# This should allow the largest number of users to still have explain plans function for their applications, whether or +# not they are using AsyncConnection or custom classes. The issue of using a synchronous Connection object in an async +# application should be somewhat mitigated by the fact that our explain plan feature functions on the harvest thread. + + +class AsyncCursorWrapper(DBAPI2AsyncCursorWrapper): + def __init__(self, cursor, dbapi2_module, connect_params, cursor_params): + # Remove async cursor_factory so it doesn't interfere with sync Connections for explain plans + args, kwargs = connect_params + kwargs = dict(kwargs) + kwargs.pop("cursor_factory", None) + + super().__init__(cursor, dbapi2_module, (args, kwargs), cursor_params) + + async def __aenter__(self): + await self.__wrapped__.__aenter__() + + # Must return a reference to self as otherwise will be + # returning the inner cursor object. If 'as' is used + # with the 'with' statement this will mean no longer + # using the wrapped cursor object and nothing will be + # tracked. + + return self + + async def execute(self, sql, parameters=DEFAULT, *args, **kwargs): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + return await super(AsyncCursorWrapper, self).execute(sql, parameters, *args, **kwargs) + + async def executemany(self, sql, seq_of_parameters): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + return await super(AsyncCursorWrapper, self).executemany(sql, seq_of_parameters) + + +class AsyncConnectionSaveParamsWrapper(DBAPI2AsyncConnectionWrapper): + + __cursor_wrapper__ = AsyncCursorWrapper + + async def execute(self, sql, parameters=DEFAULT, *args, **kwargs): + if hasattr(sql, "as_string"): + sql = sql.as_string(self) + + if parameters is not DEFAULT: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=None, + sql_parameters=parameters, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + cursor = await self.__wrapped__.execute(sql, parameters, *args, **kwargs) + else: + with DatabaseTrace( + sql=sql, + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + cursor_params=None, + sql_parameters=None, + execute_params=(args, kwargs), + source=self.__wrapped__.execute, + ): + cursor = await self.__wrapped__.execute(sql, **kwargs) + + return self.__cursor_wrapper__(cursor, self._nr_dbapi2_module, self._nr_connect_params, (args, kwargs)) + + async def __aenter__(self): + name = callable_name(self.__wrapped__.__aenter__) + with FunctionTrace(name, source=self.__wrapped__.__aenter__): + await self.__wrapped__.__aenter__() + + # Must return a reference to self as otherwise will be + # returning the inner connection object. If 'as' is used + # with the 'with' statement this will mean no longer + # using the wrapped connection object and nothing will be + # tracked. + + return self + + async def __aexit__(self, exc, value, tb): + name = callable_name(self.__wrapped__.__aexit__) + with FunctionTrace(name, source=self.__wrapped__.__aexit__): + if exc is None: + with DatabaseTrace( + sql="COMMIT", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.__aexit__, + ): + return await self.__wrapped__.__aexit__(exc, value, tb) + else: + with DatabaseTrace( + sql="ROLLBACK", + dbapi2_module=self._nr_dbapi2_module, + connect_params=self._nr_connect_params, + source=self.__wrapped__.__aexit__, + ): + return await self.__wrapped__.__aexit__(exc, value, tb) + + +# This connection wrapper does not save cursor parameters for explain plans. It +# is only used for the default connection class. +class AsyncConnectionWrapper(AsyncConnectionSaveParamsWrapper): + def cursor(self, *args, **kwargs): + # If any unknown cursor params are detected or a cursor factory is + # used, store params for explain plans later. + if should_preserve_cursor_args(*args, **kwargs): + cursor_params = (args, kwargs) + else: + cursor_params = None + + return self.__cursor_wrapper__( + self.__wrapped__.cursor(*args, **kwargs), self._nr_dbapi2_module, self._nr_connect_params, cursor_params + ) + + +class AsyncConnectionFactory(DBAPI2AsyncConnectionFactory): + + __connection_wrapper__ = AsyncConnectionWrapper + + async def __call__(self, *args, **kwargs): + if should_preserve_connection_args(self, *args, **kwargs): + self.__connection_wrapper__ = AsyncConnectionSaveParamsWrapper + + return await super(AsyncConnectionFactory, self).__call__(*args, **kwargs) + + +def instance_info(args, kwargs): + + p_host, p_hostaddr, p_port, p_dbname = _parse_connect_params(args, kwargs) + host, port, db_name = _add_defaults(p_host, p_hostaddr, p_port, p_dbname) + + return (host, port, db_name) + + +def _parse_connect_params(args, kwargs): + def _bind_params(conninfo=None, *args, **kwargs): + return conninfo + + dsn = _bind_params(*args, **kwargs) + + try: + if dsn and (dsn.startswith("postgres://") or dsn.startswith("postgresql://")): + + # Parse dsn as URI + # + # According to PGSQL, connect URIs are in the format of RFC 3896 + # https://www.postgresql.org/docs/9.5/static/libpq-connect.html + + parsed_uri = ul3_util.parse_url(dsn) + + host = parsed_uri.hostname or None + host = host and unquote(host) + + # ipv6 brackets [] are contained in the URI hostname + # and should be removed + host = host and host.strip("[]") + + port = parsed_uri.port + + db_name = parsed_uri.path + db_name = db_name and db_name.lstrip("/") + db_name = db_name or None + + query = parsed_uri.query or "" + qp = dict(parse_qsl(query)) + + # Query parameters override hierarchical values in URI. + + host = qp.get("host") or host or None + hostaddr = qp.get("hostaddr") + port = qp.get("port") or port + db_name = qp.get("dbname") or db_name + + elif dsn: + + # Parse dsn as a key-value connection string + + kv = dict([pair.split("=", 2) for pair in dsn.split()]) + host = kv.get("host") + hostaddr = kv.get("hostaddr") + port = kv.get("port") + db_name = kv.get("dbname") + + else: + + # No dsn, so get the instance info from keyword arguments. + + host = kwargs.get("host") + hostaddr = kwargs.get("hostaddr") + port = kwargs.get("port") + db_name = kwargs.get("dbname") + + # Ensure non-None values are strings. + + (host, hostaddr, port, db_name) = [str(s) if s is not None else s for s in (host, hostaddr, port, db_name)] + + except Exception: + host = "unknown" + hostaddr = "unknown" + port = "unknown" + db_name = "unknown" + + return (host, hostaddr, port, db_name) + + +def _add_defaults(parsed_host, parsed_hostaddr, parsed_port, parsed_database): + + # ENV variables set the default values + + parsed_host = parsed_host or os.environ.get("PGHOST") + parsed_hostaddr = parsed_hostaddr or os.environ.get("PGHOSTADDR") + parsed_port = parsed_port or os.environ.get("PGPORT") + database = parsed_database or os.environ.get("PGDATABASE") or "default" + + # If hostaddr is present, we use that, since host is used for auth only. + + parsed_host = parsed_hostaddr or parsed_host + + if parsed_host is None: + host = "localhost" + port = "default" + elif parsed_host.startswith("/"): + host = "localhost" + port = "%s/.s.PGSQL.%s" % (parsed_host, parsed_port or "5432") + else: + host = parsed_host + port = parsed_port or "5432" + + return (host, port, database) + + +def wrapper_psycopg_as_string(wrapped, instance, args, kwargs): + def _bind_params(context, *args, **kwargs): + return context, args, kwargs + + context, _args, _kwargs = _bind_params(*args, **kwargs) + + # Unwrap the context for string conversion since psycopg uses duck typing + # and a TypeError will be raised if a wrapper is used. + if hasattr(context, "__wrapped__"): + context = context.__wrapped__ + + return wrapped(context, *_args, **_kwargs) + + +def wrap_Connection_connect(module): + def _wrap_Connection_connect(wrapped, instance, args, kwargs): + return ConnectionFactory(wrapped, module)(*args, **kwargs) + + return _wrap_Connection_connect + + +def wrap_AsyncConnection_connect(module): + async def _wrap_AsyncConnection_connect(wrapped, instance, args, kwargs): + return await AsyncConnectionFactory(wrapped, module)(*args, **kwargs) + + return _wrap_AsyncConnection_connect + + +def instrument_psycopg(module): + global PsycopgConnection, PsycopgAsyncConnection + + PsycopgConnection = module.Connection + PsycopgAsyncConnection = module.AsyncConnection + + register_database_client( + module, + database_product="Postgres", + quoting_style="single+dollar", + explain_query="explain", + explain_stmts=("select", "insert", "update", "delete"), + instance_info=instance_info, + ) + + if hasattr(module, "Connection"): + if hasattr(module.Connection, "connect"): + if not isinstance(module.Connection.connect, ObjectProxy): + wrap_function_wrapper(module, "Connection.connect", wrap_Connection_connect(module)) + + if hasattr(module, "connect"): + if not isinstance(module.connect, ObjectProxy): + wrap_object(module, "connect", ConnectionFactory, (module,)) + + if hasattr(module, "AsyncConnection") and hasattr(module.AsyncConnection, "connect"): + if not isinstance(module.AsyncConnection.connect, ObjectProxy): + wrap_function_wrapper(module, "AsyncConnection.connect", wrap_AsyncConnection_connect(module)) + + +def instrument_psycopg_sql(module): + if hasattr(module, "Composable") and hasattr(module.Composable, "as_string"): + for name, cls in inspect.getmembers(module): + if not inspect.isclass(cls): + continue + + if not issubclass(cls, module.Composable): + continue + + wrap_function_wrapper(module, name + ".as_string", wrapper_psycopg_as_string) diff --git a/tests/datastore_psycopg/conftest.py b/tests/datastore_psycopg/conftest.py new file mode 100644 index 000000000..4a066086e --- /dev/null +++ b/tests/datastore_psycopg/conftest.py @@ -0,0 +1,116 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from testing_support.db_settings import postgresql_settings +from testing_support.fixture.event_loop import event_loop as loop # noqa: F401 +from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611 + collector_agent_registration_fixture, + collector_available_fixture, +) + +_default_settings = { + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.transaction_threshold": 0.0, + "transaction_tracer.stack_trace_threshold": 0.0, + "debug.log_data_collector_payloads": True, + "debug.record_transaction_failure": True, + "debug.log_explain_plan_queries": True, +} + +collector_agent_registration = collector_agent_registration_fixture( + app_name="Python Agent Test (datastore_psycopg)", + default_settings=_default_settings, + linked_applications=["Python Agent Test (datastore)"], +) + + +DB_MULTIPLE_SETTINGS = postgresql_settings() +DB_SETTINGS = DB_MULTIPLE_SETTINGS[0] + + +@pytest.fixture(scope="session", params=["sync", "async"]) +def is_async(request): + return request.param == "async" + + +@pytest.fixture(scope="function") +def connection(loop, is_async): + import psycopg + + if not is_async: + connection = psycopg.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + else: + connection = loop.run_until_complete( + psycopg.AsyncConnection.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + ) + + yield connection + loop.run_until_complete(maybe_await(connection.close())) + + +@pytest.fixture(scope="function") +def multiple_connections(loop, is_async): + import psycopg + + if len(DB_MULTIPLE_SETTINGS) < 2: + pytest.skip(reason="Test environment not configured with multiple databases.") + + connections = [] + for DB_SETTINGS in DB_MULTIPLE_SETTINGS: + if not is_async: + connections.append( + psycopg.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + ) + else: + connections.append( + loop.run_until_complete( + psycopg.AsyncConnection.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + ) + ) + + yield connections + for connection in connections: + loop.run_until_complete(maybe_await(connection.close())) + + +async def maybe_await(value): + if hasattr(value, "__await__"): + return await value + + return value diff --git a/tests/datastore_psycopg/test_as_string.py b/tests/datastore_psycopg/test_as_string.py new file mode 100644 index 000000000..675ce2c90 --- /dev/null +++ b/tests/datastore_psycopg/test_as_string.py @@ -0,0 +1,112 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + from psycopg import sql +except ImportError: + sql = None + +from newrelic.api.background_task import background_task + + +@background_task() +def test_as_string_1(connection): + + # All of these are similar to those described in the doctests in + # psycopg/lib/sql.py + + comp = sql.Composed([sql.SQL("insert into "), sql.Identifier("table")]) + result = comp.as_string(connection) + assert result == 'insert into "table"' + + +@background_task() +def test_as_string_2(connection): + fields = sql.Identifier("foo") + sql.Identifier("bar") # a Composed + result = fields.join(", ").as_string(connection) + assert result == '"foo", "bar"' + + +@background_task() +def test_as_string_3(connection): + query = sql.SQL("select {0} from {1}").format( + sql.SQL(", ").join([sql.Identifier("foo"), sql.Identifier("bar")]), sql.Identifier("table") + ) + result = query.as_string(connection) + assert result == 'select "foo", "bar" from "table"' + + +@background_task() +def test_as_string_4(connection): + result = ( + sql.SQL("select * from {0} where {1} = %s") + .format(sql.Identifier("people"), sql.Identifier("id")) + .as_string(connection) + ) + assert result == 'select * from "people" where "id" = %s' + + +@background_task() +def test_as_string_5(connection): + result = ( + sql.SQL("select * from {tbl} where {pkey} = %s") + .format(tbl=sql.Identifier("people"), pkey=sql.Identifier("id")) + .as_string(connection) + ) + assert result == 'select * from "people" where "id" = %s' + + +@background_task() +def test_as_string_6(connection): + snip = sql.SQL(", ").join(sql.Identifier(n) for n in ["foo", "bar", "baz"]) + result = snip.as_string(connection) + assert result == '"foo", "bar", "baz"' + + +@background_task() +def test_as_string_7(connection): + t1 = sql.Identifier("foo") + t2 = sql.Identifier("ba'r") + t3 = sql.Identifier('ba"z') + result = sql.SQL(", ").join([t1, t2, t3]).as_string(connection) + assert result == '"foo", "ba\'r", "ba""z"' + + +@background_task() +def test_as_string_8(connection): + s1 = sql.Literal("foo") + s2 = sql.Literal("ba'r") + s3 = sql.Literal(42) + result = sql.SQL(", ").join([s1, s2, s3]).as_string(connection) + assert result == "'foo', 'ba''r', 42" + + +@background_task() +def test_as_string_9(connection): + names = ["foo", "bar", "baz"] + q1 = sql.SQL("insert into table ({0}) values ({1})").format( + sql.SQL(", ").join(map(sql.Identifier, names)), sql.SQL(", ").join(sql.Placeholder() * len(names)) + ) + result = q1.as_string(connection) + assert result == 'insert into table ("foo", "bar", "baz") values (%s, %s, %s)' + + +@background_task() +def test_as_string_10(connection): + names = ["foo", "bar", "baz"] + q2 = sql.SQL("insert into table ({0}) values ({1})").format( + sql.SQL(", ").join(map(sql.Identifier, names)), sql.SQL(", ").join(map(sql.Placeholder, names)) + ) + result = q2.as_string(connection) + assert result == 'insert into table ("foo", "bar", "baz") ' "values (%(foo)s, %(bar)s, %(baz)s)" diff --git a/tests/datastore_psycopg/test_connection.py b/tests/datastore_psycopg/test_connection.py new file mode 100644 index 000000000..f0d9d0026 --- /dev/null +++ b/tests/datastore_psycopg/test_connection.py @@ -0,0 +1,221 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import psycopg +import pytest + +try: + from psycopg import sql +except ImportError: + sql = None + +from conftest import DB_SETTINGS, maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_database_trace_inputs import ( + validate_database_trace_inputs, +) +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task + +# Settings +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, +} + + +# Metrics +_base_scoped_metrics = ( + ("Datastore/operation/Postgres/commit", 2), + ("Datastore/operation/Postgres/create", 2), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/rollback", 1), + ("Datastore/statement/Postgres/%s/call" % DB_SETTINGS["procedure_name"], 1), + ("Datastore/statement/Postgres/%s/delete" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/insert" % DB_SETTINGS["table_name"], 3), + ("Datastore/statement/Postgres/%s/select" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/update" % DB_SETTINGS["table_name"], 1), +) + +_base_rollup_metrics = ( + ("Datastore/all", 14), + ("Datastore/allOther", 14), + ("Datastore/Postgres/all", 14), + ("Datastore/Postgres/allOther", 14), + ("Datastore/operation/Postgres/call", 1), + ("Datastore/operation/Postgres/commit", 2), + ("Datastore/operation/Postgres/create", 2), + ("Datastore/operation/Postgres/delete", 1), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/insert", 3), + ("Datastore/operation/Postgres/rollback", 1), + ("Datastore/operation/Postgres/select", 1), + ("Datastore/operation/Postgres/update", 1), + ("Datastore/statement/Postgres/%s/call" % DB_SETTINGS["procedure_name"], 1), + ("Datastore/statement/Postgres/%s/delete" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/insert" % DB_SETTINGS["table_name"], 3), + ("Datastore/statement/Postgres/%s/select" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/update" % DB_SETTINGS["table_name"], 1), +) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +_host = instance_hostname(DB_SETTINGS["host"]) +_port = DB_SETTINGS["port"] + +_instance_metric_name = "Datastore/instance/Postgres/%s/%s" % (_host, _port) + +_enable_rollup_metrics.append((_instance_metric_name, 13)) + +_disable_rollup_metrics.append((_instance_metric_name, None)) + + +# Query +async def _execute(connection, row_type, wrapper): + sql = "drop table if exists %s" % DB_SETTINGS["table_name"] + await maybe_await(connection.execute(wrapper(sql))) + + sql = "create table %s (a integer, b real, c text)" % DB_SETTINGS["table_name"] + await maybe_await(connection.execute(wrapper(sql))) + + for params in [(1, 1.0, "1.0"), (2, 2.2, "2.2"), (3, 3.3, "3.3")]: + sql = "insert into %s " % DB_SETTINGS["table_name"] + "values (%s, %s, %s)" + await maybe_await(connection.execute(wrapper(sql), params)) + + sql = "select * from %s" % DB_SETTINGS["table_name"] + cursor = await maybe_await(connection.execute(wrapper(sql))) + + if hasattr(cursor, "__aiter__"): + async for row in cursor: + assert isinstance(row, row_type) + else: + for row in cursor: + assert isinstance(row, row_type) + + # Reuse cursor to ensure it is also wrapped + sql = "update %s" % DB_SETTINGS["table_name"] + " set a=%s, b=%s, c=%s where a=%s" + params = (4, 4.0, "4.0", 1) + await maybe_await(cursor.execute(wrapper(sql), params)) + + sql = "delete from %s where a=2" % DB_SETTINGS["table_name"] + await maybe_await(connection.execute(wrapper(sql))) + + await maybe_await(connection.commit()) + + await maybe_await( + connection.execute( + "create or replace procedure %s() \nlanguage plpgsql as $$ begin perform now(); end; $$" + % DB_SETTINGS["procedure_name"] + ) + ) + await maybe_await(connection.execute("call %s()" % DB_SETTINGS["procedure_name"])) + + await maybe_await(connection.rollback()) + await maybe_await(connection.commit()) + + +async def _exercise_db(is_async, row_type=tuple, wrapper=str): + # Connect here instead of using the fixture to capture the FunctionTrace around connect + if not is_async: + connection = psycopg.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + else: + connection = await psycopg.AsyncConnection.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + + try: + await _execute(connection, row_type, wrapper) + finally: + await maybe_await(connection.close()) + + +_test_matrix = [ + "wrapper", + [ + str, + sql.SQL, + lambda q: sql.Composed([sql.SQL(q)]), + ], +] + + +# Tests +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_enable_instance_settings) +def test_execute_via_connection_enable_instance(loop, is_async, wrapper): + if not is_async: + connect_metric = ("Function/psycopg:Connection.connect", 1) + else: + connect_metric = ("Function/psycopg:AsyncConnection.connect", 1) + + _scoped_metrics = list(_enable_scoped_metrics) + _scoped_metrics.append(connect_metric) + + @validate_transaction_metrics( + "test_execute_via_connection_enable_instance", + scoped_metrics=_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, + ) + @validate_database_trace_inputs(sql_parameters_type=tuple) + @background_task(name="test_execute_via_connection_enable_instance") + def test(): + loop.run_until_complete(_exercise_db(is_async, row_type=tuple, wrapper=wrapper)) + + test() + + +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_disable_instance_settings) +def test_execute_via_connection_disable_instance(loop, is_async, wrapper): + if not is_async: + connect_metric = ("Function/psycopg:Connection.connect", 1) + else: + connect_metric = ("Function/psycopg:AsyncConnection.connect", 1) + + _scoped_metrics = list(_disable_scoped_metrics) + _scoped_metrics.append(connect_metric) + + @validate_transaction_metrics( + "test_execute_via_connection_disable_instance", + scoped_metrics=_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, + ) + @validate_database_trace_inputs(sql_parameters_type=tuple) + @background_task(name="test_execute_via_connection_disable_instance") + def test(): + loop.run_until_complete(_exercise_db(is_async, row_type=tuple, wrapper=wrapper)) + + test() diff --git a/tests/datastore_psycopg/test_cursor.py b/tests/datastore_psycopg/test_cursor.py new file mode 100644 index 000000000..3f9330012 --- /dev/null +++ b/tests/datastore_psycopg/test_cursor.py @@ -0,0 +1,233 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import psycopg +import pytest + +try: + from psycopg import sql +except ImportError: + sql = None + +from conftest import DB_SETTINGS, maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_database_trace_inputs import ( + validate_database_trace_inputs, +) +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task + +# Settings +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, +} + + +# Metrics +_base_scoped_metrics = ( + ("Datastore/operation/Postgres/commit", 2), + ("Datastore/operation/Postgres/create", 2), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/rollback", 1), + ("Datastore/statement/Postgres/%s/call" % DB_SETTINGS["procedure_name"], 1), + ("Datastore/statement/Postgres/%s/delete" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/insert" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/select" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/update" % DB_SETTINGS["table_name"], 1), +) + +_base_rollup_metrics = ( + ("Datastore/all", 11), + ("Datastore/allOther", 11), + ("Datastore/Postgres/all", 11), + ("Datastore/Postgres/allOther", 11), + ("Datastore/operation/Postgres/call", 1), + ("Datastore/operation/Postgres/commit", 2), + ("Datastore/operation/Postgres/create", 2), + ("Datastore/operation/Postgres/delete", 1), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/insert", 1), + ("Datastore/operation/Postgres/rollback", 1), + ("Datastore/operation/Postgres/select", 1), + ("Datastore/operation/Postgres/update", 1), + ("Datastore/statement/Postgres/%s/call" % DB_SETTINGS["procedure_name"], 1), + ("Datastore/statement/Postgres/%s/delete" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/insert" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/select" % DB_SETTINGS["table_name"], 1), + ("Datastore/statement/Postgres/%s/update" % DB_SETTINGS["table_name"], 1), +) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +_host = instance_hostname(DB_SETTINGS["host"]) +_port = DB_SETTINGS["port"] + +_instance_metric_name = "Datastore/instance/Postgres/%s/%s" % (_host, _port) + +_enable_rollup_metrics.append((_instance_metric_name, 11)) + +_disable_rollup_metrics.append((_instance_metric_name, None)) + + +# Query +async def _execute(connection, cursor, row_type, wrapper): + sql = "drop table if exists %s" % DB_SETTINGS["table_name"] + await maybe_await(cursor.execute(wrapper(sql))) + + sql = "create table %s (a integer, b real, c text)" % DB_SETTINGS["table_name"] + await maybe_await(cursor.execute(wrapper(sql))) + + sql = "insert into %s " % DB_SETTINGS["table_name"] + "values (%s, %s, %s)" + params = [(1, 1.0, "1.0"), (2, 2.2, "2.2"), (3, 3.3, "3.3")] + await maybe_await(cursor.executemany(wrapper(sql), params)) + + sql = "select * from %s" % DB_SETTINGS["table_name"] + await maybe_await(cursor.execute(wrapper(sql))) + + if hasattr(cursor, "__aiter__"): + async for row in cursor: + assert isinstance(row, row_type) + else: + # Iterate on sync cursor + for row in cursor: + assert isinstance(row, row_type) + + sql = "update %s" % DB_SETTINGS["table_name"] + " set a=%s, b=%s, c=%s where a=%s" + params = (4, 4.0, "4.0", 1) + await maybe_await(cursor.execute(wrapper(sql), params)) + + sql = "delete from %s where a=2" % DB_SETTINGS["table_name"] + await maybe_await(cursor.execute(wrapper(sql))) + + await maybe_await(connection.commit()) + + await maybe_await( + cursor.execute( + "create or replace procedure %s() \nlanguage plpgsql as $$ begin perform now(); end; $$" + % DB_SETTINGS["procedure_name"] + ) + ) + await maybe_await(cursor.execute("call %s()" % DB_SETTINGS["procedure_name"])) + + await maybe_await(connection.rollback()) + await maybe_await(connection.commit()) + + +async def _exercise_db(connection, row_factory=None, use_cur_context=False, row_type=tuple, wrapper=str): + kwargs = {"row_factory": row_factory} if row_factory else {} + + try: + cursor = connection.cursor(**kwargs) + if use_cur_context: + if hasattr(cursor, "__aenter__"): + async with cursor: + await _execute(connection, cursor, row_type, wrapper) + else: + with cursor: + await _execute(connection, cursor, row_type, wrapper) + else: + await _execute(connection, cursor, row_type, wrapper) + finally: + await maybe_await(connection.close()) + + +_test_matrix = [ + "wrapper,use_cur_context", + [ + (str, False), + (str, True), + (sql.SQL, False), + (sql.SQL, True), + (lambda q: sql.Composed([sql.SQL(q)]), False), + (lambda q: sql.Composed([sql.SQL(q)]), True), + ], +] + + +# Tests +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_cursor:test_execute_via_cursor_enable_instance", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_execute_via_cursor_enable_instance(loop, connection, wrapper, use_cur_context): + loop.run_until_complete(_exercise_db(connection, use_cur_context=use_cur_context, row_type=tuple, wrapper=wrapper)) + + +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_cursor:test_execute_via_cursor_disable_instance", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_execute_via_cursor_disable_instance(loop, connection, wrapper, use_cur_context): + loop.run_until_complete(_exercise_db(connection, use_cur_context=use_cur_context, row_type=tuple, wrapper=wrapper)) + + +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_cursor:test_execute_via_cursor_dict_enable_instance", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_execute_via_cursor_dict_enable_instance(loop, connection, wrapper, use_cur_context): + dict_factory = psycopg.rows.dict_row + loop.run_until_complete( + _exercise_db( + connection, row_factory=dict_factory, use_cur_context=use_cur_context, row_type=dict, wrapper=wrapper + ) + ) + + +@pytest.mark.parametrize(*_test_matrix) +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_cursor:test_execute_via_cursor_dict_disable_instance", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_execute_via_cursor_dict_disable_instance(loop, connection, wrapper, use_cur_context): + dict_factory = psycopg.rows.dict_row + loop.run_until_complete( + _exercise_db( + connection, row_factory=dict_factory, use_cur_context=use_cur_context, row_type=dict, wrapper=wrapper + ) + ) diff --git a/tests/datastore_psycopg/test_database_instance_info.py b/tests/datastore_psycopg/test_database_instance_info.py new file mode 100644 index 000000000..1caa95047 --- /dev/null +++ b/tests/datastore_psycopg/test_database_instance_info.py @@ -0,0 +1,226 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from newrelic.hooks.database_psycopg import ( + _add_defaults, + _parse_connect_params, + instance_info, +) + + +def test_kwargs(): + connect_params = ((), {"dbname": "foo", "host": "1.2.3.4", "port": 1234}) + output = _parse_connect_params(*connect_params) + assert output == ("1.2.3.4", None, "1234", "foo") + + +def test_arg_str(): + connect_params = (("host=foobar port=9876",), {}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", None, "9876", None) + + +def test_bind_conninfo(): + connect_params = ((), {"conninfo": "host=foobar port=9876"}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", None, "9876", None) + + +def test_bind_conninfo_ignore_kwargs(): + connect_params = ((), {"conninfo": "host=foobar", "port": 1234}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", None, None, None) + + +def test_kwargs_str_for_port(): + connect_params = ((), {"dbname": "foo", "host": "1.2.3.4", "port": "1234"}) + output = _parse_connect_params(*connect_params) + assert output == ("1.2.3.4", None, "1234", "foo") + + +def test_arg_str_missing_port(): + connect_params = (("host=foobar",), {}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", None, None, None) + + +def test_arg_str_multiple_host(): + connect_params = (("host=foobar host=barbaz",), {}) + output = _parse_connect_params(*connect_params) + assert output == ("barbaz", None, None, None) + + +def test_arg_str_multiple_port(): + connect_params = (("port=5555 port=7777",), {}) + output = _parse_connect_params(*connect_params) + assert output == (None, None, "7777", None) + + +def test_arg_str_missing_host(): + connect_params = (("port=5555",), {}) + output = _parse_connect_params(*connect_params) + assert output == (None, None, "5555", None) + + +def test_arg_str_missing_host_and_port(): + connect_params = (("nothing=here",), {}) + output = _parse_connect_params(*connect_params) + assert output == (None, None, None, None) + + +def test_malformed_arg_str(): + connect_params = (("this_is_malformed",), {}) + output = _parse_connect_params(*connect_params) + assert output == ("unknown", "unknown", "unknown", "unknown") + + +def test_str_in_port_arg_str(): + connect_params = (("port=foobar",), {}) + output = _parse_connect_params(*connect_params) + assert output == (None, None, "foobar", None) + + +def test_host_and_hostaddr_in_arg_str(): + connect_params = (("host=foobar hostaddr=1.2.3.4",), {}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", "1.2.3.4", None, None) + + +def test_host_and_hostaddr_in_kwarg(): + connect_params = ((), {"host": "foobar", "hostaddr": "1.2.3.4"}) + output = _parse_connect_params(*connect_params) + assert output == ("foobar", "1.2.3.4", None, None) + + +def test_only_hostaddr_in_kwarg(): + connect_params = ((), {"hostaddr": "1.2.3.4"}) + output = _parse_connect_params(*connect_params) + assert output == (None, "1.2.3.4", None, None) + + +def test_only_hostaddr_in_arg_str(): + connect_params = (("hostaddr=1.2.3.4",), {}) + output = _parse_connect_params(*connect_params) + assert output == (None, "1.2.3.4", None, None) + + +def test_env_var_default_host(monkeypatch): + monkeypatch.setenv("PGHOST", "envfoo") + output = _add_defaults(None, None, "1234", "foo") + assert output == ("envfoo", "1234", "foo") + + +def test_env_var_default_hostaddr(monkeypatch): + monkeypatch.setenv("PGHOSTADDR", "1.2.3.4") + output = _add_defaults(None, None, "1234", "foo") + assert output == ("1.2.3.4", "1234", "foo") + + +def test_env_var_default_database(monkeypatch): + monkeypatch.setenv("PGDATABASE", "dbenvfoo") + output = _add_defaults("foo", None, "1234", None) + assert output == ("foo", "1234", "dbenvfoo") + + +def test_env_var_default_port(monkeypatch): + monkeypatch.setenv("PGPORT", "9999") + output = _add_defaults("foo", None, None, "bar") + assert output == ("foo", "9999", "bar") + + +@pytest.mark.parametrize( + "connect_params,expected", + [ + ((("postgresql://",), {}), ("localhost", "default", "default")), + ((("postgresql://localhost",), {}), ("localhost", "5432", "default")), + ((("postgresql://localhost:5433",), {}), ("localhost", "5433", "default")), + ((("postgresql://localhost/mydb",), {}), ("localhost", "5432", "mydb")), + ((("postgresql://user@localhost",), {}), ("localhost", "5432", "default")), + ((("postgresql://user:secret@localhost",), {}), ("localhost", "5432", "default")), + ((("postgresql://[2001:db8::1234]/database",), {}), ("2001:db8::1234", "5432", "database")), + ((("postgresql://[2001:db8::1234]:2222/database",), {}), ("2001:db8::1234", "2222", "database")), + ( + (("postgresql:///dbname?host=/var/lib/postgresql",), {}), + ("localhost", "/var/lib/postgresql/.s.PGSQL.5432", "dbname"), + ), + ( + (("postgresql://%2Fvar%2Flib%2Fpostgresql/dbname",), {}), + ("localhost", "/var/lib/postgresql/.s.PGSQL.5432", "dbname"), + ), + ((("postgresql://other@localhost/otherdb?c=10&a=myapp",), {}), ("localhost", "5432", "otherdb")), + ((("postgresql:///",), {}), ("localhost", "default", "default")), + ((("postgresql:///dbname?host=foo",), {}), ("foo", "5432", "dbname")), + ((("postgresql:///dbname?port=1234",), {}), ("localhost", "default", "dbname")), + ((("postgresql:///dbname?host=foo&port=1234",), {}), ("foo", "1234", "dbname")), + ((("postgres:///dbname?host=foo&port=1234",), {}), ("foo", "1234", "dbname")), + ((("postgres://localhost:5444/blah?host=::1",), {}), ("::1", "5444", "blah")), + ((("postgresql:///dbname?host=foo&port=1234&host=bar",), {}), ("bar", "1234", "dbname")), + ((("postgresql://%2Ftmp:1234",), {}), ("localhost", "/tmp/.s.PGSQL.1234", "default")), + ((("postgresql:///foo?dbname=bar",), {}), ("localhost", "default", "bar")), + ((("postgresql://example.com/foo?hostaddr=1.2.3.4&host=bar",), {}), ("1.2.3.4", "5432", "foo")), + ], +) +def test_uri(connect_params, expected): + output = instance_info(*connect_params) + assert output == expected + + +@pytest.mark.parametrize( + "connect_params,expected", + [ + ((("postgresql://user:password@/?dbname=bar",), {}), ("localhost", "default", "bar")), + ((("postgresql://user:pass@host/?dbname=bar",), {}), ("host", "5432", "bar")), + ((("postgresql://user:password@@/?dbname=bar",), {}), ("localhost", "default", "bar")), + ((("postgresql://@",), {}), ("localhost", "default", "default")), + ((("postgresql://@@localhost",), {}), ("localhost", "5432", "default")), + ], +) +def test_security_sensitive_uri(connect_params, expected): + output = instance_info(*connect_params) + assert output == expected + + +def test_bad_uri(): + connect_params = (("blah:///foo",), {}) + output = instance_info(*connect_params) + assert output == ("unknown", "unknown", "unknown") + + +_test_add_defaults = [ + # TCP/IP + [("otherhost.com", None, "8888", "foobar"), ("otherhost.com", "8888", "foobar")], + [("otherhost.com", None, None, "foobar"), ("otherhost.com", "5432", "foobar")], + [("localhost", None, "8888", "foobar"), ("localhost", "8888", "foobar")], + [("localhost", None, None, "foobar"), ("localhost", "5432", "foobar")], + [("127.0.0.1", None, "8888", "foobar"), ("127.0.0.1", "8888", "foobar")], + [("127.0.0.1", None, None, "foobar"), ("127.0.0.1", "5432", "foobar")], + [("::1", None, "8888", None), ("::1", "8888", "default")], + [("::1", None, None, None), ("::1", "5432", "default")], + [("::1", None, None, ""), ("::1", "5432", "default")], + # Unix Domain Socket + [(None, None, None, None), ("localhost", "default", "default")], + [(None, None, "5432", None), ("localhost", "default", "default")], + [(None, None, "8888", None), ("localhost", "default", "default")], + [("/tmp", None, None, "cat"), ("localhost", "/tmp/.s.PGSQL.5432", "cat")], + [("/tmp", None, "5432", "dog"), ("localhost", "/tmp/.s.PGSQL.5432", "dog")], + [("/tmp", None, "8888", "db"), ("localhost", "/tmp/.s.PGSQL.8888", "db")], +] + + +@pytest.mark.parametrize("host_port,expected", _test_add_defaults) +def test_add_defaults(host_port, expected): + actual = _add_defaults(*host_port) + assert actual == expected diff --git a/tests/datastore_psycopg/test_explain_plans.py b/tests/datastore_psycopg/test_explain_plans.py new file mode 100644 index 000000000..e02a04693 --- /dev/null +++ b/tests/datastore_psycopg/test_explain_plans.py @@ -0,0 +1,201 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import psycopg +import pytest +from conftest import DB_SETTINGS, maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.validators.validate_database_node import validate_database_node +from testing_support.validators.validate_transaction_slow_sql_count import ( + validate_transaction_slow_sql_count, +) + +from newrelic.api.background_task import background_task +from newrelic.core.database_utils import SQLConnections + + +class CustomCursor(psycopg.Cursor): + event = threading.Event() + + def execute(self, *args, **kwargs): + self.event.set() + return super().execute(*args, **kwargs) + + +class CustomAsyncCursor(psycopg.AsyncCursor): + event = threading.Event() + + async def execute(self, *args, **kwargs): + self.event.set() + return await super().execute(*args, **kwargs) + + +class CustomConnection(psycopg.Connection): + event = threading.Event() + + def cursor(self, *args, **kwargs): + self.event.set() + return super().cursor(*args, **kwargs) + + +class CustomAsyncConnection(psycopg.AsyncConnection): + event = threading.Event() + + def cursor(self, *args, **kwargs): + self.event.set() + return super().cursor(*args, **kwargs) + + +def reset_events(): + # Reset all event flags + CustomCursor.event.clear() + CustomAsyncCursor.event.clear() + CustomConnection.event.clear() + CustomAsyncConnection.event.clear() + + +async def _exercise_db(connection, cursor_kwargs=None): + cursor_kwargs = cursor_kwargs or {} + + try: + cursor = connection.cursor(**cursor_kwargs) + + await maybe_await(cursor.execute("SELECT setting from pg_settings where name=%s", ("server_version",))) + finally: + await maybe_await(connection.close()) + + +# Tests + + +def explain_plan_is_not_none(node): + with SQLConnections() as connections: + explain_plan = node.explain_plan(connections) + + assert explain_plan is not None + + +SCROLLABLE = (True, False) +WITHHOLD = (True, False) + + +@pytest.mark.parametrize("withhold", WITHHOLD) +@pytest.mark.parametrize("scrollable", SCROLLABLE) +@override_application_settings( + { + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.record_sql": "raw", + } +) +@validate_database_node(explain_plan_is_not_none) +@validate_transaction_slow_sql_count(1) +@background_task(name="test_explain_plan_unnamed_cursors") +def test_explain_plan_unnamed_cursors(loop, connection, withhold, scrollable): + cursor_kwargs = {} + + if withhold: + cursor_kwargs["withhold"] = withhold + + if scrollable: + cursor_kwargs["scrollable"] = scrollable + + loop.run_until_complete(_exercise_db(connection, cursor_kwargs=cursor_kwargs)) + + +@pytest.mark.parametrize("withhold", WITHHOLD) +@pytest.mark.parametrize("scrollable", SCROLLABLE) +@override_application_settings( + { + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.record_sql": "raw", + } +) +@validate_database_node(explain_plan_is_not_none) +@validate_transaction_slow_sql_count(1) +@background_task(name="test_explain_plan_named_cursors") +def test_explain_plan_named_cursors(loop, connection, withhold, scrollable): + cursor_kwargs = { + "name": "test_explain_plan_named_cursors", + } + + if withhold: + cursor_kwargs["withhold"] = withhold + + if scrollable: + cursor_kwargs["scrollable"] = scrollable + + loop.run_until_complete(_exercise_db(connection, cursor_kwargs=cursor_kwargs)) + + +# This test validates that any combination of sync or async, and default or custom connection and cursor classes will work with +# the explain plan feature. The agent should always use psycopg.connect to open a new explain plan connection and only +# use custom cursors from synchronous connections, as async cursors will not be compatible. +@pytest.mark.parametrize( + "connection_cls,cursor_cls", + [ + (psycopg.Connection, psycopg.Cursor), + (psycopg.Connection, CustomCursor), + (CustomConnection, psycopg.Cursor), + (CustomConnection, CustomCursor), + (psycopg.AsyncConnection, psycopg.AsyncCursor), + (psycopg.AsyncConnection, CustomAsyncCursor), + (CustomAsyncConnection, psycopg.AsyncCursor), + (CustomAsyncConnection, CustomAsyncCursor), + ], +) +@override_application_settings( + { + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.record_sql": "raw", + } +) +def test_explain_plan_on_custom_classes(loop, connection_cls, cursor_cls): + @validate_database_node(explain_plan_is_not_none) + @validate_transaction_slow_sql_count(1) + @background_task(name="test_explain_plan_on_custom_connect_class") + def test(): + async def coro(): + # Connect using custom Connection classes, so connect here without the fixture. + connection = await maybe_await( + connection_cls.connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + cursor_factory=cursor_cls, + ) + ) + await _exercise_db(connection) + reset_events() + + loop.run_until_complete(coro()) + + test() + + # Check that the correct classes were used AFTER the explain plan validator has run + if hasattr(connection_cls, "event"): + assert not connection_cls.event.is_set(), "Custom connection class should not be used." + if hasattr(cursor_cls, "event"): + if cursor_cls is not CustomAsyncCursor: + assert cursor_cls.event.is_set(), "Custom cursor class was not used." + else: + assert not cursor_cls.event.is_set(), "Custom async cursor class should not be used." + + +# This test will verify that arguments are preserved for an explain +# plan by forcing a failure to be generated when explain plans are created and +# arguments are preserved diff --git a/tests/datastore_psycopg/test_forward_compat.py b/tests/datastore_psycopg/test_forward_compat.py new file mode 100644 index 000000000..0f5ead853 --- /dev/null +++ b/tests/datastore_psycopg/test_forward_compat.py @@ -0,0 +1,41 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import psycopg + +from newrelic.common.object_wrapper import wrap_function_wrapper +from newrelic.hooks.database_psycopg import wrapper_psycopg_as_string + + +class TestCompatability(object): + def as_string(self, giraffe, lion, tiger=None): + assert type(giraffe) in (psycopg.Cursor, psycopg.AsyncCursor) + return "PASS" + + +wrap_function_wrapper(__name__, "TestCompatability.as_string", wrapper_psycopg_as_string) + + +def test_forward_compat_args(connection): + cursor = connection.cursor() + query = TestCompatability() + result = query.as_string(cursor, "giraffe-nomming-leaves") + assert result == "PASS" + + +def test_forward_compat_kwargs(connection): + cursor = connection.cursor() + query = TestCompatability() + result = query.as_string(cursor, lion="eats tiger", tiger="eats giraffe") + assert result == "PASS" diff --git a/tests/datastore_psycopg/test_multiple_dbs.py b/tests/datastore_psycopg/test_multiple_dbs.py new file mode 100644 index 000000000..ec69a2b9e --- /dev/null +++ b/tests/datastore_psycopg/test_multiple_dbs.py @@ -0,0 +1,138 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from conftest import DB_MULTIPLE_SETTINGS, DB_SETTINGS, maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_database_trace_inputs import ( + validate_database_trace_inputs, +) +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, +} + + +# Metrics + +_base_scoped_metrics = [ + ("Datastore/statement/Postgres/pg_settings/select", 1), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/create", 1), + ("Datastore/operation/Postgres/commit", 2), +] + +_base_rollup_metrics = [ + ("Datastore/all", 5), + ("Datastore/allOther", 5), + ("Datastore/Postgres/all", 5), + ("Datastore/Postgres/allOther", 5), + ("Datastore/statement/Postgres/pg_settings/select", 1), + ("Datastore/operation/Postgres/drop", 1), + ("Datastore/operation/Postgres/create", 1), + ("Datastore/operation/Postgres/commit", 2), +] + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_postgresql_1 = DB_MULTIPLE_SETTINGS[0] +_host_1 = instance_hostname(_postgresql_1["host"]) +_port_1 = _postgresql_1["port"] + +_postgresql_2 = DB_MULTIPLE_SETTINGS[1] +_host_2 = instance_hostname(_postgresql_2["host"]) +_port_2 = _postgresql_2["port"] + +_instance_metric_name_1 = "Datastore/instance/Postgres/%s/%s" % (_host_1, _port_1) +_instance_metric_name_2 = "Datastore/instance/Postgres/%s/%s" % (_host_2, _port_2) + +_enable_rollup_metrics.extend( + [ + (_instance_metric_name_1, 2), + (_instance_metric_name_2, 3), + ] +) +_disable_rollup_metrics.extend( + [ + (_instance_metric_name_1, None), + (_instance_metric_name_2, None), + ] +) + + +# Query + + +async def _exercise_db(multiple_connections): + connection = multiple_connections[0] + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("SELECT setting from pg_settings where name=%s", ("server_version",))) + await maybe_await(connection.commit()) + finally: + await maybe_await(connection.close()) + + connection = multiple_connections[1] + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("drop table if exists %s" % DB_SETTINGS["table_name"])) + await maybe_await( + cursor.execute("create table %s " % DB_SETTINGS["table_name"] + "(a integer, b real, c text)") + ) + await maybe_await(connection.commit()) + finally: + await maybe_await(connection.close()) + + +# Tests + + +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_multiple_dbs:test_multiple_databases_enable_instance", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_multiple_databases_enable_instance(loop, multiple_connections): + loop.run_until_complete(_exercise_db(multiple_connections)) + + +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_multiple_dbs:test_multiple_databases_disable_instance", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_scoped_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_multiple_databases_disable_instance(loop, multiple_connections): + loop.run_until_complete(_exercise_db(multiple_connections)) diff --git a/tests/datastore_psycopg/test_obfuscation.py b/tests/datastore_psycopg/test_obfuscation.py new file mode 100644 index 000000000..d764bc251 --- /dev/null +++ b/tests/datastore_psycopg/test_obfuscation.py @@ -0,0 +1,139 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import DB_SETTINGS, maybe_await +from testing_support.validators.validate_database_node import validate_database_node +from testing_support.validators.validate_sql_obfuscation import validate_sql_obfuscation + +from newrelic.api.background_task import background_task +from newrelic.core.database_utils import SQLConnections + + +@pytest.fixture() +def cursor(loop, connection): + try: + cursor = connection.cursor() + + loop.run_until_complete(maybe_await(cursor.execute("drop table if exists %s" % DB_SETTINGS["table_name"]))) + loop.run_until_complete( + maybe_await(cursor.execute("create table %s (b text, c text)" % DB_SETTINGS["table_name"])) + ) + + yield cursor + + finally: + loop.run_until_complete(maybe_await(connection.close())) + + +_quoting_style_tests = [ + ( + "SELECT * FROM %s WHERE b='2'" % DB_SETTINGS["table_name"], + "SELECT * FROM %s WHERE b=?" % DB_SETTINGS["table_name"], + ), + ( + "SELECT * FROM %s WHERE b=$func$2$func$" % DB_SETTINGS["table_name"], + "SELECT * FROM %s WHERE b=?" % DB_SETTINGS["table_name"], + ), + ( + "SELECT * FROM %s WHERE b=U&'2'" % DB_SETTINGS["table_name"], + "SELECT * FROM %s WHERE b=U&?" % DB_SETTINGS["table_name"], + ), +] + + +@pytest.mark.parametrize("sql,obfuscated", _quoting_style_tests) +def test_obfuscation_quoting_styles(loop, cursor, sql, obfuscated): + @validate_sql_obfuscation([obfuscated]) + @background_task() + def test(): + loop.run_until_complete(maybe_await(cursor.execute(sql))) + + test() + + +_parameter_tests = [ + ( + "SELECT * FROM " + DB_SETTINGS["table_name"] + " where b=%s", + "SELECT * FROM " + DB_SETTINGS["table_name"] + " where b=%s", + ), +] + + +@pytest.mark.parametrize("sql,obfuscated", _parameter_tests) +def test_obfuscation_parameters(loop, cursor, sql, obfuscated): + @validate_sql_obfuscation([obfuscated]) + @background_task() + def test(): + loop.run_until_complete(maybe_await(cursor.execute(sql, ("hello",)))) + + test() + + +def no_explain_plan(node): + sql_connections = SQLConnections() + explain_plan = node.explain_plan(sql_connections) + assert explain_plan is None + + +def any_length_explain_plan(node): + if node.statement.operation != "select": + return + + sql_connections = SQLConnections() + explain_plan = node.explain_plan(sql_connections) + assert explain_plan and len(explain_plan) > 0 + + +_test_explain_plans = [ + ( + "SELECT (b, c) FROM %s ; SELECT (b, c) FROM %s" % (DB_SETTINGS["table_name"], DB_SETTINGS["table_name"]), + no_explain_plan, + ), + ( + "SELECT (b, c) FROM %s ; SELECT (b, c) FROM %s;" % (DB_SETTINGS["table_name"], DB_SETTINGS["table_name"]), + no_explain_plan, + ), + ("SELECT (b, c) FROM %s WHERE b=';'" % DB_SETTINGS["table_name"], no_explain_plan), + (";SELECT (b, c) FROM %s" % DB_SETTINGS["table_name"], no_explain_plan), + ("SELECT (b, c) FROM %s" % DB_SETTINGS["table_name"], any_length_explain_plan), + ("SELECT (b, c) FROM %s;" % DB_SETTINGS["table_name"], any_length_explain_plan), + ( + "SELECT (b, c) FROM %s;;;;;;" % DB_SETTINGS["table_name"], + any_length_explain_plan, + ), + ( + "SELECT (b, c) FROM %s;\n\n" % DB_SETTINGS["table_name"], + any_length_explain_plan, + ), +] + + +@pytest.mark.parametrize("sql,validator", _test_explain_plans) +def test_obfuscation_explain_plans(loop, connection, sql, validator): + @validate_database_node(validator) + @background_task() + async def test(): + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("drop table if exists %s" % DB_SETTINGS["table_name"])) + await maybe_await(cursor.execute("create table %s (b text, c text)" % DB_SETTINGS["table_name"])) + + await maybe_await(cursor.execute(sql)) + + finally: + await maybe_await(connection.commit()) + await maybe_await(connection.close()) + + loop.run_until_complete(test()) diff --git a/tests/datastore_psycopg/test_register.py b/tests/datastore_psycopg/test_register.py new file mode 100644 index 000000000..575ffac7b --- /dev/null +++ b/tests/datastore_psycopg/test_register.py @@ -0,0 +1,85 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import psycopg +from conftest import maybe_await +from testing_support.validators.validate_transaction_errors import ( + validate_transaction_errors, +) +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task + + +@validate_transaction_metrics("test_register:test_register_json", background_task=True) +@validate_transaction_errors(errors=[]) +@background_task() +def test_register_json(loop, connection): + def test(): + cursor = connection.cursor() + + psycopg.types.json.set_json_loads(loads=lambda x: x, context=connection) + psycopg.types.json.set_json_loads(loads=lambda x: x, context=cursor) + + if hasattr(connection, "__aenter__"): + + async def coro(): + async with connection: + test() + + loop.run_until_complete(coro()) + else: + with connection: + test() + + +@validate_transaction_metrics("test_register:test_register_range", background_task=True) +@validate_transaction_errors(errors=[]) +@background_task() +def test_register_range(loop, connection): + async def test(): + type_name = "floatrange_" + str(os.getpid()) + + create_sql = "CREATE TYPE %s AS RANGE (" % type_name + "subtype = float8," "subtype_diff = float8mi)" + + cursor = connection.cursor() + + await maybe_await(cursor.execute("DROP TYPE if exists %s" % type_name)) + await maybe_await(cursor.execute(create_sql)) + + range_type_info = await maybe_await(psycopg.types.range.RangeInfo.fetch(connection, type_name)) + range_type_info.register(connection) + + await maybe_await(cursor.execute("DROP TYPE if exists %s" % type_name)) + await maybe_await(cursor.execute(create_sql)) + + range_type_info = await maybe_await(psycopg.types.range.RangeInfo.fetch(connection, type_name)) + range_type_info.register(cursor) + + await maybe_await(cursor.execute("DROP TYPE if exists %s" % type_name)) + + if hasattr(connection, "__aenter__"): + + async def coro(): + async with connection: + await test() + + loop.run_until_complete(coro()) + else: + with connection: + loop.run_until_complete(test()) diff --git a/tests/datastore_psycopg/test_rollback.py b/tests/datastore_psycopg/test_rollback.py new file mode 100644 index 000000000..5befa7962 --- /dev/null +++ b/tests/datastore_psycopg/test_rollback.py @@ -0,0 +1,104 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from conftest import DB_SETTINGS +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_database_trace_inputs import ( + validate_database_trace_inputs, +) +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, +} + +# Metrics + +_base_scoped_metrics = (("Datastore/operation/Postgres/rollback", 1),) + +_base_rollup_metrics = ( + ("Datastore/all", 1), + ("Datastore/allOther", 1), + ("Datastore/Postgres/all", 1), + ("Datastore/Postgres/allOther", 1), + ("Datastore/operation/Postgres/rollback", 1), +) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_host = instance_hostname(DB_SETTINGS["host"]) +_port = DB_SETTINGS["port"] + +_instance_metric_name = "Datastore/instance/Postgres/%s/%s" % (_host, _port) + +_enable_rollup_metrics.append((_instance_metric_name, 1)) + +_disable_rollup_metrics.append((_instance_metric_name, None)) + +# Query + + +async def _exercise_db(connection): + try: + if hasattr(connection, "__aenter__"): + async with connection: + raise RuntimeError("error") + else: + with connection: + raise RuntimeError("error") + except RuntimeError: + pass + + +# Tests + + +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_rollback:test_rollback_on_exception_enable_instance", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_rollback_on_exception_enable_instance(loop, connection): + loop.run_until_complete(_exercise_db(connection)) + + +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_rollback:test_rollback_on_exception_disable_instance", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@validate_database_trace_inputs(sql_parameters_type=tuple) +@background_task() +def test_rollback_on_exception_disable_instance(loop, connection): + loop.run_until_complete(_exercise_db(connection)) diff --git a/tests/datastore_psycopg/test_slow_sql.py b/tests/datastore_psycopg/test_slow_sql.py new file mode 100644 index 000000000..abd2c31cc --- /dev/null +++ b/tests/datastore_psycopg/test_slow_sql.py @@ -0,0 +1,135 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.validators.validate_slow_sql_collector_json import ( + validate_slow_sql_collector_json, +) + +from newrelic.api.background_task import background_task +from newrelic.api.transaction import current_transaction + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, + "datastore_tracer.database_name_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, + "datastore_tracer.database_name_reporting.enabled": False, +} + +# Expected parameters + +_enabled_required = set(["host", "port_path_or_id", "database_name"]) +_enabled_forgone = set() + +_disabled_required = set() +_disabled_forgone = set(["host", "port_path_or_id", "database_name"]) + +# Guid is always required, regardless of DT status. +# It should be excluded from the forgone params set. +_distributed_tracing_required_params = set(["guid", "traceId", "priority", "sampled"]) +_distributed_tracing_forgone_params = set(["traceId", "priority", "sampled"]) +_distributed_tracing_payload_received_params = set( + ["parent.type", "parent.app", "parent.account", "parent.transportType", "parent.transportDuration"] +) + +_transaction_guid = "1234567890" +_distributed_tracing_exact_params = {"guid": _transaction_guid} + + +# Query + + +async def _exercise_db(connection): + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("SELECT setting from pg_settings where name=%s", ("server_version",))) + finally: + await maybe_await(connection.close()) + + +# Tests + + +@pytest.mark.parametrize("instance_enabled", (True, False)) +@pytest.mark.parametrize( + "distributed_tracing_enabled,payload_received", + [ + (True, True), + (True, False), + (False, False), + ], +) +def test_slow_sql_json(loop, connection, instance_enabled, distributed_tracing_enabled, payload_received): + + exact_params = None + + if instance_enabled: + settings = _enable_instance_settings.copy() + required_params = set(_enabled_required) + forgone_params = set(_enabled_forgone) + else: + settings = _disable_instance_settings.copy() + required_params = set(_disabled_required) + forgone_params = set(_disabled_forgone) + + if distributed_tracing_enabled: + required_params.update(_distributed_tracing_required_params) + exact_params = _distributed_tracing_exact_params + settings["distributed_tracing.enabled"] = True + if payload_received: + required_params.update(_distributed_tracing_payload_received_params) + else: + forgone_params.update(_distributed_tracing_payload_received_params) + else: + forgone_params.update(_distributed_tracing_forgone_params) + forgone_params.update(_distributed_tracing_payload_received_params) + settings["distributed_tracing.enabled"] = False + + @override_application_settings(settings) + @validate_slow_sql_collector_json( + required_params=required_params, forgone_params=forgone_params, exact_params=exact_params + ) + @background_task() + def _test(): + transaction = current_transaction() + transaction.guid = _transaction_guid + + loop.run_until_complete(_exercise_db(connection)) + + if payload_received: + + payload = { + "v": [0, 1], + "d": { + "ty": "Mobile", + "ac": transaction.settings.account_id, + "tk": transaction.settings.trusted_account_key, + "ap": "2827902", + "pa": "5e5733a911cfbc73", + "id": "7d3efb1b173fecfa", + "tr": "d6b4ba0c3a712ca", + "ti": 1518469636035, + "tx": "8703ff3d88eefe9d", + }, + } + + transaction.accept_distributed_trace_payload(payload) + + _test() diff --git a/tests/datastore_psycopg/test_span_event.py b/tests/datastore_psycopg/test_span_event.py new file mode 100644 index 000000000..622cf706d --- /dev/null +++ b/tests/datastore_psycopg/test_span_event.py @@ -0,0 +1,136 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import DB_SETTINGS, maybe_await +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_span_events import validate_span_events + +from newrelic.api.background_task import background_task +from newrelic.api.transaction import current_transaction + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, + "datastore_tracer.database_name_reporting.enabled": True, + "distributed_tracing.enabled": True, + "span_events.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, + "datastore_tracer.database_name_reporting.enabled": False, + "distributed_tracing.enabled": True, + "span_events.enabled": True, +} + + +async def _exercise_db(connection): + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("SELECT setting from pg_settings where name=%s", ("server_version",))) + + # No target + await maybe_await(cursor.execute("SELECT 1")) + finally: + await maybe_await(connection.close()) + + +# Tests + + +@pytest.mark.parametrize("db_instance_enabled", (True, False)) +@pytest.mark.parametrize("instance_enabled", (True, False)) +def test_span_events(loop, connection, instance_enabled, db_instance_enabled): + guid = "dbb533c53b749e0b" + priority = 0.5 + + common_intrinsics = { + "type": "Span", + "transactionId": guid, + "priority": priority, + "sampled": True, + "category": "datastore", + "component": "Postgres", + "span.kind": "client", + } + + exact_agents = {} + + if instance_enabled: + settings = _enable_instance_settings.copy() + hostname = instance_hostname(DB_SETTINGS["host"]) + exact_agents.update( + { + "peer.address": "%s:%s" % (hostname, DB_SETTINGS["port"]), + "peer.hostname": hostname, + } + ) + else: + settings = _disable_instance_settings.copy() + exact_agents.update( + { + "peer.address": "Unknown:Unknown", + "peer.hostname": "Unknown", + } + ) + + if db_instance_enabled and instance_enabled: + exact_agents.update( + { + "db.instance": DB_SETTINGS["name"], + } + ) + unexpected_agents = () + else: + settings["attributes.exclude"] = ["db.instance"] + unexpected_agents = ("db.instance",) + + query_1_intrinsics = common_intrinsics.copy() + query_1_intrinsics["name"] = "Datastore/statement/Postgres/pg_settings/select" + + query_1_agents = exact_agents.copy() + query_1_agents["db.statement"] = "SELECT setting from pg_settings where name=%s" + + query_2_intrinsics = common_intrinsics.copy() + query_2_intrinsics["name"] = "Datastore/operation/Postgres/select" + + query_2_agents = exact_agents.copy() + query_2_agents["db.statement"] = "SELECT ?" + + @validate_span_events( + count=1, + exact_intrinsics=query_1_intrinsics, + unexpected_intrinsics=("db.instance", "db.statement"), + exact_agents=query_1_agents, + unexpected_agents=unexpected_agents, + ) + @validate_span_events( + count=1, + exact_intrinsics=query_2_intrinsics, + unexpected_intrinsics=("db.instance", "db.statement"), + exact_agents=query_2_agents, + unexpected_agents=unexpected_agents, + ) + @override_application_settings(settings) + @background_task(name="span_events") + def _test(): + txn = current_transaction() + txn.guid = guid + txn._priority = priority + txn._sampled = True + loop.run_until_complete(_exercise_db(connection)) + + _test() diff --git a/tests/datastore_psycopg/test_trace_node.py b/tests/datastore_psycopg/test_trace_node.py new file mode 100644 index 000000000..584e27bbf --- /dev/null +++ b/tests/datastore_psycopg/test_trace_node.py @@ -0,0 +1,104 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import psycopg +from conftest import DB_SETTINGS, maybe_await +from testing_support.fixtures import ( + override_application_settings, + validate_tt_parenting, +) +from testing_support.util import instance_hostname +from testing_support.validators.validate_tt_collector_json import ( + validate_tt_collector_json, +) + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, + "datastore_tracer.database_name_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, + "datastore_tracer.database_name_reporting.enabled": False, +} + +# Expected parameters + +_enabled_required = { + "host": instance_hostname(DB_SETTINGS["host"]), + "port_path_or_id": str(DB_SETTINGS["port"]), + "db.instance": DB_SETTINGS["name"], +} +_enabled_forgone = {} + +_disabled_required = {} +_disabled_forgone = { + "host": "VALUE NOT USED", + "port_path_or_id": "VALUE NOT USED", + "db.instance": "VALUE NOT USED", +} + +_tt_parenting = ( + "TransactionNode", + [ + ("FunctionNode", []), + ("DatabaseNode", []), + ], +) + + +# Query + + +async def _exercise_db(async_=False): + # Connect here without using fixture to assert on the parenting of the FunctionTrace for Connection.connect() + # This is only possible when the connection is done inside a transaction, so connect after the test starts. + connect = psycopg.Connection.connect if async_ else psycopg.AsyncConnection.connect + connection = await maybe_await( + connect( + dbname=DB_SETTINGS["name"], + user=DB_SETTINGS["user"], + password=DB_SETTINGS["password"], + host=DB_SETTINGS["host"], + port=DB_SETTINGS["port"], + ) + ) + + try: + cursor = connection.cursor() + await maybe_await(cursor.execute("SELECT setting from pg_settings where name=%s", ("server_version",))) + finally: + await maybe_await(connection.close()) + + +# Tests + + +@override_application_settings(_enable_instance_settings) +@validate_tt_collector_json(datastore_params=_enabled_required, datastore_forgone_params=_enabled_forgone) +@validate_tt_parenting(_tt_parenting) +@background_task() +def test_trace_node_datastore_params_enable_instance(loop, is_async): + loop.run_until_complete(_exercise_db(is_async)) + + +@override_application_settings(_disable_instance_settings) +@validate_tt_collector_json(datastore_params=_disabled_required, datastore_forgone_params=_disabled_forgone) +@validate_tt_parenting(_tt_parenting) +@background_task() +def test_trace_node_datastore_params_disable_instance(loop, is_async): + loop.run_until_complete(_exercise_db(is_async)) diff --git a/tests/testing_support/db_settings.py b/tests/testing_support/db_settings.py index f7bda3d7a..da4c0055f 100644 --- a/tests/testing_support/db_settings.py +++ b/tests/testing_support/db_settings.py @@ -31,6 +31,7 @@ def postgresql_settings(): host = "host.docker.internal" if "GITHUB_ACTIONS" in os.environ else "localhost" instances = 2 + identifier = str(os.getpid()) settings = [ { "user": "postgres", @@ -38,7 +39,8 @@ def postgresql_settings(): "name": "postgres", "host": host, "port": 8080 + instance_num, - "table_name": "postgres_table_" + str(os.getpid()), + "procedure_name": "postgres_procedure_" + identifier, + "table_name": "postgres_table_" + identifier, } for instance_num in range(instances) ] @@ -205,10 +207,7 @@ def firestore_settings(): host = "host.docker.internal" if "GITHUB_ACTIONS" in os.environ else "127.0.0.1" instances = 2 - settings = [ - {"host": host, "port": 8080 + instance_num} - for instance_num in range(instances) - ] + settings = [{"host": host, "port": 8080 + instance_num} for instance_num in range(instances)] return settings diff --git a/tests/testing_support/validators/validate_transaction_slow_sql_count.py b/tests/testing_support/validators/validate_transaction_slow_sql_count.py index 460201dab..3e68a1ad0 100644 --- a/tests/testing_support/validators/validate_transaction_slow_sql_count.py +++ b/tests/testing_support/validators/validate_transaction_slow_sql_count.py @@ -17,16 +17,14 @@ def validate_transaction_slow_sql_count(num_slow_sql): - @transient_function_wrapper( - "newrelic.core.stats_engine", "StatsEngine.record_transaction" - ) + @transient_function_wrapper("newrelic.core.stats_engine", "StatsEngine.record_transaction") def _validate_transaction_slow_sql_count(wrapped, instance, args, kwargs): result = wrapped(*args, **kwargs) connections = SQLConnections() with connections: slow_sql_traces = instance.slow_sql_data(connections) - assert len(slow_sql_traces) == num_slow_sql + assert len(slow_sql_traces) == num_slow_sql, "Expected: %s. Got: %d." % (num_slow_sql, len(slow_sql_traces)) return result diff --git a/tox.ini b/tox.ini index c34d94941..8e315f195 100644 --- a/tox.ini +++ b/tox.ini @@ -66,11 +66,13 @@ envlist = mysql-datastore_mysql-mysql080023-py27, mysql-datastore_mysql-mysqllatest-{py37,py38,py39,py310,py311,py312}, mysql-datastore_pymysql-{py27,py37,py38,py39,py310,py311,py312,pypy27,pypy310}, - postgres-datastore_asyncpg-{py37,py38,py39,py310,py311,py312}, - postgres-datastore_postgresql-{py37,py38,py39}, - postgres-datastore_psycopg2-{py27,py37,py38,py39,py310,py311,py312}-psycopg2latest, - postgres-datastore_psycopg2cffi-{py27,pypy27,py37,py38,py39,py310,py311,py312}-psycopg2cffilatest, - postgres-datastore_pyodbc-{py27,py37,py38,py39,py310,py311,py312}-pyodbclatest, + postgres16-datastore_asyncpg-{py37,py38,py39,py310,py311,py312}, + postgres16-datastore_psycopg-{py38,py39,py310,py311,py312,pypy310}-psycopglatest, + postgres16-datastore_psycopg-py312-psycopg_{purepython,binary,compiled}0301, + postgres16-datastore_psycopg2-{py27,py37,py38,py39,py310,py311,py312}-psycopg2latest, + postgres16-datastore_psycopg2cffi-{py27,pypy27,py37,py38,py39,py310,py311,py312}-psycopg2cffilatest, + postgres16-datastore_pyodbc-{py27,py37,py38,py39,py310,py311,py312}-pyodbclatest, + postgres9-datastore_postgresql-{py37,py38,py39}, python-adapter_asgiref-{py37,py38,py39,py310,py311,py312,pypy310}-asgireflatest, python-adapter_asgiref-py310-asgiref{0303,0304,0305,0306,0307}, python-adapter_cheroot-{py27,py37,py38,py39,py310,py311,py312}, @@ -94,9 +96,9 @@ envlist = python-application_celery-{py37,py38,py39,py310,py311,py312,pypy310}-celerylatest, python-application_celery-py311-celery{0503,0502,0501}, python-component_djangorestframework-{py37,py38,py39,py310,py311,py312}-djangorestframeworklatest, - python-component_flask_rest-py37-flaskrestx110, python-component_flask_rest-{py38,py39,py310,py311,py312,pypy310}-flaskrestxlatest, python-component_flask_rest-{py27,pypy27}-flaskrestx051, + python-component_flask_rest-py37-flaskrestx110, python-component_graphqlserver-{py37,py38,py39,py310,py311,py312}, python-component_tastypie-{py37,py38,py39,py310,py311,py312,pypy310}-tastypielatest, python-coroutines_asyncio-{py37,py38,py39,py310,py311,py312,pypy310}, @@ -147,16 +149,16 @@ envlist = python-logger_loguru-{py37,py38,py39,py310,py311,py312,pypy310}-logurulatest, python-logger_loguru-py39-loguru{06,05}, python-logger_structlog-{py37,py38,py39,py310,py311,py312,pypy310}-structloglatest, + ; langchain dependency faiss-cpu isn't compatible with 3.12 yet. + python-mlmodel_langchain-{py38,py39,py310,py311}, python-mlmodel_openai-openai0-{py37,py38,py39,py310,py311,py312}, python-mlmodel_openai-openai107-py312, python-mlmodel_openai-openailatest-{py37,py38,py39,py310,py311,py312}, - ; langchain dependency faiss-cpu isn't compatible with 3.12 yet. - python-mlmodel_langchain-{py38,py39,py310,py311}, python-mlmodel_sklearn-{py37}-scikitlearn0101, python-mlmodel_sklearn-{py38,py39,py310,py311,py312}-scikitlearnlatest, python-template_genshi-{py27,py37,py38,py39,py310,py311,py312}-genshilatest, - python-template_jinja2-py37-jinja2030103, python-template_jinja2-{py38,py39,py310,py311,py312}-jinja2latest, + python-template_jinja2-py37-jinja2030103, python-template_mako-{py27,py37,py38,py39,py310,py311,py312}, rabbitmq-messagebroker_pika-{py37,py38,py39,py310,py311,py312,pypy310}-pikalatest, redis-datastore_redis-{py37,py38,py39,py310,py311,py312,pypy310}-redis{0400,latest}, @@ -166,7 +168,7 @@ envlist = [testenv] deps = # Base Dependencies - {py38,py39,py310,py311,py312,pypy310}: pytest==8.1.1 + {py38,py39,py310,py311,py312,pypy310}: pytest==8.2.1 py37: pytest==7.4.4 {py27,pypy27}: pytest==4.6.11 iniconfig @@ -250,7 +252,11 @@ deps = datastore_mysql-mysqllatest: mysql-connector-python datastore_mysql-mysql080023: mysql-connector-python<8.0.24 datastore_mysql: protobuf<4 - datastore_postgresql: py-postgresql<1.3 + datastore_postgresql: py-postgresql + datastore_psycopg-psycopglatest: psycopg[binary]>=3 + datastore_psycopg-psycopg_purepython0301: psycopg<3.2 + datastore_psycopg-psycopg_binary0301: psycopg[binary]<3.2 + datastore_psycopg-psycopg_compiled0301: psycopg[c]<3.2 datastore_psycopg2-psycopg2latest: psycopg2-binary datastore_psycopg2cffi-psycopg2cffilatest: psycopg2cffi datastore_pyodbc-pyodbclatest: pyodbc @@ -441,7 +447,6 @@ changedir = agent_streaming: tests/agent_streaming agent_unittests: tests/agent_unittests application_celery: tests/application_celery - mlmodel_sklearn: tests/mlmodel_sklearn component_djangorestframework: tests/component_djangorestframework component_flask_rest: tests/component_flask_rest component_graphqlserver: tests/component_graphqlserver @@ -455,14 +460,15 @@ changedir = datastore_memcache: tests/datastore_memcache datastore_mysql: tests/datastore_mysql datastore_postgresql: tests/datastore_postgresql + datastore_psycopg: tests/datastore_psycopg datastore_psycopg2: tests/datastore_psycopg2 - datastore_pyodbc: tests/datastore_pyodbc datastore_psycopg2cffi: tests/datastore_psycopg2cffi datastore_pylibmc: tests/datastore_pylibmc datastore_pymemcache: tests/datastore_pymemcache datastore_pymongo: tests/datastore_pymongo datastore_pymssql: tests/datastore_pymssql datastore_pymysql: tests/datastore_pymysql + datastore_pyodbc: tests/datastore_pyodbc datastore_pysolr: tests/datastore_pysolr datastore_redis: tests/datastore_redis datastore_rediscluster: tests/datastore_rediscluster @@ -492,14 +498,15 @@ changedir = framework_starlette: tests/framework_starlette framework_strawberry: tests/framework_strawberry framework_tornado: tests/framework_tornado - mlmodel_openai: tests/mlmodel_openai - mlmodel_langchain: tests/mlmodel_langchain logger_logging: tests/logger_logging logger_loguru: tests/logger_loguru logger_structlog: tests/logger_structlog - messagebroker_pika: tests/messagebroker_pika messagebroker_confluentkafka: tests/messagebroker_confluentkafka messagebroker_kafkapython: tests/messagebroker_kafkapython + messagebroker_pika: tests/messagebroker_pika + mlmodel_langchain: tests/mlmodel_langchain + mlmodel_openai: tests/mlmodel_openai + mlmodel_sklearn: tests/mlmodel_sklearn template_genshi: tests/template_genshi template_jinja2: tests/template_jinja2 template_mako: tests/template_mako