diff --git a/postgres/CHANGELOG.md b/postgres/CHANGELOG.md index 2b8b88e2acab7..de4ce7dd2762b 100644 --- a/postgres/CHANGELOG.md +++ b/postgres/CHANGELOG.md @@ -6,6 +6,10 @@ * Attempt to connect to the database and fail fast before trying to establish a connection pool ([#15839](https://github.com/DataDog/integrations-core/pull/15839)) +***Added***: + +* Add schema collection to Postgres integration (#15484) ([#15866](https://github.com/DataDog/integrations-core/pull/15866)) + ***Fixed***: * Revert psycopg3 upgrade ([#15859](https://github.com/DataDog/integrations-core/pull/15859)) diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index d4b43dbcf3cfd..d76a22a355168 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -502,6 +502,40 @@ files: type: number example: 600 + - name: collect_schemas + description: | + Enable collection of database schemas. In order to collect schemas from all user databases, + enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect + the schema for that database. + Relation metrics must be enabled for schema collection. + options: + - name: enabled + description: | + Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled. + value: + type: boolean + example: false + - name: max_tables + description: | + Maximum amount of tables the Agent collects from the instance. + value: + type: number + example: 1000 + display_default: 1000 + - name: max_columns + description: | + Maximum amount of columns the Agent collects per table. + value: + type: number + example: 50 + display_default: 50 + - name: collection_interval + description: | + The database schema collection interval (in seconds). + value: + type: number + example: 600 + - name: aws description: | This block defines the configuration for AWS RDS and Aurora instances. diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index ef8e51b38cdcf..577093659f2d0 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -98,6 +98,12 @@ def __init__(self, instance): self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity') self.statement_samples_config = instance.get('query_samples', instance.get('statement_samples', {})) or {} self.settings_metadata_config = instance.get('collect_settings', {}) or {} + self.schemas_metadata_config = instance.get('collect_schemas', {"enabled": False}) + if not self.relations and self.schemas_metadata_config['enabled']: + raise ConfigurationError( + 'In order to collect schemas on this database, you must enable relation metrics collection.' 
+ ) + self.resources_metadata_config = instance.get('collect_resources', {}) or {} self.statement_activity_config = instance.get('query_activity', {}) or {} self.statement_metrics_config = instance.get('query_metrics', {}) or {} diff --git a/postgres/datadog_checks/postgres/config_models/instance.py b/postgres/datadog_checks/postgres/config_models/instance.py index 52951f82f3a4a..8adc4f098be7d 100644 --- a/postgres/datadog_checks/postgres/config_models/instance.py +++ b/postgres/datadog_checks/postgres/config_models/instance.py @@ -38,6 +38,17 @@ class Azure(BaseModel): fully_qualified_domain_name: Optional[str] = None +class CollectSchemas(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + max_columns: Optional[float] = None + max_tables: Optional[float] = None + + class CollectSettings(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -163,6 +174,7 @@ class InstanceConfig(BaseModel): collect_database_size_metrics: Optional[bool] = None collect_default_database: Optional[bool] = None collect_function_metrics: Optional[bool] = None + collect_schemas: Optional[CollectSchemas] = None collect_settings: Optional[CollectSettings] = None collect_wal_metrics: Optional[bool] = None custom_queries: Optional[tuple[MappingProxyType[str, Any], ...]] = None diff --git a/postgres/datadog_checks/postgres/data/conf.yaml.example b/postgres/datadog_checks/postgres/data/conf.yaml.example index 8004b842eed3d..487805d27ee6f 100644 --- a/postgres/datadog_checks/postgres/data/conf.yaml.example +++ b/postgres/datadog_checks/postgres/data/conf.yaml.example @@ -404,6 +404,33 @@ instances: # # collection_interval: 600 + ## Enable collection of database schemas. In order to collect schemas from all user databases, + ## enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect + ## the schema for that database. + ## Relation metrics must be enabled for schema collection. + # + # collect_schemas: + + ## @param enabled - boolean - optional - default: false + ## Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled. + # + # enabled: false + + ## @param max_tables - number - optional - default: 1000 + ## Maximum amount of tables the Agent collects from the instance. + # + # max_tables: 1000 + + ## @param max_columns - number - optional - default: 50 + ## Maximum amount of columns the Agent collects per table. + # + # max_columns: 50 + + ## @param collection_interval - number - optional - default: 600 + ## The database schema collection interval (in seconds). + # + # collection_interval: 600 + ## This block defines the configuration for AWS RDS and Aurora instances. 
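For reference, a minimal sketch of an instance configuration that exercises the new `collect_schemas` block, expressed as the Python dict shape the tests below use (host, port, and credential values are placeholders):

```python
# Illustrative only: host, port, and credentials are placeholders.
# Per the spec above, collect_schemas requires `dbm: true` and relation metrics.
instance = {
    'host': 'localhost',                           # placeholder
    'port': 5432,                                  # placeholder
    'username': 'datadog',                         # placeholder
    'password': '<PASSWORD>',                      # placeholder
    'dbm': True,                                   # schema collection is a DBM feature
    'relations': [{'relation_regex': '.*'}],       # relation metrics must be enabled
    'database_autodiscovery': {'enabled': True},   # or set 'dbname' for a single database
    'collect_schemas': {
        'enabled': True,
        'max_tables': 1000,          # defaults from the spec
        'max_columns': 50,
        'collection_interval': 600,
    },
}
```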
    ##
    ## Complete this section if you have installed the Datadog AWS Integration
diff --git a/postgres/datadog_checks/postgres/explain_parameterized_queries.py b/postgres/datadog_checks/postgres/explain_parameterized_queries.py
index 0a366931a56fe..b3d335510ef74 100644
--- a/postgres/datadog_checks/postgres/explain_parameterized_queries.py
+++ b/postgres/datadog_checks/postgres/explain_parameterized_queries.py
@@ -3,6 +3,7 @@
 # Licensed under a 3-clause BSD style license (see LICENSE)
 import logging
+import re
 
 import psycopg2
 
@@ -169,3 +170,11 @@ def _execute_query_and_fetch_rows(self, dbname, query):
             with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                 cursor.execute(query)
                 return cursor.fetchall()
+
+    def _is_parameterized_query(self, statement: str) -> bool:
+        # Use regex to match $1 to determine if a query is parameterized
+        # BUT single quoted string '$1' should not be considered as a parameter
+        # e.g. SELECT * FROM products WHERE id = $1; -- $1 is a parameter
+        # e.g. SELECT * FROM products WHERE id = '$1'; -- '$1' is not a parameter
+        parameterized_query_pattern = r"(?<!')\$\d+(?!')"
+        return re.search(parameterized_query_pattern, statement) is not None
diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py
--- a/postgres/datadog_checks/postgres/metadata.py
+++ b/postgres/datadog_checks/postgres/metadata.py
+COLUMNS_QUERY = """
+SELECT attname AS name,
+       format_type(atttypid, atttypmod) AS data_type,
+       NOT attnotnull AS nullable,
+       pg_get_expr(adbin, adrelid) AS default
+FROM   pg_attribute
+       LEFT JOIN pg_attrdef ad
+              ON adrelid = attrelid
+                 AND adnum = attnum
+WHERE  attrelid = {oid}
+       AND attnum > 0
+       AND NOT attisdropped;
+"""
+
+PARTITION_KEY_QUERY = """
+SELECT relname,
+       pg_get_partkeydef(oid) AS partition_key
+FROM pg_class
+WHERE '{parent}' = relname;
+"""
+
+NUM_PARTITIONS_QUERY = """
+SELECT count(inhrelid :: regclass) AS num_partitions
+FROM pg_inherits
+WHERE inhparent = {parent_oid};
+"""
+
+PARTITION_ACTIVITY_QUERY = """
+SELECT pi.inhparent :: regclass AS parent_table_name,
+       SUM(psu.seq_scan + psu.idx_scan) AS total_activity
+FROM pg_catalog.pg_stat_user_tables psu
+     join pg_class pc
+       ON psu.relname = pc.relname
+     join pg_inherits pi
+       ON pi.inhrelid = pc.oid
+WHERE pi.inhparent = {parent_oid}
+GROUP BY pi.inhparent;
+"""
+
 
 def agent_check_getter(self):
     return self._check
@@ -42,6 +164,10 @@ def __init__(self, check, config, shutdown_callback):
         self.pg_settings_collection_interval = config.settings_metadata_config.get(
             'collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL
         )
+        self.schemas_collection_interval = config.schemas_metadata_config.get(
+            'collection_interval', DEFAULT_SCHEMAS_COLLECTION_INTERVAL
+        )
+
         collection_interval = config.resources_metadata_config.get(
             'collection_interval', DEFAULT_RESOURCES_COLLECTION_INTERVAL
         )
@@ -62,9 +188,12 @@ def __init__(self, check, config, shutdown_callback):
         )
         self._check = check
         self._config = config
+        self.db_pool = self._check.db_pool
         self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get('enabled', False))
+        self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get('enabled', False))
         self._pg_settings_cached = None
         self._time_since_last_settings_query = 0
+        self._time_since_last_schemas_query = 0
         self._conn_ttl_ms = self._config.idle_connection_timeout
         self._tags_no_db = None
         self.tags = None
@@ -108,6 +237,233 @@ def report_postgres_metadata(self):
             }
             self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))
 
+        elapsed_s_schemas = time.time() - self._time_since_last_schemas_query
+        if elapsed_s_schemas >= self.schemas_collection_interval and self._collect_schemas_enabled:
+            metadata = self._collect_schema_info()
+            event = {
+                "host": self._check.resolved_hostname,
+                "agent_version": datadog_agent.get_version(),
+                "dbms": "postgres",
+                "kind": "pg_databases",
+                "collection_interval": self.schemas_collection_interval,
+                "dbms_version": self._payload_pg_version(),
+                "tags": self._tags_no_db,
+                "timestamp": time.time() * 1000,
+                "metadata": metadata,
+                "cloud_metadata": self._config.cloud_metadata,
+            }
+            json_event = json.dumps(event, default=default_json_event_encoding)
+            self._log.debug("Reporting the following payload for schema collection: {}".format(json_event))
+            self._check.database_monitoring_metadata(json_event)
+
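To make the shape of the emitted `pg_databases` event concrete, here is an abridged, hypothetical payload; the keys follow the code above and the table/schema docstrings below, and all values are made up:

```python
# Abridged, hypothetical example of a "pg_databases" metadata event.
example_event = {
    "host": "my-postgres-host",       # made-up value
    "agent_version": "7.48.0",        # made-up value
    "dbms": "postgres",
    "kind": "pg_databases",
    "collection_interval": 600,
    "dbms_version": "v14.2.0",        # format produced by _payload_pg_version
    "tags": ["env:dev"],              # made-up value
    "timestamp": 1690000000000,
    "metadata": [
        {
            "description": None,
            "name": "datadog_test",
            "id": "16384",
            "encoding": "UTF8",
            "owner": "postgres",
            "schemas": [
                {
                    "id": "2200",
                    "name": "public",
                    "owner": "postgres",
                    "tables": [
                        {
                            "id": "16390",
                            "name": "persons",
                            "columns": [
                                {"name": "personid", "data_type": "integer", "nullable": True, "default": None},
                            ],
                            # optional keys: "indexes", "foreign_keys", "toast_table",
                            # "partition_key", "num_partitions"
                        }
                    ],
                }
            ],
        }
    ],
    "cloud_metadata": {},
}
```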
"metadata": metadata, + "cloud_metadata": self._config.cloud_metadata, + } + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting the following payload for schema collection: {}".format(json_event)) + self._check.database_monitoring_metadata(json_event) + + def _payload_pg_version(self): + version = self._check.version + if not version: + return "" + return 'v{major}.{minor}.{patch}'.format(major=version.major, minor=version.minor, patch=version.patch) + + def _collect_schema_info(self): + databases = [] + if self._check.autodiscovery: + databases = self._check.autodiscovery.get_items() + else: + databases.append(self._config.dbname) + + metadata = [] + for database in databases: + metadata.append(self._collect_metadata_for_database(database)) + + self._time_since_last_schemas_query = time.time() + return metadata + + def _query_database_information( + self, cursor: psycopg2.extensions.cursor, dbname: str + ) -> Dict[str, Union[str, int]]: + """ + Collect database info. Returns + description: str + name: str + id: str + encoding: str + owner: str + """ + cursor.execute(DATABASE_INFORMATION_QUERY.format(dbname=dbname)) + row = cursor.fetchone() + return row + + def _query_schema_information(self, cursor: psycopg2.extensions.cursor, dbname: str) -> Dict[str, str]: + """ + Collect user schemas. Returns + id: str + name: str + owner: str + """ + cursor.execute(SCHEMA_QUERY) + rows = cursor.fetchall() + schemas = [] + for row in rows: + schemas.append({"id": str(row['id']), "name": row['name'], "owner": row['owner']}) + print(row['name']) + return schemas + + def _get_table_info(self, cursor, dbname, schema_id): + """ + Tables will be sorted by the number of total accesses (index_rel_scans + seq_scans) and truncated to + the max_tables limit. + + If any tables are partitioned, only the master paritition table name will be returned, and none of its children. 
+ """ + limit = self._config.schemas_metadata_config.get('max_tables', 1000) + if self._config.relations: + if VersionUtils.transform_version(str(self._check.version))['version.major'] == "9": + cursor.execute(PG_TABLES_QUERY_V9.format(schema_oid=schema_id)) + else: + cursor.execute(PG_TABLES_QUERY_V10_PLUS.format(schema_oid=schema_id)) + rows = cursor.fetchall() + table_info = [dict(row) for row in rows] + table_info = self._filter_tables_with_no_relation_metrics(dbname, table_info) + return self._sort_and_limit_table_info(cursor, dbname, table_info, limit) + + else: + # Config error should catch the case where schema collection is enabled + # and relation metrics aren't, but adding a warning here just in case + self._check.log.warning("Relation metrics are not configured for {dbname}, so tables cannot be collected") + + def _filter_tables_with_no_relation_metrics( + self, dbname, table_info: List[Dict[str, Union[str, bool]]] + ) -> List[Dict[str, Union[str, bool]]]: + filtered_table_list = [] + cache = self._check.metrics_cache.table_activity_metrics + for table in table_info: + if table['name'] in cache[dbname].keys(): + filtered_table_list.append(table) + # partitioned tables will not have metrics recorded under the name of the partitioned table, + # so for now we always report them + elif table['has_partitions']: + filtered_table_list.append(table) + return filtered_table_list + + def _sort_and_limit_table_info( + self, cursor, dbname, table_info: List[Dict[str, Union[str, bool]]], limit: int + ) -> List[Dict[str, Union[str, bool]]]: + def sort_tables(info): + cache = self._check.metrics_cache.table_activity_metrics + # partition master tables won't get any metrics reported on them, + # so we have to grab the total partition activity + # note: partitions don't exist in V9, so we have to check this first + if ( + VersionUtils.transform_version(str(self._check.version))['version.major'] == "9" + or not info["has_partitions"] + ): + return ( + cache[dbname][info['name']]['postgresql.index_scans'] + + cache[dbname][info['name']]['postgresql.seq_scans'] + ) + else: + # get activity + cursor.execute(PARTITION_ACTIVITY_QUERY.format(parent_oid=info['id'])) + row = cursor.fetchone() + return row['total_activity'] + + # if relation metrics are enabled, sorted based on last activity information + table_info = sorted(table_info, key=sort_tables, reverse=True) + return table_info[:limit] + + def _query_table_information_for_schema( + self, cursor: psycopg2.extensions.cursor, schema_id: str, dbname: str + ) -> List[Dict[str, Union[str, Dict]]]: + """ + Collect table information per schema. 
Returns a list of dictionaries + with key/values: + "id": str + "name": str + "owner": str + "foreign_keys": dict (if has foreign keys) + name: str + definition: str + "indexes": dict (if has indexes) + name: str + definition: str + "columns": dict + name: str + data_type: str + default: str + nullable: bool + "toast_table": str (if associated toast table exists) + "partition_key": str (if has partitions) + "num_partitions": int (if has partitions) + """ + tables_info = self._get_table_info(cursor, dbname, schema_id) + table_payloads = [] + for table in tables_info: + this_payload = {} + name = table['name'] + table_id = table['id'] + this_payload.update({'id': str(table['id'])}) + this_payload.update({'name': name}) + if table["hasindexes"]: + cursor.execute(PG_INDEXES_QUERY.format(tablename=name)) + rows = cursor.fetchall() + idxs = [dict(row) for row in rows] + this_payload.update({'indexes': idxs}) + + if VersionUtils.transform_version(str(self._check.version))['version.major'] != "9": + if table['has_partitions']: + cursor.execute(PARTITION_KEY_QUERY.format(parent=name)) + row = cursor.fetchone() + this_payload.update({'partition_key': row['partition_key']}) + + cursor.execute(NUM_PARTITIONS_QUERY.format(parent_oid=table_id)) + row = cursor.fetchone() + this_payload.update({'num_partitions': row['num_partitions']}) + + if table['toast_table'] is not None: + this_payload.update({'toast_table': table['toast_table']}) + + # Get foreign keys + cursor.execute(PG_CHECK_FOR_FOREIGN_KEY.format(oid=table_id)) + row = cursor.fetchone() + if row['count'] > 0: + cursor.execute(PG_CONSTRAINTS_QUERY.format(oid=table_id)) + rows = cursor.fetchall() + if rows: + fks = [dict(row) for row in rows] + this_payload.update({'foreign_keys': fks}) + + # Get columns + cursor.execute(COLUMNS_QUERY.format(oid=table_id)) + rows = cursor.fetchall()[:] + max_columns = self._config.schemas_metadata_config.get('max_columns', 50) + columns = [dict(row) for row in rows][:max_columns] + this_payload.update({'columns': columns}) + + table_payloads.append(this_payload) + + return table_payloads + + def _collect_metadata_for_database(self, dbname): + metadata = {} + with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + database_info = self._query_database_information(cursor, dbname) + metadata.update( + { + "description": database_info['description'], + "name": database_info['name'], + "id": str(database_info['id']), + "encoding": database_info['encoding'], + "owner": database_info['owner'], + "schemas": [], + } + ) + schema_info = self._query_schema_information(cursor, dbname) + for schema in schema_info: + tables_info = self._query_table_information_for_schema(cursor, schema['id'], dbname) + schema.update({"tables": tables_info}) + metadata['schemas'].append(schema) + + return metadata + @tracked_method(agent_check_getter=agent_check_getter) def _collect_postgres_settings(self): with self._check._get_main_db().cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: diff --git a/postgres/datadog_checks/postgres/metrics_cache.py b/postgres/datadog_checks/postgres/metrics_cache.py index 736361062a421..617a79b47f85e 100644 --- a/postgres/datadog_checks/postgres/metrics_cache.py +++ b/postgres/datadog_checks/postgres/metrics_cache.py @@ -44,6 +44,8 @@ def __init__(self, config): self.replication_stats_metrics = None self.activity_metrics = None self._count_metrics = None + if self.config.relations: + 
            self.table_activity_metrics = {}
 
     def clean_state(self):
         self.instance_metrics = None
@@ -52,6 +54,8 @@ def clean_state(self):
         self.replication_metrics = None
         self.replication_stats_metrics = None
         self.activity_metrics = None
+        if self.config.relations:
+            self.table_activity_metrics = {}
 
     def get_instance_metrics(self, version):
         """
diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index c57796d2385a2..2fe7717bc5f56 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -490,10 +490,32 @@ def _query_scope(self, cursor, scope, instance_tags, is_custom_metrics, dbname=None):
                 name, submit_metric = scope['metrics'][column]
                 submit_metric(self, name, value, tags=set(tags), hostname=self.resolved_hostname)
+                # if this is a relation-level metric (idx_scan or seq_scan), cache it
+                if name in ('postgresql.index_scans', 'postgresql.seq_scans'):
+                    self._cache_table_activity(dbname, desc_map['table'], name, value)
+
             num_results += 1
 
         return num_results
 
+    def _cache_table_activity(
+        self,
+        dbname: str,
+        tablename: str,
+        metric_name: str,
+        value: int,
+    ):
+        db = dbname if self.autodiscovery else self._config.dbname
+        if db not in self.metrics_cache.table_activity_metrics.keys():
+            self.metrics_cache.table_activity_metrics[db] = {}
+        if tablename not in self.metrics_cache.table_activity_metrics[db].keys():
+            self.metrics_cache.table_activity_metrics[db][tablename] = {
+                'postgresql.index_scans': 0,
+                'postgresql.seq_scans': 0,
+            }
+
+        self.metrics_cache.table_activity_metrics[db][tablename][metric_name] = value
+
     def _collect_relations_autodiscovery(self, instance_tags, relations_scopes):
         if not self.autodiscovery:
             return
diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py
index 45b7bc3c93b52..aee24d5e88132 100644
--- a/postgres/datadog_checks/postgres/statement_samples.py
+++ b/postgres/datadog_checks/postgres/statement_samples.py
@@ -345,6 +345,8 @@ def _filter_and_normalize_statement_rows(self, rows):
                 'backend_type', 'client backend'
             ) == 'client backend':
                 continue
+            if row['client_addr']:
+                row['client_addr'] = str(row['client_addr'])
             query = row['query']
             if query == '':
                 insufficient_privilege_count += 1
@@ -644,21 +646,27 @@ def _run_explain_safe(self, dbname, statement, obfuscated_statement, query_signature):
             return cached_error_response
 
         try:
+            # if the statement is a parameterized query, then we can't explain it directly;
+            # we should jump directly into self._explain_parameterized_queries.explain_statement
+            # instead of trying to explain it and then failing
+            if self._explain_parameterized_queries._is_parameterized_query(statement):
+                if is_affirmative(self._config.statement_samples_config.get('explain_parameterized_queries', True)):
+                    plan = self._explain_parameterized_queries.explain_statement(
+                        dbname, statement, obfuscated_statement
+                    )
+                    if plan:
+                        return plan, DBExplainError.explained_with_prepared_statement, None
+                e = psycopg2.errors.UndefinedParameter("Unable to explain parameterized query")
+                self._log.debug(
+                    "Unable to collect execution plan, clients using the extended query protocol or prepared statements"
+                    " can't be explained due to the separation of the parsed query and raw bind parameters: %s",
+                    repr(e),
+                )
+                error_response = None, DBExplainError.parameterized_query, '{}'.format(type(e))
+                self._explain_errors_cache[query_signature] = error_response
+                self._emit_run_explain_error(dbname,
DBExplainError.parameterized_query, e) + return error_response return self._run_explain(dbname, statement, obfuscated_statement), None, None - except psycopg2.errors.UndefinedParameter as e: - self._log.debug( - "Unable to collect execution plan, clients using the extended query protocol or prepared statements" - " can't be explained due to the separation of the parsed query and raw bind parameters: %s", - repr(e), - ) - if is_affirmative(self._config.statement_samples_config.get('explain_parameterized_queries', True)): - plan = self._explain_parameterized_queries.explain_statement(dbname, statement, obfuscated_statement) - if plan: - return plan, DBExplainError.explained_with_prepared_statement, None - error_response = None, DBExplainError.parameterized_query, '{}'.format(type(e)) - self._explain_errors_cache[query_signature] = error_response - self._emit_run_explain_error(dbname, DBExplainError.parameterized_query, e) - return error_response except psycopg2.errors.UndefinedTable as e: self._log.debug("Failed to collect execution plan: %s", repr(e)) error_response = None, DBExplainError.undefined_table, '{}'.format(type(e)) @@ -735,7 +743,7 @@ def _collect_plan_for_statement(self, row): "cloud_metadata": self._config.cloud_metadata, "network": { "client": { - "ip": row.get('client_addr', None), + "ip": str(row.get('client_addr', None)), "port": row.get('client_port', None), "hostname": row.get('client_hostname', None), } diff --git a/postgres/tests/test_explain_parameterized_queries.py b/postgres/tests/test_explain_parameterized_queries.py index 06a152f6a3b9b..0aae68386ecc3 100644 --- a/postgres/tests/test_explain_parameterized_queries.py +++ b/postgres/tests/test_explain_parameterized_queries.py @@ -89,3 +89,24 @@ def test_explain_parameterized_queries_generic_params(integration_check, dbm_ins assert expected_generic_values == explain_param_queries._get_number_of_parameters_for_prepared_statement( DB_NAME, query_signature ) + + +@pytest.mark.parametrize( + "query,statement_is_parameterized_query", + [ + ("SELECT * FROM products WHERE id = $1", True), + ("SELECT * FROM products WHERE id = '$1'", False), + ("SELECT * FROM products WHERE id = $1 AND name = $2", True), + ("SELECT * FROM products WHERE id = $1 AND name = '$2'", True), + ("SELECT * FROM products WHERE id = $1 AND name = $2 AND price = 3", True), + ("SELECT * FROM products WHERE id = $1 AND name = $2 AND price = '3'", True), + ("SELECT * FROM products WHERE id = $1 AND name = $2 AND price = '$3'", True), + ], +) +def test_explain_parameterized_queries_is_parameterized_query( + integration_check, dbm_instance, query, statement_is_parameterized_query +): + check = integration_check(dbm_instance) + check._connect() + explain_param_queries = check.statement_samples._explain_parameterized_queries + assert statement_is_parameterized_query == explain_param_queries._is_parameterized_query(query) diff --git a/postgres/tests/test_metadata.py b/postgres/tests/test_metadata.py index 64d8be56067f2..82dba24624f71 100644 --- a/postgres/tests/test_metadata.py +++ b/postgres/tests/test_metadata.py @@ -2,11 +2,15 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) from concurrent.futures.thread import ThreadPoolExecutor +from typing import List import pytest from datadog_checks.base.utils.db.utils import DBMAsyncJob +from .common import POSTGRES_VERSION +from .utils import run_one_check + pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] @@ -39,3 +43,76 @@ def 
test_collect_metadata(integration_check, dbm_instance, aggregator): assert event['dbms'] == "postgres" assert event['kind'] == "pg_settings" assert len(event["metadata"]) > 0 + + +def test_collect_schemas(integration_check, dbm_instance, aggregator): + dbm_instance["collect_schemas"] = {'enabled': True, 'collection_interval': 0.5} + dbm_instance['relations'] = [{'relation_regex': ".*"}] + dbm_instance["database_autodiscovery"] = {"enabled": True, "include": ["datadog"]} + del dbm_instance['dbname'] + check = integration_check(dbm_instance) + run_one_check(check, dbm_instance) + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + schema_event = next(e for e in dbm_metadata if e['kind'] == 'pg_databases') + + # there should only be one database, datadog_test + database_metadata = schema_event['metadata'] + assert len(database_metadata) == 1 + assert 'datadog_test' == database_metadata[0]['name'] + + # there should only two schemas, 'public' and 'datadog'. datadog is empty + schema_names = [s['name'] for s in database_metadata[0]['schemas']] + assert 'public' in schema_names + assert 'datadog' in schema_names + schema_public = None + for schema in database_metadata[0]['schemas']: + if schema['name'] == 'public': + schema_public = schema + + # check that all expected tables are present + tables_set = {'persons', "personsdup1", "personsdup2", "pgtable", "pg_newtable", "cities"} + # if version isn't 9 or 10, check that partition master is in tables + if float(POSTGRES_VERSION) >= 11: + tables_set.update({'test_part'}) + tables_not_reported_set = {'test_part1', 'test_part2'} + + tables_got = [] + for table in schema_public['tables']: + tables_got.append(table['name']) + + # make some assertions on fields + if table['name'] == "persons": + # check that foreign keys, indexes get reported + keys = list(table.keys()) + assert_fields(keys, ["foreign_keys", "columns", "toast_table", "id", "name"]) + assert_fields(list(table['foreign_keys'][0].keys()), ['name', 'definition']) + assert_fields( + list(table['columns'][0].keys()), + [ + 'name', + 'nullable', + 'data_type', + 'default', + ], + ) + if table['name'] == "cities": + keys = list(table.keys()) + assert_fields(keys, ["indexes", "columns", "toast_table", "id", "name"]) + assert_fields(list(table['indexes'][0].keys()), ['name', 'definition']) + if float(POSTGRES_VERSION) >= 11: + if table['name'] == 'test_part': + keys = list(table.keys()) + assert_fields(keys, ["num_partitions", "partition_key"]) + + assert_fields(tables_got, tables_set) + assert_not_fields(tables_got, tables_not_reported_set) + + +def assert_fields(keys: List[str], fields: List[str]): + for field in fields: + assert field in keys + + +def assert_not_fields(keys: List[str], fields: List[str]): + for field in fields: + assert field not in keys diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index 872df74a73ea0..244c3d5e35cf2 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -486,7 +486,9 @@ def test_wal_stats(aggregator, integration_check, pg_instance): ) # We should have at least one full page write assert_metric_at_least(aggregator, 'postgresql.wal.bytes', tags=expected_tags, count=1, lower_bound=wal_bytes + 100) - aggregator.assert_metric('postgresql.wal.full_page_images', tags=expected_tags, count=1) + assert_metric_at_least( + aggregator, 'postgresql.wal.full_page_images', tags=expected_tags, count=1, lower_bound=wal_fpi + 1 + ) def test_query_timeout(integration_check, 
pg_instance): diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 2d7e5b5aaf261..b4da8c37e4985 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -25,6 +25,7 @@ ) from datadog_checks.postgres.statements import PG_STAT_STATEMENTS_METRICS_COLUMNS, PG_STAT_STATEMENTS_TIMING_COLUMNS from datadog_checks.postgres.util import payload_pg_version +from datadog_checks.postgres.version_utils import V12 from .common import DB_NAME, HOST, PORT, PORT_REPLICA2, POSTGRES_VERSION from .utils import _get_conn, _get_superconn, requires_over_10, run_one_check @@ -946,6 +947,10 @@ def test_activity_snapshot_collection( expected_keys, expected_conn_out, ): + if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity": + # cannot catch any queries from other users + # only can see own queries + return dbm_instance['pg_stat_activity_view'] = pg_stat_activity_view # No need for query metrics here dbm_instance['query_metrics']['enabled'] = False @@ -1282,6 +1287,41 @@ def test_statement_run_explain_errors( ) +@pytest.mark.parametrize( + "query,expected_explain_err_code,expected_err", + [ + ( + "select * from pg_settings where name = $1", + DBExplainError.explained_with_prepared_statement, + None, + ), + ], +) +def test_statement_run_explain_parameterized_queries( + integration_check, + dbm_instance, + query, + expected_explain_err_code, + expected_err, +): + dbm_instance['query_activity']['enabled'] = False + dbm_instance['query_metrics']['enabled'] = False + dbm_instance['query_samples']['explain_parameterized_queries'] = True + check = integration_check(dbm_instance) + check._connect() + + check.check(dbm_instance) + if check.version < V12: + return + + run_one_check(check, dbm_instance) + _, explain_err_code, err = check.statement_samples._run_and_track_explain("datadog_test", query, query, query) + run_one_check(check, dbm_instance) + + assert explain_err_code == expected_explain_err_code + assert err == expected_err + + @pytest.mark.parametrize("dbstrict", [True, False]) def test_statement_samples_dbstrict(aggregator, integration_check, dbm_instance, dbstrict): dbm_instance['query_activity']['enabled'] = False
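As a standalone illustration of the behavior the new `test_explain_parameterized_queries_is_parameterized_query` cases above assert, here is a self-contained sketch of the lookaround pattern used by `_is_parameterized_query`; the pattern is reconstructed from the surrounding comments and tests, so treat it as illustrative rather than a verbatim copy:

```python
import re

# Reconstructed pattern: a $N placeholder counts as a bind parameter unless it is
# wrapped in single quotes (in which case it is just a string literal).
PARAMETERIZED_QUERY_PATTERN = r"(?<!')\$\d+(?!')"


def is_parameterized_query(statement: str) -> bool:
    return re.search(PARAMETERIZED_QUERY_PATTERN, statement) is not None


assert is_parameterized_query("SELECT * FROM products WHERE id = $1")
assert not is_parameterized_query("SELECT * FROM products WHERE id = '$1'")
assert is_parameterized_query("SELECT * FROM products WHERE id = $1 AND name = '$2'")
```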