Skip to content

Commit

Permalink
Revert "try removing with clauses and closing cursor myself"
Browse files Browse the repository at this point in the history
This reverts commit 241fc1dca7950e16297e32f87e69b695b9e62e1d.
  • Loading branch information
jmeunier28 committed Jul 13, 2023
1 parent 46ec3f0 commit f4a8079
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 57 deletions.
80 changes: 35 additions & 45 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,9 @@ def _new_query_executor(self, queries):
)

def execute_query_raw(self, query):
cursor = self.db.cursor()
try:
self.db.cursor.execute(query)
with self.db.cursor() as cursor:
cursor.execute(query)
return cursor.fetchall()
except psycopg.Error as e:
self.log.error("error executing query {} {}", query, repr(e))
finally:
if cursor:
cursor.close()

@property
def dynamic_queries(self):
Expand Down Expand Up @@ -579,18 +573,18 @@ def _connect(self):

# Reload pg_settings on a new connection to the main db
def _load_pg_settings(self, db):
cursor = db.cursor(row_factory=dict_row)
try:
self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
cursor.execute(
PG_SETTINGS_QUERY,
("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"),
)
rows = cursor.fetchall()
self.pg_settings.clear()
for setting in rows:
name, val = setting
self.pg_settings[name] = val
with db.cursor(row_factory=dict_row) as cursor:
self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
cursor.execute(
PG_SETTINGS_QUERY,
("pg_stat_statements.max", "track_activity_query_size", "track_io_timing"),
)
rows = cursor.fetchall()
self.pg_settings.clear()
for setting in rows:
name, val = setting
self.pg_settings[name] = val
except (psycopg.DatabaseError, psycopg.OperationalError) as err:
self.log.warning("Failed to query for pg_settings: %s", repr(err))
self.count(
Expand All @@ -599,9 +593,6 @@ def _load_pg_settings(self, db):
tags=self.tags + ["error:load-pg-settings"] + self._get_debug_tags(),
hostname=self.resolved_hostname,
)
finally:
if cursor:
cursor.close()

def _get_db(self, dbname):
"""
Expand All @@ -612,19 +603,20 @@ def _get_db(self, dbname):
"""
# TODO: migrate the rest of this check to use a connection from this pool
# right now we wrap everything in a `with` block so the conn gets closed automatically
with self._db_pool_lock:
db = self._db_pool.get(dbname)
if not db or db.closed:
self.log.debug("initializing connection to dbname=%s", dbname)
db = self._new_connection(dbname)
self._db_pool[dbname] = db
if self._config.dbname == dbname:
# reload settings for the main DB only once every time the connection is reestablished
self._load_pg_settings(db)
if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()
return db
return self._new_connection(dbname)
# with self._db_pool_lock:
# db = self._db_pool.get(dbname)
# if not db or db.closed:
# self.log.debug("initializing connection to dbname=%s", dbname)
# db = self._new_connection(dbname)
# self._db_pool[dbname] = db
# if self._config.dbname == dbname:
# # reload settings for the main DB only once every time the connection is reestablished
# self._load_pg_settings(db)
# if db.info.status != psycopg.pq.ConnStatus.OK:
# # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
# db.rollback()
# return db

def _close_db_pool(self):
# TODO: add automatic aging out of connections after some time
Expand Down Expand Up @@ -659,16 +651,14 @@ def _collect_custom_queries(self, tags):
continue

cursor = self.db.cursor()
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
self.db.rollback()
continue
finally:
if cursor:
cursor.close()
with closing(cursor) as cursor:
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
self.db.rollback()
continue

for row in cursor:
if not row:
Expand Down
14 changes: 2 additions & 12 deletions postgres/datadog_checks/postgres/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,12 @@ def _get_pg_stat_statements_columns(self):
cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters=""
)
# TODO: using a with blo`ck here actually closes the connection https://www.psycopg.org/psycopg3/docs/basic/from_pg2.html#diff-with
cursor = self._check._get_db(self._config.dbname).cursor()
try:
with self._check._get_db(self._config.dbname).cursor() as cursor:
# TODO: we do not need the dbname as a param here, psycopg2 just ignored it, but psycopg3 will fail
self._execute_query(cursor, query, params=())
col_names = [desc[0] for desc in cursor.description] if cursor.description else []
self._stat_column_cache = col_names
return col_names
except psycopg.Error as e:
raise e
finally:
if cursor:
cursor.close()

def run_job(self):
# do not emit any dd.internal metrics for DBM specific check code
Expand Down Expand Up @@ -294,8 +288,7 @@ def _load_pg_stat_statements(self):
)
params = params + tuple(self._config.ignore_databases)

cursor = self._check._get_db(self._config.dbname).cursor(row_factory=dict_row)
try:
with self._check._get_db(self._config.dbname).cursor(row_factory=dict_row) as cursor:
return self._execute_query(
cursor,
STATEMENTS_QUERY.format(
Expand All @@ -306,9 +299,6 @@ def _load_pg_stat_statements(self):
),
params=params,
)
finally:
if cursor:
cursor.close()
except psycopg.Error as e:
error_tag = "error:database-{}".format(type(e).__name__)

Expand Down

0 comments on commit f4a8079

Please sign in to comment.