Skip to content

Commit

Permalink
try removing with clauses and closing cursor myself
Browse files Browse the repository at this point in the history
  • Loading branch information
jmeunier28 committed Jul 13, 2023
1 parent d4aa0dc commit 1a225bd
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 39 deletions.
81 changes: 46 additions & 35 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import psycopg
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from six import iteritems

from datadog_checks.base import AgentCheck
Expand Down Expand Up @@ -145,9 +146,15 @@ def _new_query_executor(self, queries):
)

def execute_query_raw(self, query):
with self.db.cursor() as cursor:
cursor.execute(query)
cursor = self.db.cursor()
try:
self.db.cursor.execute(query)
return cursor.fetchall()
except psycopg.Error as e:
self.log.error("error executing query {} {}", query, repr(e))
finally:
if cursor:
cursor.close()

@property
def dynamic_queries(self):
Expand Down Expand Up @@ -573,18 +580,18 @@ def _connect(self):

# Reload pg_settings on a new connection to the main db
def _load_pg_settings(self, db):
cursor = db.cursor(row_factory=dict_row)
try:
with db.cursor(row_factory=dict_row) as cursor:
self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
cursor.execute(
PG_SETTINGS_QUERY,
("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"),
)
rows = cursor.fetchall()
self.pg_settings.clear()
for setting in rows:
name, val = setting
self.pg_settings[name] = val
self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
cursor.execute(
PG_SETTINGS_QUERY,
("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"),
)
rows = cursor.fetchall()
self.pg_settings.clear()
for setting in rows:
name, val = setting
self.pg_settings[name] = val
except (psycopg.DatabaseError, psycopg.OperationalError) as err:
self.log.warning("Failed to query for pg_settings: %s", repr(err))
self.count(
Expand All @@ -593,6 +600,9 @@ def _load_pg_settings(self, db):
tags=self.tags + ["error:load-pg-settings"] + self._get_debug_tags(),
hostname=self.resolved_hostname,
)
finally:
if cursor:
cursor.close()

def _get_db(self, dbname):
"""
Expand All @@ -603,20 +613,19 @@ def _get_db(self, dbname):
"""
# TODO: migrate the rest of this check to use a connection from this pool
# right now we wrap everything in a `with` block so the conn gets closed automatically
return self._new_connection(dbname)
# with self._db_pool_lock:
# db = self._db_pool.get(dbname)
# if not db or db.closed:
# self.log.debug("initializing connection to dbname=%s", dbname)
# db = self._new_connection(dbname)
# self._db_pool[dbname] = db
# if self._config.dbname == dbname:
# # reload settings for the main DB only once every time the connection is reestablished
# self._load_pg_settings(db)
# if db.info.status != psycopg.pq.ConnStatus.OK:
# # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
# db.rollback()
# return db
with self._db_pool_lock:
db = self._db_pool.get(dbname)
if not db or db.closed:
self.log.debug("initializing connection to dbname=%s", dbname)
db = self._new_connection(dbname)
self._db_pool[dbname] = db
if self._config.dbname == dbname:
# reload settings for the main DB only once every time the connection is reestablished
self._load_pg_settings(db)
if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()
return db

def _close_db_pool(self):
# TODO: add automatic aging out of connections after some time
Expand Down Expand Up @@ -651,14 +660,16 @@ def _collect_custom_queries(self, tags):
continue

cursor = self.db.cursor()
with closing(cursor) as cursor:
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
self.db.rollback()
continue
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
self.db.rollback()
continue
finally:
if cursor:
cursor.close()

for row in cursor:
if not row:
Expand Down
3 changes: 1 addition & 2 deletions postgres/datadog_checks/postgres/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,7 @@ def _get_db_explain_setup_state_cached(self, dbname):

def _run_explain(self, dbname, statement, obfuscated_statement):
start_time = time.time()
with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms) as conn:
cursor = conn.cursor()
with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms).cursor() as cursor:
self._log.debug("Running query on dbname=%s: %s(%s)", dbname, self._explain_function, obfuscated_statement)
cursor.execute(
"""SELECT {explain_function}($stmt${statement}$stmt$)""".format(
Expand Down
14 changes: 12 additions & 2 deletions postgres/datadog_checks/postgres/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,18 @@ def _get_pg_stat_statements_columns(self):
cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters=""
)
# TODO: using a with blo`ck here actually closes the connection https://www.psycopg.org/psycopg3/docs/basic/from_pg2.html#diff-with
with self._check._get_db(self._config.dbname).cursor() as cursor:
cursor = self._check._get_db(self._config.dbname).cursor()
try:
# TODO: we do not need the dbname as a param here, psycopg2 just ignored it, but psycopg3 will fail
self._execute_query(cursor, query, params=())
col_names = [desc[0] for desc in cursor.description] if cursor.description else []
self._stat_column_cache = col_names
return col_names
except psycopg.Error as e:
raise e
finally:
if cursor:
cursor.close()

def run_job(self):
# do not emit any dd.internal metrics for DBM specific check code
Expand Down Expand Up @@ -288,7 +294,8 @@ def _load_pg_stat_statements(self):
)
params = params + tuple(self._config.ignore_databases)

with self._check._get_db(self._config.dbname).cursor(row_factory=dict_row) as cursor:
cursor = self._check._get_db(self._config.dbname).cursor(row_factory=dict_row)
try:
return self._execute_query(
cursor,
STATEMENTS_QUERY.format(
Expand All @@ -299,6 +306,9 @@ def _load_pg_stat_statements(self):
),
params=params,
)
finally:
if cursor:
cursor.close()
except psycopg.Error as e:
error_tag = "error:database-{}".format(type(e).__name__)

Expand Down

0 comments on commit 1a225bd

Please sign in to comment.