Upgrade to psycopg3 #14801

Closed · wants to merge 1 commit
@@ -56,7 +56,8 @@ prometheus-client==0.17.0; python_version > '3.0'
protobuf==3.17.3; python_version < '3.0'
protobuf==3.20.2; python_version > '3.0'
psutil==5.9.0
-psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
+psycopg[binary]==3.1.9; python_version > '3.0'
+psycopg[pool]==3.1.7; python_version > '3.0'
pyasn1==0.4.6
pycryptodomex==3.10.1
pydantic==1.10.8; python_version > '3.0'
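Note: the single psycopg2-binary pin is replaced by two psycopg3 packages; `psycopg[binary]` pulls in the C-accelerated driver and `psycopg[pool]` pulls in the separate `psycopg-pool` distribution that provides the `psycopg_pool` module. A minimal sketch of how the two are used together, with an illustrative DSN and pool sizing not taken from this PR:

```python
import psycopg
from psycopg_pool import ConnectionPool  # provided by the psycopg[pool] extra

# Illustrative connection string and pool sizing.
pool = ConnectionPool("host=localhost dbname=postgres user=postgres", min_size=1, max_size=4)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone()[0])

pool.close()
```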
7 changes: 4 additions & 3 deletions pgbouncer/datadog_checks/pgbouncer/pgbouncer.py
@@ -4,8 +4,8 @@
import re
import time

-import psycopg2 as pg
-from psycopg2 import extras as pgextras
+import psycopg as pg
+from psycopg.rows import dict_row
from six.moves.urllib.parse import urlparse

from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative
@@ -73,7 +73,7 @@ def _collect_stats(self, db):
metric_scope.append(SERVERS_METRICS)

try:
-with db.cursor(cursor_factory=pgextras.DictCursor) as cursor:
+with db.cursor(row_factory=dict_row) as cursor:
for scope in metric_scope:
descriptors = scope['descriptors']
metrics = scope['metrics']
@@ -159,6 +159,7 @@ def _get_connect_kwargs(self):

return args

+# TODO: needs to be updated to remain compliant with psycopg3
def _get_connection(self, use_cached=None):
"""Get and memoize connections to instances"""
use_cached = use_cached if use_cached is not None else self.use_cached
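For context on the cursor change above, a hedged sketch: psycopg2's `cursor_factory=pgextras.DictCursor` maps to psycopg3's `row_factory=dict_row`, which returns each row as a plain dict keyed by column name. The DSN and query below are illustrative; the check builds its connection from its own config.

```python
import psycopg
from psycopg.rows import dict_row

# Illustrative DSN; not taken from the check's configuration handling.
with psycopg.connect("host=localhost dbname=datadog_test user=datadog", autocommit=True) as db:
    # psycopg3 equivalent of psycopg2's cursor_factory=pgextras.DictCursor
    with db.cursor(row_factory=dict_row) as cursor:
        cursor.execute("SELECT datname, numbackends FROM pg_stat_database")
        for row in cursor.fetchall():
            # Each row is a dict, e.g. {'datname': 'postgres', 'numbackends': 1}
            print(row["datname"], row["numbackends"])
```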
2 changes: 1 addition & 1 deletion pgbouncer/hatch.toml
@@ -1,7 +1,7 @@
[env.collectors.datadog-checks]

[[envs.default.matrix]]
python = ["2.7", "3.9"]
python = ["3.9"]
version = ["1.7", "1.8", "1.12"]

[envs.default.env-vars]
5 changes: 2 additions & 3 deletions pgbouncer/pyproject.toml
@@ -1,15 +1,14 @@
[build-system]
requires = [
"hatchling>=0.11.2",
"setuptools>=66; python_version > '3.0'",
"setuptools; python_version < '3.0'",
]
build-backend = "hatchling.build"

[project]
name = "datadog-pgbouncer"
description = "The PGBouncer check"
readme = "README.md"
+requires-python = ">=3.9"
keywords = [
"datadog",
"datadog agent",
@@ -40,7 +39,7 @@ license = "BSD-3-Clause"

[project.optional-dependencies]
deps = [
"psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'",
"psycopg[binary]==3.1.9; python_version > '3.0'",
]

[project.urls]
14 changes: 7 additions & 7 deletions postgres/datadog_checks/postgres/connections.py
@@ -8,7 +8,7 @@
import time
from typing import Callable, Dict

-import psycopg2
+import psycopg


class ConnectionPoolFullError(Exception):
@@ -23,10 +23,10 @@ def __str__(self):
class ConnectionInfo:
def __init__(
self,
-connection: psycopg2.extensions.connection,
-deadline: int,
+connection: psycopg.Connection,
+deadline: datetime,
active: bool,
-last_accessed: int,
+last_accessed: datetime,
thread: threading.Thread,
):
self.connection = connection
@@ -79,7 +79,7 @@ def __init__(self, connect_fn: Callable[[str], None], max_conns: int = None):
)
self.connect_fn = connect_fn

-def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) -> psycopg2.extensions.connection:
+def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) -> psycopg.Connection:
"""
Return a connection from the pool.
"""
@@ -101,7 +101,7 @@ def _get_connection_raw(self, dbname: str, ttl_ms: int, timeout: int = None) ->
self._stats.connection_opened += 1
db = self.connect_fn(dbname)

-if db.status != psycopg2.extensions.STATUS_READY:
+if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

@@ -183,6 +183,6 @@ def _terminate_connection_unsafe(self, dbname: str):
db.close()
except Exception:
self._stats.connection_closed_failed += 1
-self._log.exception("failed to close DB connection for db=%s", dbname)
+self.log.exception("failed to close DB connection for db=%s", dbname)
return False
return True
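A small sketch of the status translation used in this file (connection details illustrative): psycopg2's `connection.status != psycopg2.extensions.STATUS_READY` becomes `connection.info.status != psycopg.pq.ConnStatus.OK` on a `psycopg.Connection`; transaction state, if needed, lives separately under `connection.info.transaction_status`.

```python
import psycopg


def ensure_healthy(conn: psycopg.Connection) -> None:
    # conn.info.status reflects the libpq connection status (OK / BAD).
    if conn.info.status != psycopg.pq.ConnStatus.OK:
        # Roll back so a broken transaction does not leave the connection unusable.
        conn.rollback()


# Illustrative usage; the pool in this file calls its own connect_fn instead.
conn = psycopg.connect("host=localhost dbname=postgres user=postgres")
ensure_healthy(conn)
conn.close()
```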
@@ -4,7 +4,8 @@

import logging

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row

from datadog_checks.base.utils.db.sql import compute_sql_signature
from datadog_checks.base.utils.tracking import tracked_method
@@ -112,7 +113,10 @@ def _get_number_of_parameters_for_prepared_statement(self, dbname, query_signatu
rows = self._execute_query_and_fetch_rows(
dbname, PARAM_TYPES_COUNT_QUERY.format(query_signature=query_signature)
)
-return rows[0][0] if rows else 0
+count = 0
+if rows and 'count' in rows[0]:
+    count = rows[0]['count']
+return count

@tracked_method(agent_check_getter=agent_check_getter)
def _explain_prepared_statement(self, dbname, statement, obfuscated_statement, query_signature):
@@ -157,12 +161,12 @@ def _deallocate_prepared_statement(self, dbname, query_signature):
)

def _execute_query(self, dbname, query):
-with self._check._get_db(dbname).cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+with self._check._get_db(dbname).cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)

def _execute_query_and_fetch_rows(self, dbname, query):
-with self._check._get_db(dbname).cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+with self._check._get_db(dbname).cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s] and fetching rows', query)
cursor.execute(query)
return cursor.fetchall()
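A brief sketch of why the row access above changes from `rows[0][0]` to `rows[0]['count']`: with `row_factory=dict_row`, fetched rows are dicts keyed by column name, so positional indexing no longer works. The connection and query below are illustrative.

```python
import psycopg
from psycopg.rows import dict_row

with psycopg.connect("host=localhost dbname=postgres user=postgres") as conn:
    with conn.cursor(row_factory=dict_row) as cursor:
        cursor.execute("SELECT count(*) FROM pg_prepared_statements")
        rows = cursor.fetchall()

# Each row is a dict such as {'count': 0}; rows[0][0] would raise a KeyError.
count = rows[0]['count'] if rows and 'count' in rows[0] else 0
print(count)
```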
10 changes: 6 additions & 4 deletions postgres/datadog_checks/postgres/metadata.py
@@ -5,7 +5,8 @@
import time
from typing import Dict, Optional, Tuple # noqa: F401

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row

try:
import datadog_agent
@@ -60,7 +61,7 @@ def shutdown_cb():
enabled=is_affirmative(config.resources_metadata_config.get('enabled', True)),
dbms="postgres",
min_collection_interval=config.min_collection_interval,
-expected_db_exceptions=(psycopg2.errors.DatabaseError,),
+expected_db_exceptions=(psycopg.errors.DatabaseError,),
job_name="database-metadata",
shutdown_callback=shutdown_cb,
)
@@ -120,8 +121,9 @@ def _payload_pg_version(self):

@tracked_method(agent_check_getter=agent_check_getter)
def _collect_postgres_settings(self):
-with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms) as conn:
-    cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+with self._conn_pool.get_connection(self._config.dbname, ttl_ms=self._conn_ttl_ms).cursor(
+    row_factory=dict_row
+) as cursor:
self._log.debug("Running query [%s]", PG_SETTINGS_QUERY)
self._time_since_last_settings_query = time.time()
cursor.execute(PG_SETTINGS_QUERY)
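For reference, a hedged sketch of the pattern adopted above: in psycopg3 the cursor itself can be opened with a row factory and used directly as the context manager, rather than first binding the connection. `SETTINGS_QUERY` and the DSN below are stand-ins, not the check's actual constants.

```python
import psycopg
from psycopg.rows import dict_row

SETTINGS_QUERY = "SELECT name, setting FROM pg_settings"  # stand-in for PG_SETTINGS_QUERY

conn = psycopg.connect("host=localhost dbname=postgres user=postgres")
with conn.cursor(row_factory=dict_row) as cursor:
    cursor.execute(SETTINGS_QUERY)
    settings = {row["name"]: row["setting"] for row in cursor.fetchall()}
conn.close()
print(len(settings))
```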
58 changes: 30 additions & 28 deletions postgres/datadog_checks/postgres/postgres.py
@@ -7,7 +7,9 @@
from contextlib import closing
from time import time

-import psycopg2
+import psycopg
+from psycopg.rows import dict_row
+from psycopg_pool import ConnectionPool
from six import iteritems

from datadog_checks.base import AgentCheck
@@ -351,22 +353,22 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors):
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()
-except psycopg2.errors.FeatureNotSupported as e:
+except psycopg.errors.FeatureNotSupported as e:
# This happens for example when trying to get replication metrics from readers in Aurora. Let's ignore it.
log_func(e)
self.db.rollback()
self.log.debug("Disabling replication metrics")
self._is_aurora = False
self.metrics_cache.replication_metrics = {}
-except psycopg2.errors.UndefinedFunction as e:
+except psycopg.errors.UndefinedFunction as e:
log_func(e)
log_func(
"It seems the PG version has been incorrectly identified as %s. "
"A reattempt to identify the right version will happen on next agent run." % self._version
)
self._clean_state()
self.db.rollback()
-except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
+except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
log_func("Not all metrics may be available: %s" % str(e))
self.db.rollback()

@@ -522,7 +524,7 @@ def _new_connection(self, dbname):
)
if self._config.query_timeout:
connection_string += " options='-c statement_timeout=%s'" % self._config.query_timeout
-conn = psycopg2.connect(connection_string)
+conn = psycopg.connect(conninfo=connection_string, autocommit=True)
else:
password = self._config.password
region = self._config.cloud_metadata.get('aws', {}).get('region', None)
@@ -538,7 +540,7 @@
'host': self._config.host,
'user': self._config.user,
'password': password,
-'database': dbname,
+'dbname': dbname,
'sslmode': self._config.ssl_mode,
'application_name': self._config.application_name,
}
@@ -554,9 +556,7 @@
args['sslkey'] = self._config.ssl_key
if self._config.ssl_password:
args['sslpassword'] = self._config.ssl_password
-conn = psycopg2.connect(**args)
-# Autocommit is enabled by default for safety for all new connections (to prevent long-lived transactions).
-conn.set_session(autocommit=True, readonly=True)
+conn = psycopg.connect(**args, autocommit=True)
return conn

def _connect(self):
@@ -566,7 +566,7 @@
self.db = None

if self.db:
-if self.db.status != psycopg2.extensions.STATUS_READY:
+if self.db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
self.db.rollback()
else:
@@ -575,7 +575,7 @@
# Reload pg_settings on a new connection to the main db
def _load_pg_settings(self, db):
try:
-with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+with db.cursor(row_factory=dict_row) as cursor:
self.log.debug("Running query [%s]", PG_SETTINGS_QUERY)
cursor.execute(
PG_SETTINGS_QUERY,
@@ -586,7 +586,7 @@
for setting in rows:
name, val = setting
self.pg_settings[name] = val
-except (psycopg2.DatabaseError, psycopg2.OperationalError) as err:
+except (psycopg.DatabaseError, psycopg.OperationalError) as err:
self.log.warning("Failed to query for pg_settings: %s", repr(err))
self.count(
"dd.postgres.error",
@@ -597,25 +597,27 @@

def _get_db(self, dbname):
"""
-Returns a memoized psycopg2 connection to `dbname` with autocommit
+Returns a memoized psycopg connection to `dbname` with autocommit
Threadsafe as long as no transactions are used
:param dbname:
-:return: a psycopg2 connection
+:return: a psycopg connection
"""
# TODO: migrate the rest of this check to use a connection from this pool
-with self._db_pool_lock:
-    db = self._db_pool.get(dbname)
-    if not db or db.closed:
-        self.log.debug("initializing connection to dbname=%s", dbname)
-        db = self._new_connection(dbname)
-        self._db_pool[dbname] = db
-        if self._config.dbname == dbname:
-            # reload settings for the main DB only once every time the connection is reestablished
-            self._load_pg_settings(db)
-    if db.status != psycopg2.extensions.STATUS_READY:
-        # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
-        db.rollback()
-    return db
+# right now we wrap everything in a `with` block so the conn gets closed automatically
+return self._new_connection(dbname)
+# with self._db_pool_lock:
+#     db = self._db_pool.get(dbname)
+#     if not db or db.closed:
+#         self.log.debug("initializing connection to dbname=%s", dbname)
+#         db = self._new_connection(dbname)
+#         self._db_pool[dbname] = db
+#         if self._config.dbname == dbname:
+#             # reload settings for the main DB only once every time the connection is reestablished
+#             self._load_pg_settings(db)
+#     if db.info.status != psycopg.pq.ConnStatus.OK:
+#         # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
+#         db.rollback()
+#     return db

def _close_db_pool(self):
# TODO: add automatic aging out of connections after some time
@@ -654,7 +656,7 @@ def _collect_custom_queries(self, tags):
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
-except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
+except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
self.db.rollback()
continue
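A short, hedged sketch of the connection changes in this file: psycopg3 accepts the DSN via `conninfo`, uses `dbname` instead of psycopg2's `database` keyword, and takes `autocommit` directly in `connect()` instead of a later `set_session()` call. All values below are illustrative.

```python
import psycopg

# DSN-style connection, mirroring the conninfo branch of _new_connection.
conn = psycopg.connect(conninfo="host=localhost dbname=datadog_test user=datadog", autocommit=True)
conn.close()

# Keyword-style connection: note 'dbname' replaces psycopg2's 'database' key.
conn = psycopg.connect(
    host="localhost",
    user="datadog",
    password="secret",  # illustrative
    dbname="datadog_test",
    sslmode="prefer",
    application_name="datadog-agent",
    autocommit=True,
)
conn.close()
```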