diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index 29433d4a422ef..d365fe22cf657 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -9,6 +9,7 @@ import psycopg from psycopg.rows import dict_row +from psycopg_pool import ConnectionPool from six import iteritems from datadog_checks.base import AgentCheck @@ -145,9 +146,15 @@ def _new_query_executor(self, queries): ) def execute_query_raw(self, query): - with self.db.cursor() as cursor: - cursor.execute(query) + cursor = self.db.cursor() + try: + self.db.cursor.execute(query) return cursor.fetchall() + except psycopg.Error as e: + self.log.error("error executing query {} {}", query, repr(e)) + finally: + if cursor: + cursor.close() @property def dynamic_queries(self): @@ -573,18 +580,18 @@ def _connect(self): # Reload pg_settings on a new connection to the main db def _load_pg_settings(self, db): + cursor = db.cursor(row_factory=dict_row) try: - with db.cursor(row_factory=dict_row) as cursor: - self.log.debug("Running query [%s]", PG_SETTINGS_QUERY) - cursor.execute( - PG_SETTINGS_QUERY, - ("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"), - ) - rows = cursor.fetchall() - self.pg_settings.clear() - for setting in rows: - name, val = setting - self.pg_settings[name] = val + self.log.debug("Running query [%s]", PG_SETTINGS_QUERY) + cursor.execute( + PG_SETTINGS_QUERY, + ("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"), + ) + rows = cursor.fetchall() + self.pg_settings.clear() + for setting in rows: + name, val = setting + self.pg_settings[name] = val except (psycopg.DatabaseError, psycopg.OperationalError) as err: self.log.warning("Failed to query for pg_settings: %s", repr(err)) self.count( @@ -593,6 +600,9 @@ def _load_pg_settings(self, db): tags=self.tags + ["error:load-pg-settings"] + self._get_debug_tags(), hostname=self.resolved_hostname, ) + finally: + if cursor: + cursor.close() def _get_db(self, dbname): """ @@ -603,20 +613,19 @@ def _get_db(self, dbname): """ # TODO: migrate the rest of this check to use a connection from this pool # right now we wrap everything in a `with` block so the conn gets closed automatically - return self._new_connection(dbname) - # with self._db_pool_lock: - # db = self._db_pool.get(dbname) - # if not db or db.closed: - # self.log.debug("initializing connection to dbname=%s", dbname) - # db = self._new_connection(dbname) - # self._db_pool[dbname] = db - # if self._config.dbname == dbname: - # # reload settings for the main DB only once every time the connection is reestablished - # self._load_pg_settings(db) - # if db.info.status != psycopg.pq.ConnStatus.OK: - # # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that - # db.rollback() - # return db + with self._db_pool_lock: + db = self._db_pool.get(dbname) + if not db or db.closed: + self.log.debug("initializing connection to dbname=%s", dbname) + db = self._new_connection(dbname) + self._db_pool[dbname] = db + if self._config.dbname == dbname: + # reload settings for the main DB only once every time the connection is reestablished + self._load_pg_settings(db) + if db.info.status != psycopg.pq.ConnStatus.OK: + # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that + db.rollback() + return db def _close_db_pool(self): # TODO: add automatic aging out of connections after some time @@ -651,14 +660,16 @@ def _collect_custom_queries(self, tags): continue cursor = self.db.cursor() - with closing(cursor) as cursor: - try: - self.log.debug("Running query: %s", query) - cursor.execute(query) - except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e: - self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e)) - self.db.rollback() - continue + try: + self.log.debug("Running query: %s", query) + cursor.execute(query) + except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e: + self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e)) + self.db.rollback() + continue + finally: + if cursor: + cursor.close() for row in cursor: if not row: diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py index bb9a019ab0dfa..9bfe73b1fb26d 100644 --- a/postgres/datadog_checks/postgres/statement_samples.py +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -597,8 +597,7 @@ def _get_db_explain_setup_state_cached(self, dbname): def _run_explain(self, dbname, statement, obfuscated_statement): start_time = time.time() - with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms) as conn: - cursor = conn.cursor() + with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms).cursor() as cursor: self._log.debug("Running query on dbname=%s: %s(%s)", dbname, self._explain_function, obfuscated_statement) cursor.execute( """SELECT {explain_function}($stmt${statement}$stmt$)""".format( diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 33d8b5e3a64a1..da1b4dc0a9e99 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -171,12 +171,18 @@ def _get_pg_stat_statements_columns(self): cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters="" ) # TODO: using a with blo`ck here actually closes the connection https://www.psycopg.org/psycopg3/docs/basic/from_pg2.html#diff-with - with self._check._get_db(self._config.dbname).cursor() as cursor: + cursor = self._check._get_db(self._config.dbname).cursor() + try: # TODO: we do not need the dbname as a param here, psycopg2 just ignored it, but psycopg3 will fail self._execute_query(cursor, query, params=()) col_names = [desc[0] for desc in cursor.description] if cursor.description else [] self._stat_column_cache = col_names return col_names + except psycopg.Error as e: + raise e + finally: + if cursor: + cursor.close() def run_job(self): # do not emit any dd.internal metrics for DBM specific check code @@ -288,7 +294,8 @@ def _load_pg_stat_statements(self): ) params = params + tuple(self._config.ignore_databases) - with self._check._get_db(self._config.dbname).cursor(row_factory=dict_row) as cursor: + cursor = self._check._get_db(self._config.dbname).cursor(row_factory=dict_row) + try: return self._execute_query( cursor, STATEMENTS_QUERY.format( @@ -299,6 +306,9 @@ def _load_pg_stat_statements(self): ), params=params, ) + finally: + if cursor: + cursor.close() except psycopg.Error as e: error_tag = "error:database-{}".format(type(e).__name__)