diff --git a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in
index 026c532007a3e..0f5e2f3f615b1 100644
--- a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in
+++ b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in
@@ -56,7 +56,8 @@ prometheus-client==0.17.0; python_version > '3.0'
 protobuf==3.17.3; python_version < '3.0'
 protobuf==3.20.2; python_version > '3.0'
 psutil==5.9.0
-psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
+psycopg[binary]==3.1.9; python_version > '3.0'
+psycopg[pool]==3.1.7; python_version > '3.0'
 pyasn1==0.4.6
 pycryptodomex==3.10.1
 pydantic==1.10.8; python_version > '3.0'
diff --git a/pgbouncer/datadog_checks/pgbouncer/pgbouncer.py b/pgbouncer/datadog_checks/pgbouncer/pgbouncer.py
index 6d04fa27e4dd1..2f0e7fc9c056e 100644
--- a/pgbouncer/datadog_checks/pgbouncer/pgbouncer.py
+++ b/pgbouncer/datadog_checks/pgbouncer/pgbouncer.py
@@ -4,8 +4,8 @@
 import re
 import time

-import psycopg2 as pg
-from psycopg2 import extras as pgextras
+import psycopg as pg
+from psycopg.rows import dict_row
 from six.moves.urllib.parse import urlparse

 from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative
@@ -73,7 +73,7 @@ def _collect_stats(self, db):
             metric_scope.append(SERVERS_METRICS)

         try:
-            with db.cursor(cursor_factory=pgextras.DictCursor) as cursor:
+            with db.cursor(row_factory=dict_row) as cursor:
                 for scope in metric_scope:
                     descriptors = scope['descriptors']
                     metrics = scope['metrics']
@@ -159,6 +159,7 @@ def _get_connect_kwargs(self):

         return args

+    # TODO: needs to be updated to remain compliant with psycopg3
    def _get_connection(self, use_cached=None):
         """Get and memoize connections to instances"""
         use_cached = use_cached if use_cached is not None else self.use_cached
diff --git a/pgbouncer/hatch.toml b/pgbouncer/hatch.toml
index 1c3f080ebbfd8..13a48a0686565 100644
--- a/pgbouncer/hatch.toml
+++ b/pgbouncer/hatch.toml
@@ -1,7 +1,7 @@
 [env.collectors.datadog-checks]

 [[envs.default.matrix]]
-python = ["2.7", "3.9"]
+python = ["3.9"]
 version = ["1.7", "1.8", "1.12"]

 [envs.default.env-vars]
diff --git a/pgbouncer/pyproject.toml b/pgbouncer/pyproject.toml
index 92e8e3d6762b6..42270105a673f 100644
--- a/pgbouncer/pyproject.toml
+++ b/pgbouncer/pyproject.toml
@@ -1,8 +1,6 @@
 [build-system]
 requires = [
     "hatchling>=0.11.2",
-    "setuptools>=66; python_version > '3.0'",
-    "setuptools; python_version < '3.0'",
 ]
 build-backend = "hatchling.build"

@@ -10,6 +8,7 @@ build-backend = "hatchling.build"
 name = "datadog-pgbouncer"
 description = "The PGBouncer check"
 readme = "README.md"
+requires-python = ">=3.9"
 keywords = [
     "datadog",
     "datadog agent",
@@ -40,7 +39,7 @@ license = "BSD-3-Clause"

 [project.optional-dependencies]
 deps = [
-    "psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'",
+    "psycopg[binary]==3.1.9; python_version > '3.0'",
 ]

 [project.urls]
diff --git a/postgres/datadog_checks/postgres/connections.py b/postgres/datadog_checks/postgres/connections.py
index 264f1adc6103e..fd729a45d272c 100644
--- a/postgres/datadog_checks/postgres/connections.py
+++ b/postgres/datadog_checks/postgres/connections.py
@@ -8,7 +8,7 @@
 import time
 from typing import Callable, Dict

-import psycopg2
+import psycopg


 class ConnectionPoolFullError(Exception):
@@ -23,10 +23,10 @@ def __str__(self):
 class ConnectionInfo:
     def __init__(
         self,
-        connection: psycopg2.extensions.connection,
-        deadline: int,
+        connection: psycopg.Connection,
+        deadline: datetime,
         active: bool,
-        last_accessed: int,
+        last_accessed: datetime,
         thread: threading.Thread,
     ):
         self.connection = connection
@@ -79,7 +79,7 @@ def __init__(self, connect_fn: Callable[[str], None], max_conns: int = None):
         )
         self.connect_fn = connect_fn

-    def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) -> psycopg2.extensions.connection:
+    def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) -> psycopg.Connection:
         """
         Return a connection from the pool.
         """
@@ -101,7 +101,7 @@ def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) ->
                 self._stats.connection_opened += 1
                 db = self.connect_fn(dbname)

-            if db.status != psycopg2.extensions.STATUS_READY:
+            if db.info.status != psycopg.pq.ConnStatus.OK:
                 # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
                 db.rollback()

@@ -183,6 +183,6 @@ def _terminate_connection_unsafe(self, dbname: str):
             db.close()
         except Exception:
             self._stats.connection_closed_failed += 1
-            self._log.exception("failed to close DB connection for db=%s", dbname)
+            self.log.exception("failed to close DB connection for db=%s", dbname)
             return False
         return True
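For reviewers less familiar with psycopg3, the `conn.status` → `conn.info.status` change above is one of the recurring patterns in this migration: psycopg3 exposes the libpq connection state on `Connection.info`, and psycopg2's `STATUS_READY` constant becomes `psycopg.pq.ConnStatus.OK`. A minimal sketch of the new idiom, assuming a reachable server (the conninfo string is a placeholder, not part of this patch):

```python
import psycopg

# Placeholder conninfo for illustration; the checks build their own connection strings.
conn = psycopg.connect("host=localhost dbname=postgres user=datadog")

# psycopg3 reads libpq connection state from conn.info.status instead of conn.status.
if conn.info.status != psycopg.pq.ConnStatus.OK:
    # A transaction went wrong and the connection is in an unhealthy state; reset it.
    conn.rollback()
conn.close()
```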
diff --git a/postgres/datadog_checks/postgres/explain_parameterized_queries.py b/postgres/datadog_checks/postgres/explain_parameterized_queries.py
index 620c3d6126f6d..448a3f104ff61 100644
--- a/postgres/datadog_checks/postgres/explain_parameterized_queries.py
+++ b/postgres/datadog_checks/postgres/explain_parameterized_queries.py
@@ -4,7 +4,8 @@

 import logging

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row

 from datadog_checks.base.utils.db.sql import compute_sql_signature
 from datadog_checks.base.utils.tracking import tracked_method
@@ -112,7 +113,10 @@ def _get_number_of_parameters_for_prepared_statement(self, dbname, query_signatu
         rows = self._execute_query_and_fetch_rows(
             dbname, PARAM_TYPES_COUNT_QUERY.format(query_signature=query_signature)
         )
-        return rows[0][0] if rows else 0
+        count = 0
+        if rows and 'count' in rows[0]:
+            count = rows[0]['count']
+        return count

     @tracked_method(agent_check_getter=agent_check_getter)
     def _explain_prepared_statement(self, dbname, statement, obfuscated_statement, query_signature):
@@ -157,12 +161,12 @@ def _deallocate_prepared_statement(self, dbname, query_signature):
         )

     def _execute_query(self, dbname, query):
-        with self._check._get_db(dbname).cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+        with self._check._get_db(dbname).cursor(row_factory=dict_row) as cursor:
             logger.debug('Executing query=[%s]', query)
             cursor.execute(query)

     def _execute_query_and_fetch_rows(self, dbname, query):
-        with self._check._get_db(dbname).cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+        with self._check._get_db(dbname).cursor(row_factory=dict_row) as cursor:
             logger.debug('Executing query=[%s] and fetching rows', query)
             cursor.execute(query)
             return cursor.fetchall()
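The `rows[0][0]` → `rows[0]['count']` rewrite above follows directly from the cursor change: psycopg2's `DictCursor` produced rows that allowed both positional and key access, while psycopg3's `dict_row` factory returns plain dicts, so only key access works. A minimal sketch of the difference (conninfo and query are illustrative only):

```python
import psycopg
from psycopg.rows import dict_row

with psycopg.connect("host=localhost dbname=postgres") as conn:
    # row_factory=dict_row replaces cursor_factory=psycopg2.extras.DictCursor.
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT count(*) FROM pg_stat_activity")
        row = cur.fetchone()
        print(row["count"])  # key access works: rows are plain dicts
        # print(row[0])      # would raise KeyError; positional access is gone
```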
diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py
index 8456526dc0aa3..ff89b881f59f3 100644
--- a/postgres/datadog_checks/postgres/metadata.py
+++ b/postgres/datadog_checks/postgres/metadata.py
@@ -5,7 +5,8 @@
 import time
 from typing import Dict, Optional, Tuple  # noqa: F401

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row

 try:
     import datadog_agent
@@ -60,7 +61,7 @@ def shutdown_cb():
             enabled=is_affirmative(config.resources_metadata_config.get('enabled', True)),
             dbms="postgres",
             min_collection_interval=config.min_collection_interval,
-            expected_db_exceptions=(psycopg2.errors.DatabaseError,),
+            expected_db_exceptions=(psycopg.errors.DatabaseError,),
             job_name="database-metadata",
             shutdown_callback=shutdown_cb,
         )
@@ -120,8 +121,9 @@ def _payload_pg_version(self):

     @tracked_method(agent_check_getter=agent_check_getter)
     def _collect_postgres_settings(self):
-        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms) as conn:
-            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms).cursor(
+            row_factory=dict_row
+        ) as cursor:
             self._log.debug("Running query [%s]", PG_SETTINGS_QUERY)
             self._time_since_last_settings_query = time.time()
             cursor.execute(PG_SETTINGS_QUERY)
diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index 24ed04877dc18..760e1d85a65a1 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -7,7 +7,9 @@
 from contextlib import closing
 from time import time

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row
+from psycopg_pool import ConnectionPool
 from six import iteritems

 from datadog_checks.base import AgentCheck
@@ -351,14 +353,14 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors):
             cursor.execute(query.replace(r'%', r'%%'))
             results = cursor.fetchall()
-        except psycopg2.errors.FeatureNotSupported as e:
+        except psycopg.errors.FeatureNotSupported as e:
             # This happens for example when trying to get replication metrics from readers in Aurora. Let's ignore it.
             log_func(e)
             self.db.rollback()
             self.log.debug("Disabling replication metrics")
             self._is_aurora = False
             self.metrics_cache.replication_metrics = {}
-        except psycopg2.errors.UndefinedFunction as e:
+        except psycopg.errors.UndefinedFunction as e:
             log_func(e)
             log_func(
                 "It seems the PG version has been incorrectly identified as %s. "
@@ -366,7 +368,7 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors):
             )
             self._clean_state()
             self.db.rollback()
-        except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
+        except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
             log_func("Not all metrics may be available: %s" % str(e))
             self.db.rollback()

@@ -522,7 +524,7 @@ def _new_connection(self, dbname):
             )
             if self._config.query_timeout:
                 connection_string += " options='-c statement_timeout=%s'" % self._config.query_timeout
-            conn = psycopg2.connect(connection_string)
+            conn = psycopg.connect(conninfo=connection_string, autocommit=True)
         else:
             password = self._config.password
             region = self._config.cloud_metadata.get('aws', {}).get('region', None)
@@ -538,7 +540,7 @@ def _new_connection(self, dbname):
                 'host': self._config.host,
                 'user': self._config.user,
                 'password': password,
-                'database': dbname,
+                'dbname': dbname,
                 'sslmode': self._config.ssl_mode,
                 'application_name': self._config.application_name,
             }
@@ -554,9 +556,7 @@ def _new_connection(self, dbname):
                 args['sslkey'] = self._config.ssl_key
             if self._config.ssl_password:
                 args['sslpassword'] = self._config.ssl_password
-            conn = psycopg2.connect(**args)
-            # Autocommit is enabled by default for safety for all new connections (to prevent long-lived transactions).
-            conn.set_session(autocommit=True, readonly=True)
+            conn = psycopg.connect(**args, autocommit=True)
         return conn

     def _connect(self):
@@ -566,7 +566,7 @@ def _connect(self):
                 self.db = None

         if self.db:
-            if self.db.status != psycopg2.extensions.STATUS_READY:
+            if self.db.info.status != psycopg.pq.ConnStatus.OK:
                 # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
                 self.db.rollback()
         else:
@@ -575,7 +575,7 @@ def _connect(self):
     # Reload pg_settings on a new connection to the main db
     def _load_pg_settings(self, db):
         try:
-            with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+            with db.cursor(row_factory=dict_row) as cursor:
                 self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
                 cursor.execute(
                     PG_SETTINGS_QUERY,
@@ -586,7 +586,7 @@ def _load_pg_settings(self, db):
                 for setting in rows:
                     name, val = setting
                     self.pg_settings[name] = val
-        except (psycopg2.DatabaseError, psycopg2.OperationalError) as err:
+        except (psycopg.DatabaseError, psycopg.OperationalError) as err:
             self.log.warning("Failed to query for pg_settings: %s", repr(err))
             self.count(
                 "dd.postgres.error",
@@ -597,25 +597,27 @@ def _load_pg_settings(self, db):

     def _get_db(self, dbname):
         """
-        Returns a memoized psycopg2 connection to `dbname` with autocommit
+        Returns a memoized psycopg connection to `dbname` with autocommit
         Threadsafe as long as no transactions are used
         :param dbname:
-        :return: a psycopg2 connection
+        :return: a psycopg connection
         """
         # TODO: migrate the rest of this check to use a connection from this pool
-        with self._db_pool_lock:
-            db = self._db_pool.get(dbname)
-            if not db or db.closed:
-                self.log.debug("initializing connection to dbname=%s", dbname)
-                db = self._new_connection(dbname)
-                self._db_pool[dbname] = db
-            if self._config.dbname == dbname:
-                # reload settings for the main DB only once every time the connection is reestablished
-                self._load_pg_settings(db)
-            if db.status != psycopg2.extensions.STATUS_READY:
-                # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
-                db.rollback()
-            return db
+        # right now we wrap everything in a `with` block so the conn gets closed automatically
+        return self._new_connection(dbname)
+        # with self._db_pool_lock:
+        #     db = self._db_pool.get(dbname)
+        #     if not db or db.closed:
+        #         self.log.debug("initializing connection to dbname=%s", dbname)
+        #         db = self._new_connection(dbname)
+        #         self._db_pool[dbname] = db
+        #     if self._config.dbname == dbname:
+        #         # reload settings for the main DB only once every time the connection is reestablished
+        #         self._load_pg_settings(db)
+        #     if db.info.status != psycopg.pq.ConnStatus.OK:
+        #         # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
+        #         db.rollback()
+        #     return db

     def _close_db_pool(self):
         # TODO: add automatic aging out of connections after some time
@@ -654,7 +656,7 @@ def _collect_custom_queries(self, tags):
                     try:
                         self.log.debug("Running query: %s", query)
                         cursor.execute(query)
-                    except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
+                    except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
                         self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
                         self.db.rollback()
                         continue
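Two psycopg3 behavior changes drive the `_new_connection` and `_get_db` edits above: `autocommit` is now passed directly to `psycopg.connect()` rather than set afterwards via `conn.set_session()`, and a connection used as a context manager is closed on exit instead of merely ending the current transaction, which is why `_get_db` now hands out fresh connections meant to be used in `with` blocks. A hedged sketch of both behaviors (the conninfo is a placeholder):

```python
import psycopg

# psycopg3: autocommit is set at connect time; conn.set_session() no longer exists.
with psycopg.connect("host=localhost dbname=postgres", autocommit=True) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        cur.fetchone()
# Unlike psycopg2, leaving the `with` block closes the connection itself, not just
# the transaction -- see the "Differences from psycopg2" page linked in the
# statements.py TODO below.
```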
diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py
index 2d8648660399e..9bfe73b1fb26d 100644
--- a/postgres/datadog_checks/postgres/statement_samples.py
+++ b/postgres/datadog_checks/postgres/statement_samples.py
@@ -8,7 +8,8 @@
 from enum import Enum
 from typing import Dict, Optional, Tuple  # noqa: F401

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row
 from cachetools import TTLCache
 from six import PY2
@@ -203,7 +204,7 @@ def shutdown_cb():
             ),
             dbms="postgres",
             min_collection_interval=config.min_collection_interval,
-            expected_db_exceptions=(psycopg2.errors.DatabaseError,),
+            expected_db_exceptions=(psycopg.errors.DatabaseError,),
             job_name="query-samples",
             shutdown_callback=shutdown_cb,
         )
@@ -273,8 +274,9 @@ def _get_active_connections(self):
         query = PG_ACTIVE_CONNECTIONS_QUERY.format(
             pg_stat_activity_view=self._config.pg_stat_activity_view, extra_filters=extra_filters
         )
-        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms) as conn:
-            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms).cursor(
+            row_factory=dict_row
+        ) as cursor:
             self._log.debug("Running query [%s] %s", query, params)
             cursor.execute(query, params)
             rows = cursor.fetchall()
@@ -307,8 +309,9 @@ def _get_new_pg_stat_activity(self, available_activity_columns):
             pg_stat_activity_view=self._config.pg_stat_activity_view,
             extra_filters=extra_filters,
         )
-        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms) as conn:
-            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms).cursor(
+            row_factory=dict_row
+        ) as cursor:
             self._log.debug("Running query [%s] %s", query, params)
             cursor.execute(query, params)
             rows = cursor.fetchall()
@@ -325,8 +328,9 @@ def _get_pg_stat_activity_cols_cached(self, expected_cols):

     @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
     def _get_available_activity_columns(self, all_expected_columns):
-        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms) as conn:
-            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms).cursor(
+            row_factory=dict_row
+        ) as cursor:
             cursor.execute(
                 "select * from {pg_stat_activity_view} LIMIT 0".format(
                     pg_stat_activity_view=self._config.pg_stat_activity_view
@@ -450,18 +454,24 @@ def _collect_statement_samples(self):
         if self._explain_plan_coll_enabled:
             event_samples = self._collect_plans(rows)
             for e in event_samples:
-                self._check.database_monitoring_query_sample(json.dumps(e, default=default_json_event_encoding))
-                submitted_count += 1
+                try:
+                    self._check.database_monitoring_query_sample(json.dumps(e, default=default_json_event_encoding))
+                    submitted_count += 1
+                except TypeError as exe:
+                    self._log.warning("error encoding event to json {}".format(repr(exe)))

         if self._report_activity_event():
             active_connections = self._get_active_connections()
             activity_event = self._create_activity_event(rows, active_connections)
-            self._check.database_monitoring_query_activity(
-                json.dumps(activity_event, default=default_json_event_encoding)
-            )
-            self._check.histogram(
-                "dd.postgres.collect_activity_snapshot.time", (time.time() - start_time) * 1000, tags=self.tags
-            )
+            try:
+                self._check.database_monitoring_query_activity(
+                    json.dumps(activity_event, default=default_json_event_encoding)
+                )
+                self._check.histogram(
+                    "dd.postgres.collect_activity_snapshot.time", (time.time() - start_time) * 1000, tags=self.tags
+                )
+            except TypeError as exe:
+                self._log.warning("error encoding event to json {}".format(repr(exe)))
         elapsed_ms = (time.time() - start_time) * 1000
         self._check.histogram(
             "dd.postgres.collect_statement_samples.time",
@@ -524,12 +534,12 @@ def _get_db_explain_setup_state(self, dbname):
         # type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]]
         try:
             self._check._get_db(dbname)
-        except psycopg2.OperationalError as e:
+        except psycopg.OperationalError as e:
             self._log.warning(
                 "cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e)
             )
             return DBExplainError.connection_error, e
-        except psycopg2.DatabaseError as e:
+        except psycopg.DatabaseError as e:
             self._log.warning(
                 "cannot collect execution plans due to a database error in dbname=%s: %s", dbname, repr(e)
             )
@@ -537,14 +547,14 @@ def _get_db_explain_setup_state(self, dbname):

         try:
             result = self._run_explain(dbname, EXPLAIN_VALIDATION_QUERY, EXPLAIN_VALIDATION_QUERY)
-        except psycopg2.errors.InvalidSchemaName as e:
+        except psycopg.errors.InvalidSchemaName as e:
             self._log.warning("cannot collect execution plans due to invalid schema in dbname=%s: %s", dbname, repr(e))
             self._emit_run_explain_error(dbname, DBExplainError.invalid_schema, e)
             return DBExplainError.invalid_schema, e
-        except psycopg2.errors.DatatypeMismatch as e:
+        except psycopg.errors.DatatypeMismatch as e:
             self._emit_run_explain_error(dbname, DBExplainError.datatype_mismatch, e)
             return DBExplainError.datatype_mismatch, e
-        except psycopg2.DatabaseError as e:
+        except psycopg.DatabaseError as e:
             # if the schema is valid then it's some problem with the function (missing, or invalid permissions,
             # incorrect definition)
             self._emit_run_explain_error(dbname, DBExplainError.failed_function, e)
@@ -587,8 +597,7 @@ def _get_db_explain_setup_state_cached(self, dbname):

     def _run_explain(self, dbname, statement, obfuscated_statement):
         start_time = time.time()
-        with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms) as conn:
-            cursor = conn.cursor()
+        with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms).cursor() as cursor:
             self._log.debug("Running query on dbname=%s: %s(%s)", dbname, self._explain_function, obfuscated_statement)
             cursor.execute(
                 """SELECT {explain_function}($stmt${statement}$stmt$)""".format(
@@ -648,7 +657,7 @@ def _run_explain_safe(self, dbname, statement, obfuscated_statement, query_signa

         try:
             return self._run_explain(dbname, statement, obfuscated_statement), None, None
-        except psycopg2.errors.UndefinedParameter as e:
+        except psycopg.errors.UndefinedParameter as e:
             self._log.debug(
                 "Unable to collect execution plan, clients using the extended query protocol or prepared statements"
                 " can't be explained due to the separation of the parsed query and raw bind parameters: %s",
parameters: %s", @@ -662,18 +671,18 @@ def _run_explain_safe(self, dbname, statement, obfuscated_statement, query_signa self._explain_errors_cache[query_signature] = error_response self._emit_run_explain_error(dbname, DBExplainError.parameterized_query, e) return error_response - except psycopg2.errors.UndefinedTable as e: + except psycopg.errors.UndefinedTable as e: self._log.debug("Failed to collect execution plan: %s", repr(e)) error_response = None, DBExplainError.undefined_table, '{}'.format(type(e)) self._explain_errors_cache[query_signature] = error_response self._emit_run_explain_error(dbname, DBExplainError.undefined_table, e) return error_response - except psycopg2.errors.DatabaseError as e: + except psycopg.errors.DatabaseError as e: self._log.debug("Failed to collect execution plan: %s", repr(e)) error_response = None, DBExplainError.database_error, '{}'.format(type(e)) self._emit_run_explain_error(dbname, DBExplainError.database_error, e) - if isinstance(e, psycopg2.errors.ProgrammingError) and not isinstance( - e, psycopg2.errors.InsufficientPrivilege + if isinstance(e, psycopg.errors.ProgrammingError) and not isinstance( + e, psycopg.errors.InsufficientPrivilege ): # ProgrammingError is things like InvalidName, InvalidSchema, SyntaxError # we don't want to cache things like permission errors for a very long time because they can be fixed diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index f4c8e76babbfd..33d8b5e3a64a1 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -6,8 +6,8 @@ import copy import time -import psycopg2 -import psycopg2.extras +import psycopg +from psycopg.rows import dict_row from cachetools import TTLCache from datadog_checks.base import is_affirmative @@ -120,7 +120,7 @@ def __init__(self, check, config, shutdown_callback): check, run_sync=is_affirmative(config.statement_metrics_config.get('run_sync', False)), enabled=is_affirmative(config.statement_metrics_config.get('enabled', True)), - expected_db_exceptions=(psycopg2.errors.DatabaseError,), + expected_db_exceptions=(psycopg.errors.DatabaseError,), min_collection_interval=config.min_collection_interval, dbms="postgres", rate_limit=1 / float(collection_interval), @@ -150,7 +150,7 @@ def _execute_query(self, cursor, query, params=()): self._log.debug("Running query [%s] %s", query, params) cursor.execute(query, params) return cursor.fetchall() - except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e: + except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e: # A failed query could've derived from incorrect columns within the cache. It's a rare edge case, # but the next time the query is run, it will retrieve the correct columns. 
diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py
index f4c8e76babbfd..33d8b5e3a64a1 100644
--- a/postgres/datadog_checks/postgres/statements.py
+++ b/postgres/datadog_checks/postgres/statements.py
@@ -6,8 +6,8 @@
 import copy
 import time

-import psycopg2
-import psycopg2.extras
+import psycopg
+from psycopg.rows import dict_row
 from cachetools import TTLCache

 from datadog_checks.base import is_affirmative
@@ -120,7 +120,7 @@ def __init__(self, check, config, shutdown_callback):
             check,
             run_sync=is_affirmative(config.statement_metrics_config.get('run_sync', False)),
             enabled=is_affirmative(config.statement_metrics_config.get('enabled', True)),
-            expected_db_exceptions=(psycopg2.errors.DatabaseError,),
+            expected_db_exceptions=(psycopg.errors.DatabaseError,),
             min_collection_interval=config.min_collection_interval,
             dbms="postgres",
             rate_limit=1 / float(collection_interval),
@@ -150,7 +150,7 @@ def _execute_query(self, cursor, query, params=()):
             self._log.debug("Running query [%s] %s", query, params)
             cursor.execute(query, params)
             return cursor.fetchall()
-        except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
+        except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
             # A failed query could've derived from incorrect columns within the cache. It's a rare edge case,
             # but the next time the query is run, it will retrieve the correct columns.
             self._stat_column_cache = []
@@ -170,11 +170,13 @@ def _get_pg_stat_statements_columns(self):
         query = STATEMENTS_QUERY.format(
             cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters=""
         )
-        cursor = self._check._get_db(self._config.dbname).cursor()
-        self._execute_query(cursor, query, params=(self._config.dbname,))
-        col_names = [desc[0] for desc in cursor.description] if cursor.description else []
-        self._stat_column_cache = col_names
-        return col_names
+        # TODO: using a with block here actually closes the connection https://www.psycopg.org/psycopg3/docs/basic/from_pg2.html#diff-with
+        with self._check._get_db(self._config.dbname).cursor() as cursor:
+            # TODO: we do not need the dbname as a param here, psycopg2 just ignored it, but psycopg3 will fail
+            self._execute_query(cursor, query, params=())
+            col_names = [desc[0] for desc in cursor.description] if cursor.description else []
+            self._stat_column_cache = col_names
+            return col_names

     def run_job(self):
         # do not emit any dd.internal metrics for DBM specific check code
@@ -246,31 +248,33 @@ def _load_pg_stat_statements(self):
             if self._check.pg_settings.get("track_io_timing") != "on":
                 desired_columns -= PG_STAT_STATEMENTS_TIMING_COLUMNS

-            pg_stat_statements_max = int(self._check.pg_settings.get("pg_stat_statements.max"))
-            if pg_stat_statements_max > self._pg_stat_statements_max_warning_threshold:
-                self._check.record_warning(
-                    DatabaseConfigurationError.high_pg_stat_statements_max,
-                    warning_with_tags(
-                        "pg_stat_statements.max is set to %d which is higher than the supported "
-                        "value of %d. This can have a negative impact on database and collection of "
-                        "query metrics performance. Consider lowering the pg_stat_statements.max value to %d. "
-                        "Alternatively, you may acknowledge the potential performance impact by increasing the "
-                        "query_metrics.pg_stat_statements_max_warning_threshold to equal or greater than %d to "
-                        "silence this warning. "
-                        "See https://docs.datadoghq.com/database_monitoring/setup_postgres/"
-                        "troubleshooting#%s for more details",
-                        pg_stat_statements_max,
-                        self._pg_stat_statements_max_warning_threshold,
-                        self._pg_stat_statements_max_warning_threshold,
-                        self._pg_stat_statements_max_warning_threshold,
-                        DatabaseConfigurationError.high_pg_stat_statements_max.value,
-                        host=self._check.resolved_hostname,
-                        dbname=self._config.dbname,
-                        code=DatabaseConfigurationError.high_pg_stat_statements_max.value,
-                        value=pg_stat_statements_max,
-                        threshold=self._pg_stat_statements_max_warning_threshold,
-                    ),
-                )
+            # TODO: make this work again with upgraded psycopg
+            # this is just for monitoring & isn't important for the actual functionality in the check
+            # pg_stat_statements_max = int(self._check.pg_settings.get("pg_stat_statements.max"))
+            # if pg_stat_statements_max > self._pg_stat_statements_max_warning_threshold:
+            #     self._check.record_warning(
+            #         DatabaseConfigurationError.high_pg_stat_statements_max,
+            #         warning_with_tags(
+            #             "pg_stat_statements.max is set to %d which is higher than the supported "
+            #             "value of %d. This can have a negative impact on database and collection of "
+            #             "query metrics performance. Consider lowering the pg_stat_statements.max value to %d. "
+            #             "Alternatively, you may acknowledge the potential performance impact by increasing the "
+            #             "query_metrics.pg_stat_statements_max_warning_threshold to equal or greater than %d to "
+            #             "silence this warning. "
" + # "See https://docs.datadoghq.com/database_monitoring/setup_postgres/" + # "troubleshooting#%s for more details", + # pg_stat_statements_max, + # self._pg_stat_statements_max_warning_threshold, + # self._pg_stat_statements_max_warning_threshold, + # self._pg_stat_statements_max_warning_threshold, + # DatabaseConfigurationError.high_pg_stat_statements_max.value, + # host=self._check.resolved_hostname, + # dbname=self._config.dbname, + # code=DatabaseConfigurationError.high_pg_stat_statements_max.value, + # value=pg_stat_statements_max, + # threshold=self._pg_stat_statements_max_warning_threshold, + # ), + # ) query_columns = sorted(available_columns & desired_columns) params = () @@ -283,22 +287,24 @@ def _load_pg_stat_statements(self): "pg_database.datname NOT ILIKE %s" for _ in self._config.ignore_databases ) params = params + tuple(self._config.ignore_databases) - return self._execute_query( - self._check._get_db(self._config.dbname).cursor(cursor_factory=psycopg2.extras.DictCursor), - STATEMENTS_QUERY.format( - cols=', '.join(query_columns), - pg_stat_statements_view=self._config.pg_stat_statements_view, - filters=filters, - extra_clauses="", - ), - params=params, - ) - except psycopg2.Error as e: + + with self._check._get_db(self._config.dbname).cursor(row_factory=dict_row) as cursor: + return self._execute_query( + cursor, + STATEMENTS_QUERY.format( + cols=', '.join(query_columns), + pg_stat_statements_view=self._config.pg_stat_statements_view, + filters=filters, + extra_clauses="", + ), + params=params, + ) + except psycopg.Error as e: error_tag = "error:database-{}".format(type(e).__name__) if ( - isinstance(e, psycopg2.errors.ObjectNotInPrerequisiteState) - ) and 'pg_stat_statements must be loaded' in str(e.pgerror): + isinstance(e, psycopg.errors.ObjectNotInPrerequisiteState) + ) and 'pg_stat_statements must be loaded' in str(e.sqlstate): error_tag = "error:database-{}-pg_stat_statements_not_loaded".format(type(e).__name__) self._check.record_warning( DatabaseConfigurationError.pg_stat_statements_not_loaded, @@ -314,7 +320,7 @@ def _load_pg_stat_statements(self): code=DatabaseConfigurationError.pg_stat_statements_not_loaded.value, ), ) - elif isinstance(e, psycopg2.errors.UndefinedTable) and 'pg_stat_statements' in str(e.pgerror): + elif isinstance(e, psycopg.errors.UndefinedTable) and 'pg_stat_statements' in str(e.sqlstate): error_tag = "error:database-{}-pg_stat_statements_not_created".format(type(e).__name__) self._check.record_warning( DatabaseConfigurationError.pg_stat_statements_not_created, @@ -356,18 +362,18 @@ def _emit_pg_stat_statements_dealloc(self): return try: rows = self._execute_query( - self._check._get_db(self._config.dbname).cursor(cursor_factory=psycopg2.extras.DictCursor), + self._check._get_db(self._config.dbname).cursor(row_factory=dict_row), PG_STAT_STATEMENTS_DEALLOC, ) - if rows: - dealloc = rows[0][0] + if rows and 'count' in rows[0]: + dealloc = rows[0]['count'] self._check.monotonic_count( "postgresql.pg_stat_statements.dealloc", dealloc, tags=self.tags, hostname=self._check.resolved_hostname, ) - except psycopg2.Error as e: + except psycopg.Error as e: self._log.warning("Failed to query for pg_stat_statements_info: %s", e) @tracked_method(agent_check_getter=agent_check_getter) @@ -375,12 +381,12 @@ def _emit_pg_stat_statements_metrics(self): query = PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 if self._check.version < V9_4 else PG_STAT_STATEMENTS_COUNT_QUERY try: rows = self._execute_query( - 
self._check._get_db(self._config.dbname).cursor(cursor_factory=psycopg2.extras.DictCursor), + self._check._get_db(self._config.dbname).cursor(row_factory=dict_row), query, ) count = 0 - if rows: - count = rows[0][0] + if rows and 'count' in rows[0]: + count = rows[0]['count'] self._check.gauge( "postgresql.pg_stat_statements.max", self._check.pg_settings.get("pg_stat_statements.max", 0), @@ -393,7 +399,7 @@ def _emit_pg_stat_statements_metrics(self): tags=self.tags, hostname=self._check.resolved_hostname, ) - except psycopg2.Error as e: + except psycopg.Error as e: self._log.warning("Failed to query for pg_stat_statements count: %s", e) @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) diff --git a/postgres/pyproject.toml b/postgres/pyproject.toml index 32c90e8075194..3ebab24ffef69 100644 --- a/postgres/pyproject.toml +++ b/postgres/pyproject.toml @@ -1,8 +1,6 @@ [build-system] requires = [ "hatchling>=0.11.2", - "setuptools>=66; python_version > '3.0'", - "setuptools; python_version < '3.0'", ] build-backend = "hatchling.build" @@ -39,12 +37,10 @@ license = "BSD-3-Clause" [project.optional-dependencies] deps = [ - "boto3==1.17.112; python_version < '3.0'", "boto3==1.27.0; python_version > '3.0'", "cachetools==3.1.1; python_version < '3.0'", "cachetools==5.3.1; python_version > '3.0'", - "psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'", - "semver==2.13.0; python_version < '3.0'", + "psycopg[binary]==3.1.9; python_version > '3.0'", "semver==3.0.1; python_version > '3.0'", ]
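One last note on the `e.pgerror` → `e.sqlstate` substitutions in statements.py: psycopg3 keeps the psycopg2 `errors` class names, but `sqlstate` holds the five-character SQLSTATE code (e.g. '42P01'), while the server message that psycopg2 exposed as `pgerror` is available via `e.diag.message_primary` or `str(e)`. An illustrative sketch, assuming pg_stat_statements is not installed (connection details are placeholders):

```python
import psycopg

try:
    with psycopg.connect("host=localhost dbname=postgres", autocommit=True) as conn:
        conn.execute("SELECT * FROM pg_stat_statements LIMIT 1")
except psycopg.errors.UndefinedTable as e:
    print(e.sqlstate)              # '42P01' -- the SQLSTATE code, not the message text
    print(e.diag.message_primary)  # e.g. 'relation "pg_stat_statements" does not exist'
```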