From a1b3d39e54db5a85e0674ddaba5bc68554ca0f78 Mon Sep 17 00:00:00 2001
From: John Davis
Date: Wed, 1 May 2024 17:30:21 -0400
Subject: [PATCH] Add history pruner + test

Add __init__.py to test.unit.data.model.db
---
 .../model/scripts/history_table_pruner.py     | 198 ++++++++++++++++++
 test/unit/data/model/db/__init__.py           |   0
 .../model/db/test_history_table_pruner.py     | 167 +++++++++++++++
 3 files changed, 365 insertions(+)
 create mode 100644 lib/galaxy/model/scripts/history_table_pruner.py
 create mode 100644 test/unit/data/model/db/__init__.py
 create mode 100644 test/unit/data/model/db/test_history_table_pruner.py

diff --git a/lib/galaxy/model/scripts/history_table_pruner.py b/lib/galaxy/model/scripts/history_table_pruner.py
new file mode 100644
index 000000000000..329d3f8bb7de
--- /dev/null
+++ b/lib/galaxy/model/scripts/history_table_pruner.py
@@ -0,0 +1,198 @@
+import datetime
+import logging
+
+from sqlalchemy import text
+
+TMP_TABLE = "tmp_unused_history"
+
+ASSOC_TABLES = (
+    "event",
+    "history_audit",
+    "history_tag_association",
+    "history_annotation_association",
+    "history_rating_association",
+    "history_user_share_association",
+    "default_history_permissions",
+    "data_manager_history_association",
+    "cleanup_event_history_association",
+    "galaxy_session_to_history",
+)
+
+EXCLUDED_ASSOC_TABLES = (
+    "job_import_history_archive",
+    "job_export_history_archive",
+    "workflow_invocation",
+    "history_dataset_collection_association",
+    "job",
+    "history_dataset_association",
+)
+
+DEFAULT_BATCH_SIZE = 1000
+
+logging.basicConfig()
+log = logging.getLogger(__name__)
+
+
+class HistoryTablePruner:
+    """Removes unused histories (user is null, hid == 1)."""
+
+    def __init__(self, engine, batch_size=None, max_create_time=None):
+        self.engine = engine
+        self.batch_size = batch_size or DEFAULT_BATCH_SIZE
+        self.max_create_time = max_create_time or self._get_default_max_create_time()
+        self.min_id, self.max_id = self._get_min_max_ids()
+
+    def run(self):
+        """
+        Due to the very large size of some tables, we run operations in batches,
+        using low/high history id as boundaries.
+        """
+        if self.min_id is None:
+            log.info("No histories exist")
+            return
+
+        low = self.min_id
+        high = low + self.batch_size
+        while low <= self.max_id:
+            self._run_batch(low, high)
+            low = high
+            high = high + self.batch_size
+
+    def _get_default_max_create_time(self):
+        """By default, do not delete histories created less than a month ago."""
+        # date.replace(month=month - 1) raises ValueError in January (month 0)
+        # and on days missing from the previous month (e.g. March 30);
+        # timedelta arithmetic is safe year-round.
+        return datetime.date.today() - datetime.timedelta(days=30)
+
+    def _run_batch(self, low, high):
+        self._mark_histories_as_deleted_and_purged(low, high)
+        histories = self._get_histories(low, high)
+        exclude = self._get_histories_to_exclude(low, high)
+
+        # Calculate set of histories to delete.
+        to_delete = set(histories) - exclude
+        if not to_delete:
+            log.info(f"No histories to delete in the id range {low}-{high}")
+            return
+
+        self._create_tmp_table()
+        try:
+            self._populate_tmp_table(to_delete)
+            self._delete_associations()
+            self._set_references_to_null()
+            self._delete_histories(low, high)
+        finally:
+            self._drop_tmp_table()
+
+    def _get_min_max_ids(self):
+        """Return lowest and highest id of histories eligible for deletion (None, None if none exist)."""
+        stmt = text(
+            "SELECT min(id), max(id) FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time"
+        )
+        params = {"create_time": self.max_create_time}
+        with self.engine.begin() as conn:
+            minmax = conn.execute(stmt, params).all()
+        return minmax[0][0], minmax[0][1]
+
+    def _mark_histories_as_deleted_and_purged(self, low, high):
+        """Mark target histories as deleted and purged to prevent their further usage."""
+        log.info(f"Marking histories {low}-{high} as deleted and purged")
+        stmt = text(
+            """
+            UPDATE history
+            SET deleted = TRUE, purged = TRUE
+            WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high
+            """
+        )
+        params = self._get_stmt_params(low, high)
+        with self.engine.begin() as conn:
+            return conn.execute(stmt, params)
+
+    def _get_histories(self, low, high):
+        """Return ids of histories to delete."""
+        log.info(f"Collecting history ids between {low}-{high}")
+        stmt = text(
+            "SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high"
+        )
+        params = self._get_stmt_params(low, high)
+        with self.engine.begin() as conn:
+            return conn.scalars(stmt, params).all()
+
+    def _get_histories_to_exclude(self, low, high):
+        """Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
+        log.info(f"Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
+        statements = []
+        for table in EXCLUDED_ASSOC_TABLES:
+            # Both boundaries apply to history_id ("id < :high" was a bug:
+            # it filtered on the association table's own primary key).
+            statements.append(
+                (table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high"))
+            )
+
+        params = self._get_stmt_params(low, high)
+        ids = []
+        for table, stmt in statements:
+            with self.engine.begin() as conn:
+                log.info(f"\tCollecting history_id from {table}")
+                ids += conn.scalars(stmt, params).all()
+
+        excluded = set(ids)
+        if None in excluded:
+            excluded.remove(None)
+        return excluded
+
+    def _create_tmp_table(self):
+        """Create temporary table to hold history ids."""
+        stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _drop_tmp_table(self):
+        """Drop the temporary table created by _create_tmp_table."""
+        stmt = text(f"DROP TABLE {TMP_TABLE}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _populate_tmp_table(self, to_delete):
+        """Load ids of histories to delete into temporary table."""
+        assert to_delete
+        log.info("Populating temporary table")
+        sql_values = ",".join([f"({id})" for id in to_delete])
+        stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_associations(self):
+        """Delete records associated with histories to be deleted."""
+        log.info("Deleting associated records from ...")
+
+        for table in ASSOC_TABLES:
+            stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
+            with self.engine.begin() as conn:
+                conn.execute(stmt)
+
+    def _set_references_to_null(self):
+        """Set history_id to null in galaxy_session table for records referring to histories to be deleted."""
+        log.info("Set history_id to null in galaxy_session")
+        stmt = text(
+            f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})"
+        )
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_histories(self, low, high):
+        """Last step: delete histories that are safe to delete."""
+        log.info(f"Delete histories in the id range {low} - {high}")
+        stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _get_stmt_params(self, low, high):
+        """Build the bind-parameter dict shared by the batched statements."""
+        params = {
+            "create_time": self.max_create_time,
+            "low": low,
+            "high": high,
+        }
+        return params
diff --git a/test/unit/data/model/db/__init__.py b/test/unit/data/model/db/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/test/unit/data/model/db/test_history_table_pruner.py b/test/unit/data/model/db/test_history_table_pruner.py
new file mode 100644
index 000000000000..376b0c52939c
--- /dev/null
+++ b/test/unit/data/model/db/test_history_table_pruner.py
@@ -0,0 +1,167 @@
+import datetime
+
+import pytest
+from sqlalchemy import (
+    func,
+    select,
+    text,
+)
+
+from galaxy import model as m
+from galaxy.model.scripts.history_table_pruner import HistoryTablePruner
+
+
+@pytest.fixture()
+def setup_db(
+    db_url,
+    session,
+    make_user,
+    make_history,
+    make_event,
+    make_history_tag_association,
+    make_history_annotation_association,
+    make_history_rating_association,
+    make_history_user_share_association,
+    make_default_history_permissions,
+    make_data_manager_history_association,
+    make_cleanup_event_history_association,
+    make_galaxy_session_to_history_association,
+    make_job_import_history_archive,
+    make_job_export_history_archive,
+    make_workflow_invocation,
+    make_history_dataset_collection_association,
+    make_job,
+    make_history_dataset_association,
+    make_galaxy_session,
+):
+    # 1. Create 100 histories; make them deletable: user = null, hid_counter = 1.
+    histories = []
+    for id in range(100):
+        h = make_history(id=id)
+        h.user = None
+        h.hid_counter = 1
+        histories.append(h)
+
+    # 2. Set 10 histories as not deletable: hid_counter != 1.
+    for i in range(10):
+        histories[i].hid_counter = 42
+
+    # 3. Set next 10 histories as not deletable: user not null.
+    u = make_user()
+    for i in range(10, 20):
+        histories[i].user = u
+
+    # 4. For the next 6 histories create associations that cannot be deleted.
+    make_job_import_history_archive(history=histories[20])
+    make_job_export_history_archive(history=histories[21])
+    make_workflow_invocation(history=histories[22])
+    make_history_dataset_collection_association(history=histories[23])
+    make_history_dataset_association(history=histories[25])
+    make_job().history = histories[24]
+
+    # 5. For the next 10 histories create associations that can be deleted.
+    make_event(history=histories[26])
+    make_history_tag_association(history=histories[27])
+    make_history_annotation_association(history=histories[28])
+    make_history_rating_association(item=histories[29])
+    make_history_user_share_association(history=histories[30])
+    make_default_history_permissions(history=histories[31])
+    make_data_manager_history_association(history=histories[32])
+    make_cleanup_event_history_association(history_id=histories[33].id)
+    make_galaxy_session_to_history_association(history=histories[34])
+    # HistoryAudit is not instantiable, so create the association manually.
+    stmt = text("insert into history_audit values(:history_id, :update_time)")
+    params = {"history_id": histories[35].id, "update_time": datetime.date.today()}
+    session.execute(stmt, params)
+
+    # 6. Create a galaxy_session record referring to a history.
+    # This cannot be deleted, but the history reference can be set to null.
+    make_galaxy_session(current_history=histories[36])
+
+    session.commit()
+
+    # TOTAL counts of loaded histories:
+    # histories that should NOT be deleted: 10 + 10 + 6 = 26
+    # histories that SHOULD be deleted: 100 - 26 = 74
+
+
+def test_run(setup_db, session, db_url, engine):
+
+    def verify_counts(model, expected):
+        assert session.scalar(select(func.count()).select_from(model)) == expected
+
+    # 1. Verify history counts
+    stmt = select(m.History).order_by(m.History.id)
+    result = session.scalars(stmt).all()
+    assert len(result) == 100
+    for i, h in enumerate(result):
+        if i < 10:  # first 10
+            assert h.hid_counter > 1
+            assert h.user is None
+        elif i < 20:  # next 10
+            assert h.hid_counter == 1
+            assert h.user is not None
+        else:  # the rest
+            assert h.hid_counter == 1
+            assert h.user is None
+
+    # 2. Verify association counts
+    for model in [
+        m.JobImportHistoryArchive,
+        m.JobExportHistoryArchive,
+        m.WorkflowInvocation,
+        m.HistoryDatasetCollectionAssociation,
+        m.Job,
+        m.HistoryDatasetAssociation,
+        m.Event,
+        m.HistoryTagAssociation,
+        m.HistoryAnnotationAssociation,
+        m.HistoryRatingAssociation,
+        m.HistoryUserShareAssociation,
+        m.DefaultHistoryPermissions,
+        m.DataManagerHistoryAssociation,
+        m.CleanupEventHistoryAssociation,
+        m.GalaxySessionToHistoryAssociation,
+        m.HistoryAudit,
+    ]:
+        verify_counts(model, 1)
+    verify_counts(
+        m.GalaxySession, 2
+    )  # one extra session was automatically created for GalaxySessionToHistoryAssociation
+
+    # 3. Run pruning script
+    today = datetime.date.today()
+    newdate = today.replace(year=today.year + 1)
+    HistoryTablePruner(engine, max_create_time=newdate).run()
+
+    # 4. Verify new counts (for details on expected counts see comments in setup_db)
+
+    # 4.1 Verify new history counts
+    verify_counts(m.History, 26)
+
+    # 4.2 Verify new association counts: no change (these associations should NOT be deleted)
+    for model in [
+        m.JobImportHistoryArchive,
+        m.JobExportHistoryArchive,
+        m.WorkflowInvocation,
+        m.HistoryDatasetCollectionAssociation,
+        m.Job,
+        m.HistoryDatasetAssociation,
+    ]:
+        verify_counts(model, 1)
+    verify_counts(m.GalaxySession, 2)
+
+    # 4.3 Verify new association counts: deleted (these associations SHOULD be deleted)
+    for model in [
+        m.Event,
+        m.HistoryTagAssociation,
+        m.HistoryAnnotationAssociation,
+        m.HistoryRatingAssociation,
+        m.HistoryUserShareAssociation,
+        m.DefaultHistoryPermissions,
+        m.DataManagerHistoryAssociation,
+        m.CleanupEventHistoryAssociation,
+        m.GalaxySessionToHistoryAssociation,
+        m.HistoryAudit,
+    ]:
+        verify_counts(model, 0)