From e8a3f4314ed3b2cba24d8a7c739cc939a5f0f989 Mon Sep 17 00:00:00 2001 From: edengorevoy Date: Fri, 14 Jul 2023 21:25:58 +0000 Subject: [PATCH] first commit --- postgres/datadog_checks/postgres/config.py | 1 + postgres/datadog_checks/postgres/metadata.py | 182 ++++++++++++++++++- postgres/tests/test_metadata.py | 12 ++ 3 files changed, 194 insertions(+), 1 deletion(-) diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index 57eec971df07d..d1b88fd4ac233 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -100,6 +100,7 @@ def __init__(self, instance): self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity') self.statement_samples_config = instance.get('query_samples', instance.get('statement_samples', {})) or {} self.settings_metadata_config = instance.get('collect_settings', {}) or {} + self.schemas_metadata_config = instance.get('collect_schemas', {}) or {} self.resources_metadata_config = instance.get('collect_resources', {}) or {} self.statement_activity_config = instance.get('query_activity', {}) or {} self.statement_metrics_config = instance.get('query_metrics', {}) or {} diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index 8456526dc0aa3..ce5ce8d57164a 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -17,14 +17,91 @@ from datadog_checks.base.utils.tracking import tracked_method from datadog_checks.postgres.connections import MultiDatabaseConnectionPool -# default pg_settings collection interval in seconds +# default collection intervals in seconds DEFAULT_SETTINGS_COLLECTION_INTERVAL = 600 +DEFAULT_SCHEMAS_COLLECTION_INTERVAL = 600 DEFAULT_RESOURCES_COLLECTION_INTERVAL = 300 PG_SETTINGS_QUERY = """ SELECT name, setting FROM pg_settings """ +DATABASE_INFORMATION_QUERY = """ +SELECT db.oid as id, datname as name, pg_encoding_to_char(encoding) as encoding, rolname as owner, description + FROM pg_catalog.pg_database db + LEFT JOIN pg_catalog.pg_description dc ON dc.objoid = db.oid + JOIN pg_roles a on datdba = a.oid + WHERE datname LIKE '{dbname}'; +""" + + +PG_STAT_TABLES_QUERY = """ +SELECT st.relname as name,seq_scan,idx_scan,relhasindex as hasindexes,relowner::regrole as owner,relispartition as is_partition +FROM pg_stat_all_tables st +LEFT JOIN pg_class c ON c.relname = st.relname +AND relkind IN ('r', 'p') +WHERE schemaname = '{schemaname}' +ORDER BY coalesce(seq_scan, 0) + coalesce(idx_scan, 0) DESC; +""" + +PG_TABLES_QUERY = """ +SELECT tablename as name, hasindexes,relowner::regrole as owner,relispartition as is_partition, (case when relkind = 'p' then true else false end) as is_partitioned +FROM pg_tables st +LEFT JOIN pg_class c ON relname = tablename +AND relkind IN ('r', 'p') +WHERE schemaname = '{schemaname}'; +""" + + +SCHEMA_QUERY = """ + SELECT nspname as name, nspowner::regrole as owner FROM + pg_namespace + WHERE nspname not in ('information_schema', 'pg_catalog') + AND nspname NOT LIKE 'pg_toast%' and nspname NOT LIKE 'pg_temp_%'; +""" + +PG_INDEXES_QUERY = """ +SELECT indexname, indexdef +FROM pg_indexes +WHERE tablename LIKE '{tablename}'; +""" + +PG_CONSTRAINTS_QUERY = """ +SELECT conrelid::regclass AS table_name, conname AS foreign_key, contype, pg_get_constraintdef(oid) +FROM pg_constraint +WHERE contype = 'f' +AND conrelid = +'{tablename}'::regclass; +""" + +COLUMNS_QUERY = """ +SELECT attname as name, format_type(atttypid, atttypmod) AS data_type, attnotnull as not_nullable, pg_get_expr(adbin, adrelid) as default +FROM pg_attribute LEFT JOIN pg_attrdef ad ON adrelid=attrelid +WHERE attrelid = '{tablename}'::regclass +AND attnum > 0 +AND NOT attisdropped; +""" +# SELECT * +# FROM pg_attribute +# WHERE attrelid = 'apm_activity'::regclass +# AND attnum > 0 +# AND NOT attisdropped; + + +PARTITION_PARENT_AND_RANGE_QUERY = """ +SELECT parent, child, pg_get_expr(c.relpartbound, c.oid, true) as partition_range + FROM + (SELECT inhparent::regclass as parent, inhrelid::regclass as child + FROM pg_inherits + WHERE inhrelid = '{tablename}'::regclass::oid) AS inherits + LEFT JOIN pg_class c ON c.oid = child::regclass::oid; +""" + +PARTITION_KEY_QUERY = """ + SELECT relname, pg_get_partkeydef(oid) as partition_key +FROM pg_class WHERE '{parent}' = relname; +""" + def agent_check_getter(self): return self._check @@ -41,6 +118,10 @@ def __init__(self, check, config, shutdown_callback): self.pg_settings_collection_interval = config.settings_metadata_config.get( 'collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL ) + self.schemas_collection_interval = config.schemas_metadata_config.get( + 'collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL + ) + collection_interval = config.resources_metadata_config.get( 'collection_interval', DEFAULT_RESOURCES_COLLECTION_INTERVAL ) @@ -67,8 +148,10 @@ def shutdown_cb(): self._check = check self._config = config self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get('enabled', False)) + self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get('enabled', False)) self._pg_settings_cached = None self._time_since_last_settings_query = 0 + self._time_since_last_schemas_query = 0 self._conn_ttl_ms = self._config.idle_connection_timeout self._tags_no_db = None self.tags = None @@ -112,11 +195,108 @@ def report_postgres_metadata(self): } self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding)) + elapsed_s = time.time() - self._time_since_last_schemas_query + if elapsed_s >= self.schemas_collection_interval: + self._collect_schema_info() + def _payload_pg_version(self): version = self._check.version if not version: return "" return 'v{major}.{minor}.{patch}'.format(major=version.major, minor=version.minor, patch=version.patch) + + def _collect_schema_info(self): + databases = [] + if self._check.autodiscovery: + databases = self._check.autodiscovery.get_items() + elif self._config.dbname != 'postgres': + databases.append(self._config.dbname) + else: + # if we are only connecting to 'postgres' database, not worth reporting data model + return + + for database in databases: + self._collect_info_for_database(database) + + + def _collect_info_for_database(self, dbname): + with self._conn_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms) as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + # collect database info + cursor.execute(DATABASE_INFORMATION_QUERY.format(dbname=dbname)) + rows = cursor.fetchall() + postgres_logical_database = [dict(row) for row in rows] + + self._log.warning(postgres_logical_database) + ## Collect user schemas. Returns + # name: str + # owner: str + cursor.execute(SCHEMA_QUERY) + rows = cursor.fetchall() + schemas = [dict(row) for row in rows] + + self._log.warning(schemas) + for schema in schemas: + ## Collect tables for schema. Returns + # name: str + # hasindexes: bool + # owner: str + # is_partition: bool + cursor.execute(PG_TABLES_QUERY.format(schemaname=schema['name'])) + rows = cursor.fetchall() + tables_info = [dict(row) for row in rows] + self._log.warning(tables_info) + table_to_payloads = {} + partitioned_tables = [] + for table in tables_info: + name = table['name'] + self._log.warning("Parsing table {}".format(name)) + table_to_payloads[name] = {'name': name} + if table["hasindexes"]: + cursor.execute(PG_INDEXES_QUERY.format(tablename=name)) + rows = cursor.fetchall() + indexes = {row[0]:row[1] for row in rows} + table_to_payloads[name].update({'indexes': indexes}) + + if table['is_partition']: + cursor.execute(PARTITION_PARENT_AND_RANGE_QUERY.format(tablename=name)) + row = cursor.fetchone() + partition_of = row['parent'] + partition_range = row['partition_range'] + partitioned_tables.append(partition_of) + table_to_payloads[name].update({'partition_of': partition_of}) + table_to_payloads[name].update({'partition_range': partition_range}) + + if table['is_partitioned']: + cursor.execute(PARTITION_KEY_QUERY.format(parent=name)) + row = cursor.fetchone() + self._log.warning(row) + table_to_payloads[name].update({'partition_key':row['partition_key']}) + + # get foreign keys + cursor.execute(PG_CONSTRAINTS_QUERY.format(tablename=table['name'])) + rows = cursor.fetchall() + if rows: + table_to_payloads[name].update({'foreign_keys': {}}) + + ## Get columns + # name: str + # data_type: str + # default: str + # is_nullable: bool + cursor.execute(COLUMNS_QUERY.format(tablename=name)) + rows = cursor.fetchall() + self._log.warning(rows) + columns = [dict(row) for row in rows] + table_to_payloads[name].update({'columns': columns}) + + + self._log.warning(table_to_payloads) + + + + + @tracked_method(agent_check_getter=agent_check_getter) def _collect_postgres_settings(self): diff --git a/postgres/tests/test_metadata.py b/postgres/tests/test_metadata.py index 3058e5150a215..83ab3d4fd82aa 100644 --- a/postgres/tests/test_metadata.py +++ b/postgres/tests/test_metadata.py @@ -38,3 +38,15 @@ def test_collect_metadata(integration_check, dbm_instance, aggregator): assert event['dbms'] == "postgres" assert event['kind'] == "pg_settings" assert len(event["metadata"]) > 0 + +def test_collect_schemas(integration_check, dbm_instance, aggregator): + dbm_instance["collect_schemas"] = {'enabled': True, 'collection_interval': 0.5} + check = integration_check(dbm_instance) + check.check(dbm_instance) + assert None is not None + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + event = dbm_metadata[0] + assert event['host'] == "stubbed.hostname" + assert event['dbms'] == "postgres" + assert event['kind'] == "pg_settings" + assert len(event["metadata"]) > 0 \ No newline at end of file