diff --git a/postgres/CHANGELOG.md b/postgres/CHANGELOG.md index 710df677e4a24..563e5a0a50abb 100644 --- a/postgres/CHANGELOG.md +++ b/postgres/CHANGELOG.md @@ -4,6 +4,7 @@ ***Added***: +* Add schema collection to Postgres integration ([#15484](https://github.com/DataDog/integrations-core/pull/15484)) * Add support for sending `database_instance` metadata ([#15559](https://github.com/DataDog/integrations-core/pull/15559)) * Update dependencies for Agent 7.48 ([#15585](https://github.com/DataDog/integrations-core/pull/15585)) diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index c4eca579f2000..0bc78044b5474 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -513,6 +513,40 @@ files: type: number example: 600 + - name: collect_schemas + description: | + Enable collection of database schemas. In order to collect schemas from all user databases, + enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect + the schema for that database. + Relation metrics must be enabled for schema collection. + options: + - name: enabled + description: | + Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled. + value: + type: boolean + example: false + - name: max_tables + description: | + Maximum amount of tables the Agent collects from the instance. + value: + type: number + example: 1000 + display_default: 1000 + - name: max_columns + description: | + Maximum amount of columns the Agent collects per table. + value: + type: number + example: 50 + display_default: 50 + - name: collection_interval + description: | + The database schema collection interval (in seconds). + value: + type: number + example: 600 + - name: aws description: | This block defines the configuration for AWS RDS and Aurora instances. diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index 7ace701a6c9d8..cfb2b7ba94f45 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -60,7 +60,7 @@ def __init__(self, instance): self.max_connections = instance.get('max_connections', 30) self.tags = self._build_tags(instance.get('tags', [])) - ssl = instance.get('ssl', False) + ssl = instance.get('ssl', "false") if ssl in SSL_MODES: self.ssl_mode = ssl else: @@ -100,6 +100,12 @@ def __init__(self, instance): self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity') self.statement_samples_config = instance.get('query_samples', instance.get('statement_samples', {})) or {} self.settings_metadata_config = instance.get('collect_settings', {}) or {} + self.schemas_metadata_config = instance.get('collect_schemas', {"enabled": False}) + if not self.relations and self.schemas_metadata_config['enabled']: + raise ConfigurationError( + 'In order to collect schemas on this database, you must enable relation metrics collection.' 
+ ) + self.resources_metadata_config = instance.get('collect_resources', {}) or {} self.statement_activity_config = instance.get('query_activity', {}) or {} self.statement_metrics_config = instance.get('query_metrics', {}) or {} diff --git a/postgres/datadog_checks/postgres/config_models/instance.py b/postgres/datadog_checks/postgres/config_models/instance.py index ae115b3489de3..4816713240c5d 100644 --- a/postgres/datadog_checks/postgres/config_models/instance.py +++ b/postgres/datadog_checks/postgres/config_models/instance.py @@ -38,6 +38,17 @@ class Azure(BaseModel): fully_qualified_domain_name: Optional[str] = None +class CollectSchemas(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + max_columns: Optional[float] = None + max_tables: Optional[float] = None + + class CollectSettings(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -154,6 +165,7 @@ class InstanceConfig(BaseModel): collect_database_size_metrics: Optional[bool] = None collect_default_database: Optional[bool] = None collect_function_metrics: Optional[bool] = None + collect_schemas: Optional[CollectSchemas] = None collect_settings: Optional[CollectSettings] = None collect_wal_metrics: Optional[bool] = None custom_queries: Optional[tuple[MappingProxyType[str, Any], ...]] = None diff --git a/postgres/datadog_checks/postgres/data/conf.yaml.example b/postgres/datadog_checks/postgres/data/conf.yaml.example index c43a6360e64b1..25ccfdb3cba83 100644 --- a/postgres/datadog_checks/postgres/data/conf.yaml.example +++ b/postgres/datadog_checks/postgres/data/conf.yaml.example @@ -406,6 +406,33 @@ instances: # # collection_interval: 600 + ## Enable collection of database schemas. In order to collect schemas from all user databases, + ## enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect + ## the schema for that database. + ## Relation metrics must be enabled for schema collection. + # + # collect_schemas: + + ## @param enabled - boolean - optional - default: false + ## Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled. + # + # enabled: false + + ## @param max_tables - number - optional - default: 1000 + ## Maximum amount of tables the Agent collects from the instance. + # + # max_tables: 1000 + + ## @param max_columns - number - optional - default: 50 + ## Maximum amount of columns the Agent collects per table. + # + # max_columns: 50 + + ## @param collection_interval - number - optional - default: 600 + ## The database schema collection interval (in seconds). + # + # collection_interval: 600 + ## This block defines the configuration for AWS RDS and Aurora instances. 
## ## Complete this section if you have installed the Datadog AWS Integration diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index 433b9c9f615d8..0e23694a05ced 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -2,7 +2,7 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) import time -from typing import Dict, Optional, Tuple # noqa: F401 +from typing import Dict, List, Optional, Tuple, Union # noqa: F401 import psycopg from psycopg.rows import dict_row @@ -18,15 +18,137 @@ from datadog_checks.base.utils.tracking import tracked_method from .util import payload_pg_version +from .version_utils import VersionUtils -# default pg_settings collection interval in seconds +# default collection intervals in seconds DEFAULT_SETTINGS_COLLECTION_INTERVAL = 600 +DEFAULT_SCHEMAS_COLLECTION_INTERVAL = 600 DEFAULT_RESOURCES_COLLECTION_INTERVAL = 300 PG_SETTINGS_QUERY = """ SELECT name, setting FROM pg_settings """ +DATABASE_INFORMATION_QUERY = """ +SELECT db.oid AS id, + datname AS NAME, + pg_encoding_to_char(encoding) AS encoding, + rolname AS owner, + description +FROM pg_catalog.pg_database db + LEFT JOIN pg_catalog.pg_description dc + ON dc.objoid = db.oid + JOIN pg_roles a + ON datdba = a.oid +WHERE datname LIKE '{dbname}'; +""" + +PG_TABLES_QUERY_V10_PLUS = """ +SELECT c.oid AS id, + c.relname AS name, + c.relhasindex AS hasindexes, + c.relowner :: regrole AS owner, + ( CASE + WHEN c.relkind = 'p' THEN TRUE + ELSE FALSE + END ) AS has_partitions, + t.relname AS toast_table +FROM pg_class c + left join pg_class t + ON c.reltoastrelid = t.oid +WHERE c.relkind IN ( 'r', 'p' ) + AND c.relispartition != 't' + AND c.relnamespace = {schema_oid}; +""" + +PG_TABLES_QUERY_V9 = """ +SELECT c.oid AS id, + c.relname AS name, + c.relhasindex AS hasindexes, + c.relowner :: regrole AS owner, + t.relname AS toast_table +FROM pg_class c + left join pg_class t + ON c.reltoastrelid = t.oid +WHERE c.relkind IN ( 'r' ) + AND c.relnamespace = {schema_oid}; +""" + + +SCHEMA_QUERY = """ +SELECT nsp.oid AS id, + nspname AS name, + nspowner :: regrole AS owner +FROM pg_namespace nsp + LEFT JOIN pg_roles r on nsp.nspowner = r.oid +WHERE nspname NOT IN ( 'information_schema', 'pg_catalog' ) + AND nspname NOT LIKE 'pg_toast%' + AND nspname NOT LIKE 'pg_temp_%' + AND r.rolname != 'rds_superuser' + AND r.rolname != 'rdsadmin'; +""" + +PG_INDEXES_QUERY = """ +SELECT indexname AS NAME, + indexdef AS definition +FROM pg_indexes +WHERE tablename LIKE '{tablename}'; +""" + +PG_CHECK_FOR_FOREIGN_KEY = """ +SELECT count(conname) +FROM pg_constraint +WHERE contype = 'f' + AND conrelid = {oid}; +""" + +PG_CONSTRAINTS_QUERY = """ +SELECT conname AS name, + pg_get_constraintdef(oid) AS definition +FROM pg_constraint +WHERE contype = 'f' + AND conrelid = {oid}; +""" + +COLUMNS_QUERY = """ +SELECT attname AS name, + Format_type(atttypid, atttypmod) AS data_type, + NOT attnotnull AS nullable, + pg_get_expr(adbin, adrelid) AS default +FROM pg_attribute + LEFT JOIN pg_attrdef ad + ON adrelid = attrelid + AND adnum = attnum +WHERE attrelid = {oid} + AND attnum > 0 + AND NOT attisdropped; +""" + +PARTITION_KEY_QUERY = """ +SELECT relname, + pg_get_partkeydef(oid) AS partition_key +FROM pg_class +WHERE '{parent}' = relname; +""" + +NUM_PARTITIONS_QUERY = """ +SELECT count(inhrelid :: regclass) AS num_partitions +FROM pg_inherits +WHERE inhparent = {parent_oid}; +""" + +PARTITION_ACTIVITY_QUERY = """ 
+SELECT pi.inhparent :: regclass AS parent_table_name, + SUM(psu.seq_scan + psu.idx_scan) AS total_activity +FROM pg_catalog.pg_stat_user_tables psu + join pg_class pc + ON psu.relname = pc.relname + join pg_inherits pi + ON pi.inhrelid = pc.oid +WHERE pi.inhparent = {parent_oid} +GROUP BY pi.inhparent; +""" + def agent_check_getter(self): return self._check @@ -43,6 +165,10 @@ def __init__(self, check, config): self.pg_settings_collection_interval = config.settings_metadata_config.get( 'collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL ) + self.schemas_collection_interval = config.schemas_metadata_config.get( + 'collection_interval', DEFAULT_SCHEMAS_COLLECTION_INTERVAL + ) + collection_interval = config.resources_metadata_config.get( 'collection_interval', DEFAULT_RESOURCES_COLLECTION_INTERVAL ) @@ -62,9 +188,12 @@ def __init__(self, check, config): ) self._check = check self._config = config + self.db_pool = self._check.db_pool self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get('enabled', False)) + self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get('enabled', False)) self._pg_settings_cached = None self._time_since_last_settings_query = 0 + self._time_since_last_schemas_query = 0 self._conn_ttl_ms = self._config.idle_connection_timeout self._tags_no_db = None self.tags = None @@ -108,6 +237,231 @@ def report_postgres_metadata(self): } self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding)) + elapsed_s_schemas = time.time() - self._time_since_last_schemas_query + if elapsed_s_schemas >= self.schemas_collection_interval and self._collect_schemas_enabled: + metadata = self._collect_schema_info() + event = { + "host": self._check.resolved_hostname, + "agent_version": datadog_agent.get_version(), + "dbms": "postgres", + "kind": "pg_databases", + "collection_interval": self.schemas_collection_interval, + "dbms_version": self._payload_pg_version(), + "tags": self._tags_no_db, + "timestamp": time.time() * 1000, + "metadata": metadata, + "cloud_metadata": self._config.cloud_metadata, + } + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting the following payload for schema collection: {}".format(json_event)) + self._check.database_monitoring_metadata(json_event) + + def _payload_pg_version(self): + version = self._check.version + if not version: + return "" + return 'v{major}.{minor}.{patch}'.format(major=version.major, minor=version.minor, patch=version.patch) + + def _collect_schema_info(self): + databases = [] + if self._check.autodiscovery: + databases = self._check.autodiscovery.get_items() + else: + databases.append(self._config.dbname) + + metadata = [] + for database in databases: + metadata.append(self._collect_metadata_for_database(database)) + + self._time_since_last_schemas_query = time.time() + return metadata + + def _query_database_information(self, cursor: psycopg.cursor, dbname: str) -> Dict[str, Union[str, int]]: + """ + Collect database info. Returns + description: str + name: str + id: str + encoding: str + owner: str + """ + cursor.execute(DATABASE_INFORMATION_QUERY.format(dbname=dbname)) + row = cursor.fetchone() + return row + + def _query_schema_information(self, cursor: psycopg.cursor, dbname: str) -> Dict[str, str]: + """ + Collect user schemas. 
Returns
+            id: str
+            name: str
+            owner: str
+        """
+        cursor.execute(SCHEMA_QUERY)
+        rows = cursor.fetchall()
+        schemas = []
+        for row in rows:
+            schemas.append({"id": str(row['id']), "name": row['name'], "owner": row['owner']})
+        return schemas
+
+    def _get_table_info(self, cursor, dbname, schema_id):
+        """
+        Tables will be sorted by the number of total accesses (index_rel_scans + seq_scans) and truncated to
+        the max_tables limit.
+
+        If any tables are partitioned, only the master partition table name will be returned, and none of its children.
+        """
+        limit = self._config.schemas_metadata_config.get('max_tables', 1000)
+        if self._config.relations:
+            if VersionUtils.transform_version(str(self._check._version))['version.major'] == "9":
+                cursor.execute(PG_TABLES_QUERY_V9.format(schema_oid=schema_id))
+            else:
+                cursor.execute(PG_TABLES_QUERY_V10_PLUS.format(schema_oid=schema_id))
+            rows = cursor.fetchall()
+            table_info = [dict(row) for row in rows]
+            table_info = self._filter_tables_with_no_relation_metrics(dbname, table_info)
+            return self._sort_and_limit_table_info(cursor, dbname, table_info, limit)
+
+        else:
+            # Config error should catch the case where schema collection is enabled
+            # and relation metrics aren't, but adding a warning here just in case
+            self._check.log.warning("Relation metrics are not configured for %s, so tables cannot be collected", dbname)
+
+    def _filter_tables_with_no_relation_metrics(
+        self, dbname, table_info: List[Dict[str, Union[str, bool]]]
+    ) -> List[Dict[str, Union[str, bool]]]:
+        filtered_table_list = []
+        cache = self._check.metrics_cache.table_activity_metrics
+        for table in table_info:
+            if table['name'] in cache[dbname].keys():
+                filtered_table_list.append(table)
+            # partitioned tables will not have metrics recorded under the name of the partitioned table,
+            # so for now we always report them
+            elif table.get('has_partitions', False):
+                filtered_table_list.append(table)
+        return filtered_table_list
+
+    def _sort_and_limit_table_info(
+        self, cursor, dbname, table_info: List[Dict[str, Union[str, bool]]], limit: int
+    ) -> List[Dict[str, Union[str, bool]]]:
+        def sort_tables(info):
+            cache = self._check.metrics_cache.table_activity_metrics
+            # partition master tables won't get any metrics reported on them,
+            # so we have to grab the total partition activity
+            # note: partitions don't exist in V9, so we have to check this first
+            if (
+                VersionUtils.transform_version(str(self._check._version))['version.major'] == "9"
+                or not info["has_partitions"]
+            ):
+                return (
+                    cache[dbname][info['name']]['postgresql.index_scans']
+                    + cache[dbname][info['name']]['postgresql.seq_scans']
+                )
+            else:
+                # get activity
+                cursor.execute(PARTITION_ACTIVITY_QUERY.format(parent_oid=info['id']))
+                row = cursor.fetchone()
+                return row['total_activity']
+
+        # if relation metrics are enabled, sorted based on last activity information
+        table_info = sorted(table_info, key=sort_tables, reverse=True)
+        return table_info[:limit]
+
+    def _query_table_information_for_schema(
+        self, cursor: psycopg.cursor, schema_id: str, dbname: str
+    ) -> List[Dict[str, Union[str, Dict]]]:
+        """
+        Collect table information per schema. 
Returns a list of dictionaries + with key/values: + "id": str + "name": str + "owner": str + "foreign_keys": dict (if has foreign keys) + name: str + definition: str + "indexes": dict (if has indexes) + name: str + definition: str + "columns": dict + name: str + data_type: str + default: str + nullable: bool + "toast_table": str (if associated toast table exists) + "partition_key": str (if has partitions) + "num_partitions": int (if has partitions) + """ + tables_info = self._get_table_info(cursor, dbname, schema_id) + table_payloads = [] + for table in tables_info: + this_payload = {} + name = table['name'] + table_id = table['id'] + this_payload.update({'id': str(table['id'])}) + this_payload.update({'name': name}) + if table["hasindexes"]: + cursor.execute(PG_INDEXES_QUERY.format(tablename=name)) + rows = cursor.fetchall() + idxs = [dict(row) for row in rows] + this_payload.update({'indexes': idxs}) + + if VersionUtils.transform_version(str(self._check._version))['version.major'] != "9": + if table['has_partitions']: + cursor.execute(PARTITION_KEY_QUERY.format(parent=name)) + row = cursor.fetchone() + this_payload.update({'partition_key': row['partition_key']}) + + cursor.execute(NUM_PARTITIONS_QUERY.format(parent_oid=table_id)) + row = cursor.fetchone() + this_payload.update({'num_partitions': row['num_partitions']}) + + if table['toast_table'] is not None: + this_payload.update({'toast_table': table['toast_table']}) + + # Get foreign keys + cursor.execute(PG_CHECK_FOR_FOREIGN_KEY.format(oid=table_id)) + row = cursor.fetchone() + if row['count'] > 0: + cursor.execute(PG_CONSTRAINTS_QUERY.format(oid=table_id)) + rows = cursor.fetchall() + if rows: + fks = [dict(row) for row in rows] + this_payload.update({'foreign_keys': fks}) + + # Get columns + cursor.execute(COLUMNS_QUERY.format(oid=table_id)) + rows = cursor.fetchall()[:] + max_columns = self._config.schemas_metadata_config.get('max_columns', 50) + columns = [dict(row) for row in rows][:max_columns] + this_payload.update({'columns': columns}) + + table_payloads.append(this_payload) + + return table_payloads + + def _collect_metadata_for_database(self, dbname): + metadata = {} + with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn: + with conn.cursor(row_factory=dict_row) as cursor: + database_info = self._query_database_information(cursor, dbname) + metadata.update( + { + "description": database_info['description'], + "name": database_info['name'], + "id": str(database_info['id']), + "encoding": database_info['encoding'], + "owner": database_info['owner'], + "schemas": [], + } + ) + schema_info = self._query_schema_information(cursor, dbname) + for schema in schema_info: + tables_info = self._query_table_information_for_schema(cursor, schema['id'], dbname) + schema.update({"tables": tables_info}) + metadata['schemas'].append(schema) + + return metadata + @tracked_method(agent_check_getter=agent_check_getter) def _collect_postgres_settings(self): with self._check.get_main_db().cursor(row_factory=dict_row) as cursor: diff --git a/postgres/datadog_checks/postgres/metrics_cache.py b/postgres/datadog_checks/postgres/metrics_cache.py index edd3d4f09a4d6..ea32f546d22b9 100644 --- a/postgres/datadog_checks/postgres/metrics_cache.py +++ b/postgres/datadog_checks/postgres/metrics_cache.py @@ -44,6 +44,8 @@ def __init__(self, config): self.replication_stats_metrics = None self.activity_metrics = None self._count_metrics = None + if self.config.relations: + self.table_activity_metrics = {} def 
clean_state(self): self.instance_metrics = None @@ -52,6 +54,8 @@ def clean_state(self): self.replication_metrics = None self.replication_stats_metrics = None self.activity_metrics = None + if self.config.relations: + self.table_activity_metrics = {} def get_instance_metrics(self, version): """ diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index ffe49caeee5a2..a76f4a9eaee2d 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -512,10 +512,32 @@ def _query_scope(self, cursor, scope, instance_tags, is_custom_metrics, dbname=N name, submit_metric = scope['metrics'][column] submit_metric(self, name, value, tags=set(tags), hostname=self.resolved_hostname) + # if relation-level metrics idx_scan or seq_scan, cache it + if name in ('postgresql.index_scans', 'postgresql.seq_scans'): + self._cache_table_activity(dbname, desc_map['table'], name, value) + num_results += 1 return num_results + def _cache_table_activity( + self, + dbname: str, + tablename: str, + metric_name: str, + value: int, + ): + db = dbname if self.autodiscovery else self._config.dbname + if db not in self.metrics_cache.table_activity_metrics.keys(): + self.metrics_cache.table_activity_metrics[db] = {} + if tablename not in self.metrics_cache.table_activity_metrics[db].keys(): + self.metrics_cache.table_activity_metrics[db][tablename] = { + 'postgresql.index_scans': 0, + 'postgresql.seq_scans': 0, + } + + self.metrics_cache.table_activity_metrics[db][tablename][metric_name] = value + def _collect_relations_autodiscovery(self, instance_tags, relations_scopes): if not self.autodiscovery: return diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py index 968333f157d98..f507a0877af41 100644 --- a/postgres/datadog_checks/postgres/statement_samples.py +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -344,6 +344,8 @@ def _filter_and_normalize_statement_rows(self, rows): 'backend_type', 'client backend' ) == 'client backend': continue + if row['client_addr']: + row['client_addr'] = str(row['client_addr']) query = row['query'] if query == '': insufficient_privilege_count += 1 @@ -736,7 +738,7 @@ def _collect_plan_for_statement(self, row): "cloud_metadata": self._config.cloud_metadata, "network": { "client": { - "ip": row.get('client_addr', None), + "ip": str(row.get('client_addr', None)), "port": row.get('client_port', None), "hostname": row.get('client_hostname', None), } diff --git a/postgres/tests/common.py b/postgres/tests/common.py index 3bc1dfbbc6028..ccbe7cce86590 100644 --- a/postgres/tests/common.py +++ b/postgres/tests/common.py @@ -148,13 +148,13 @@ def check_common_metrics(aggregator, expected_tags, count=1): def check_db_count(aggregator, expected_tags, count=1): - table_count = 5 + table_count = 6 # We create 2 additional partition tables when partition is available if float(POSTGRES_VERSION) >= 11.0: - table_count = 7 + table_count = 8 # And PG >= 14 will also report the parent table if float(POSTGRES_VERSION) >= 14.0: - table_count = 8 + table_count = 9 aggregator.assert_metric( 'postgresql.table.count', value=table_count, diff --git a/postgres/tests/compose/resources/03_load_data.sh b/postgres/tests/compose/resources/03_load_data.sh index d74d85f37cd8c..c7eb5f8786b2f 100755 --- a/postgres/tests/compose/resources/03_load_data.sh +++ b/postgres/tests/compose/resources/03_load_data.sh @@ -1,8 +1,10 @@ #!/bin/bash set -e -psql -v 
ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-EOSQL
-    CREATE TABLE persons (personid SERIAL, lastname VARCHAR(255), firstname VARCHAR(255), address VARCHAR(255), city VARCHAR(255));
+psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-EOSQL
+    CREATE TABLE cities (city VARCHAR(255), country VARCHAR(255), PRIMARY KEY(city));
+    INSERT INTO cities VALUES ('New York', 'USA'), ('Beautiful city of lights', 'France');
+    CREATE TABLE persons (personid SERIAL, lastname VARCHAR(255), firstname VARCHAR(255), address VARCHAR(255), city VARCHAR(255) DEFAULT 'New York', CONSTRAINT fk_city FOREIGN KEY (city) REFERENCES cities(city));
     INSERT INTO persons (lastname, firstname, address, city) VALUES ('Cavaille', 'Leo', 'Midtown', 'New York'), ('Someveryveryveryveryveryveryveryveryveryverylongname', 'something', 'Avenue des Champs Elysees', 'Beautiful city of lights');
     CREATE TABLE personsdup1 (personid SERIAL, lastname VARCHAR(255), firstname VARCHAR(255), address VARCHAR(255), city VARCHAR(255));
     INSERT INTO personsdup1 (lastname, firstname, address, city) VALUES ('Cavaille', 'Leo', 'Midtown', 'New York'), ('Someveryveryveryveryveryveryveryveryveryverylongname', 'something', 'Avenue des Champs Elysees', 'Beautiful city of lights');
diff --git a/postgres/tests/test_metadata.py b/postgres/tests/test_metadata.py
index 64d8be56067f2..82dba24624f71 100644
--- a/postgres/tests/test_metadata.py
+++ b/postgres/tests/test_metadata.py
@@ -2,11 +2,15 @@
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
 from concurrent.futures.thread import ThreadPoolExecutor
+from typing import List
 
 import pytest
 
 from datadog_checks.base.utils.db.utils import DBMAsyncJob
 
+from .common import POSTGRES_VERSION
+from .utils import run_one_check
+
 pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]
 
 
@@ -39,3 +43,76 @@ def test_collect_metadata(integration_check, dbm_instance, aggregator):
     assert event['dbms'] == "postgres"
     assert event['kind'] == "pg_settings"
     assert len(event["metadata"]) > 0
+
+
+def test_collect_schemas(integration_check, dbm_instance, aggregator):
+    dbm_instance["collect_schemas"] = {'enabled': True, 'collection_interval': 0.5}
+    dbm_instance['relations'] = [{'relation_regex': ".*"}]
+    dbm_instance["database_autodiscovery"] = {"enabled": True, "include": ["datadog"]}
+    del dbm_instance['dbname']
+    check = integration_check(dbm_instance)
+    run_one_check(check, dbm_instance)
+    dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
+    schema_event = next(e for e in dbm_metadata if e['kind'] == 'pg_databases')
+
+    # there should only be one database, datadog_test
+    database_metadata = schema_event['metadata']
+    assert len(database_metadata) == 1
+    assert 'datadog_test' == database_metadata[0]['name']
+
+    # there should only be two schemas, 'public' and 'datadog'. 
datadog is empty + schema_names = [s['name'] for s in database_metadata[0]['schemas']] + assert 'public' in schema_names + assert 'datadog' in schema_names + schema_public = None + for schema in database_metadata[0]['schemas']: + if schema['name'] == 'public': + schema_public = schema + + # check that all expected tables are present + tables_set = {'persons', "personsdup1", "personsdup2", "pgtable", "pg_newtable", "cities"} + # if version isn't 9 or 10, check that partition master is in tables + if float(POSTGRES_VERSION) >= 11: + tables_set.update({'test_part'}) + tables_not_reported_set = {'test_part1', 'test_part2'} + + tables_got = [] + for table in schema_public['tables']: + tables_got.append(table['name']) + + # make some assertions on fields + if table['name'] == "persons": + # check that foreign keys, indexes get reported + keys = list(table.keys()) + assert_fields(keys, ["foreign_keys", "columns", "toast_table", "id", "name"]) + assert_fields(list(table['foreign_keys'][0].keys()), ['name', 'definition']) + assert_fields( + list(table['columns'][0].keys()), + [ + 'name', + 'nullable', + 'data_type', + 'default', + ], + ) + if table['name'] == "cities": + keys = list(table.keys()) + assert_fields(keys, ["indexes", "columns", "toast_table", "id", "name"]) + assert_fields(list(table['indexes'][0].keys()), ['name', 'definition']) + if float(POSTGRES_VERSION) >= 11: + if table['name'] == 'test_part': + keys = list(table.keys()) + assert_fields(keys, ["num_partitions", "partition_key"]) + + assert_fields(tables_got, tables_set) + assert_not_fields(tables_got, tables_not_reported_set) + + +def assert_fields(keys: List[str], fields: List[str]): + for field in fields: + assert field in keys + + +def assert_not_fields(keys: List[str], fields: List[str]): + for field in fields: + assert field not in keys diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index 775890bc278ea..2df477d4e65c7 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -39,7 +39,7 @@ check_wal_receiver_metrics, requires_static_version, ) -from .utils import _get_conn, _get_superconn, requires_over_10, requires_over_14 +from .utils import _get_conn, _get_superconn, requires_over_10, requires_over_14, run_one_check CONNECTION_METRICS = ['postgresql.max_connections', 'postgresql.percent_usage_connections'] @@ -482,7 +482,9 @@ def test_wal_stats(aggregator, integration_check, pg_instance): ) # We should have at least one full page write assert_metric_at_least(aggregator, 'postgresql.wal.bytes', tags=expected_tags, count=1, lower_bound=wal_bytes + 100) - aggregator.assert_metric('postgresql.wal.full_page_images', tags=expected_tags, count=1, value=wal_fpi + 1) + assert_metric_at_least( + aggregator, 'postgresql.wal.full_page_images', tags=expected_tags, count=1, lower_bound=wal_fpi + 1 + ) def test_query_timeout(integration_check, pg_instance): @@ -611,6 +613,7 @@ def test_correct_hostname(dbm_enabled, reported_hostname, expected_hostname, agg ) +# @pytest.mark.skip(reason='debugging flaky test (2023--03)') @pytest.mark.parametrize( 'dbm_enabled, reported_hostname', [ @@ -622,14 +625,14 @@ def test_correct_hostname(dbm_enabled, reported_hostname, expected_hostname, agg ) @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') -def test_database_instance_metadata(aggregator, dd_run_check, pg_instance, dbm_enabled, reported_hostname): +def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, reported_hostname): 
pg_instance['dbm'] = dbm_enabled if reported_hostname: pg_instance['reported_hostname'] = reported_hostname expected_host = reported_hostname if reported_hostname else 'stubbed.hostname' expected_tags = pg_instance['tags'] + ['port:{}'.format(pg_instance['port'])] check = PostgreSql('test_instance', {}, [pg_instance]) - dd_run_check(check) + run_one_check(check, pg_instance) dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None) @@ -646,7 +649,7 @@ def test_database_instance_metadata(aggregator, dd_run_check, pg_instance, dbm_e # Run a second time and expect the metadata to not be emitted again because of the cache TTL aggregator.reset() - dd_run_check(check) + run_one_check(check, pg_instance) dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None)
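For reference, a minimal instance configuration exercising the new `collect_schemas` option might look like the sketch below. This is an illustrative snippet only, not part of the patch; the host, port, and username values are placeholders, and `dbm`, relation metrics, and autodiscovery are enabled because the new config validation and spec above require or recommend them.

    instances:
      - host: localhost
        port: 5432
        username: datadog
        dbm: true
        database_autodiscovery:
          enabled: true
        relations:
          - relation_regex: .*
        collect_schemas:
          enabled: true
          max_tables: 1000
          max_columns: 50
          collection_interval: 600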