diff --git a/scripts/cleanup_histories.py b/scripts/cleanup_histories.py
new file mode 100644
index 000000000000..7be6fb23bbc9
--- /dev/null
+++ b/scripts/cleanup_histories.py
@@ -0,0 +1,203 @@
+# This script:
+# 1. Identifies a set of unused histories (by whichever process is fastest)
+# 2. Marks those histories as deleted and purged to render them unusable
+# 3. Deletes rows from association tables we don't need to preserve (all the DELETE statements in this draft)
+# 4. Updates galaxy_session (sets current_history_id to null for affected rows)
+# 5. Deletes the histories
+
+"""
+args:
+    - max history update time (histories updated after this date are kept;
+      recommended at least a month in the past)
+    - history batch size
+"""
+
+import logging
+import os
+import sys
+
+sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "lib")))
+
+from sqlalchemy import (
+    create_engine,
+    text,
+)
+
+from galaxy.model.orm.scripts import get_config
+
+logging.basicConfig(level=logging.INFO)  # without this, the INFO-level progress messages below are not emitted
+
+TMP_TABLE = "tmp_unused_history"
+
+ASSOC_TABLES = (
+    "event",
+    "history_audit",
+    "history_tag_association",
+    "history_annotation_association",
+    "history_rating_association",
+    "history_user_share_association",
+    "default_history_permissions",
+    "data_manager_history_association",
+    "cleanup_event_history_association",
+    "galaxy_session_to_history",
+)
+
+EXCLUDED_ASSOC_TABLES = (
+    "job_import_history_archive",
+    "job_export_history_archive",
+    "workflow_invocation",
+    "history_dataset_collection_association",
+    "job",
+    "history_dataset_association",
+)
+
+
+class HistoryTablePruner:
+    """Removes unused histories (user is null, hid_counter == 1)."""
+
+    def __init__(self):
+        # TODO add option to pass min id and max id
+        self.max_update_time = "2025-01-01"  # TODO read from args
+        self.batch_size = 3  # TODO read from args
+        self.engine = self._create_engine()
+        self.min_id, self.max_id = self._get_min_max_ids()
+
+    def run(self):
+        """
+        Due to the very large size of some tables, we run operations in batches,
+        using low/high history id as boundaries.
+        """
+        low = self.min_id
+        high = low + self.batch_size
+        while low <= self.max_id:
+            self._run_batch(low, high)
+            low = high
+            high = high + self.batch_size
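+
+    # Illustrative walk-through (example values, not the defaults): with
+    # min_id=1, max_id=7 and batch_size=3, the loop above calls
+    # _run_batch(1, 4), then (4, 7), then (7, 10). Every query below filters
+    # on id >= low AND id < high, so each id, including max_id, falls into
+    # exactly one batch.
+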
+    def _run_batch(self, low, high):
+        self._mark_histories_as_deleted_and_purged(low, high)
+        histories = self._get_histories(low, high)
+        exclude = self._get_histories_to_exclude(low, high)
+
+        # Calculate set of histories to delete.
+        to_delete = set(histories) - exclude
+        if not to_delete:
+            logging.info(f"No histories to delete in the id range {low} - {high}")
+            return
+
+        self._create_tmp_table()
+        try:
+            self._populate_tmp_table(to_delete)
+            self._delete_associations()
+            self._set_references_to_null()
+            self._delete_histories(low, high)
+        finally:
+            self._drop_tmp_table()
+
+    def _get_min_max_ids(self):
+        stmt = text("SELECT min(id), max(id) FROM history")
+        with self.engine.begin() as conn:
+            minmax = conn.execute(stmt).all()
+        return minmax[0][0], minmax[0][1]
+
+    def _create_engine(self):
+        db_url = get_config(sys.argv)["db_url"]
+        return create_engine(db_url)
+
+    def _mark_histories_as_deleted_and_purged(self, low, high):
+        """Mark target histories as deleted and purged to prevent their further usage."""
+        logging.info(f"STEP 1 OF 7: Marking histories {low}-{high} as deleted and purged")
+        stmt = text("""
+            UPDATE history
+            SET deleted = TRUE, purged = TRUE
+            WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high
+        """)
+        params = {
+            "update_time": self.max_update_time,
+            "low": low,
+            "high": high,
+        }
+        with self.engine.begin() as conn:
+            return conn.execute(stmt, params)
+
+    def _get_histories(self, low, high):
+        """Return ids of histories to delete."""
+        logging.info(f"STEP 2 OF 7: Collecting history ids between {low}-{high}")
+        stmt = text(
+            "SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high"
+        )
+        params = {
+            "update_time": self.max_update_time,
+            "low": low,
+            "high": high,
+        }
+        with self.engine.begin() as conn:
+            return conn.scalars(stmt, params).all()
+
+    def _get_histories_to_exclude(self, low, high):
+        """Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
+        logging.info(f"STEP 3 OF 7: Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
+        statements = []
+        for table in EXCLUDED_ASSOC_TABLES:
+            statements.append((table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high")))
+
+        params = {
+            "low": low,
+            "high": high,
+        }
+
+        ids = []
+        with self.engine.begin() as conn:  # TODO or maybe 1 transaction per table?
+            for table, stmt in statements:
+                logging.info(f"\tCollecting history_id from {table}")
+                ids += conn.scalars(stmt, params).all()
+
+        excluded = set(ids)
+        if None in excluded:
+            excluded.remove(None)
+        return excluded
+
+    def _create_tmp_table(self):
+        """Create temporary table to hold history ids."""
+        # NOTE: TEMPORARY tables are connection-scoped; this relies on the pool
+        # handing back the same connection for each successive engine.begin() call.
+        stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _drop_tmp_table(self):
+        stmt = text(f"DROP TABLE {TMP_TABLE}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _populate_tmp_table(self, to_delete):
+        """Load ids of histories to delete into temporary table."""
+        assert to_delete
+        logging.info("STEP 4 OF 7: Populating temporary table")
+        sql_values = ",".join(f"({history_id})" for history_id in to_delete)
+        stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_associations(self):
+        """Delete records associated with histories to be deleted."""
+        logging.info("STEP 5 OF 7: Deleting associated records")
+
+        for table in ASSOC_TABLES:
+            logging.info(f"\tDeleting from {table}")
+            stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
+            with self.engine.begin() as conn:
+                conn.execute(stmt)
+
+    def _set_references_to_null(self):
+        """Set current_history_id to null in galaxy_session table for records referring to histories to be deleted."""
+        logging.info("STEP 6 OF 7: Setting current_history_id to null in galaxy_session")
+        stmt = text(f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_histories(self, low, high):
+        """Last step: delete histories that are safe to delete."""
+        logging.info(f"STEP 7 OF 7: Deleting histories in the id range {low} - {high}")
+        stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+
+if __name__ == "__main__":
+    htp = HistoryTablePruner()
+    htp.run()
diff --git a/scripts/cleanup_histories.sh b/scripts/cleanup_histories.sh
new file mode 100755
index 000000000000..b181b9a2829a
--- /dev/null
+++ b/scripts/cleanup_histories.sh
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+#######
+# Remove unused histories (user is null, hid_counter == 1);
+# see scripts/cleanup_histories.py for details.
+#######
+
+cd "$(dirname "$0")" || exit
+
+cd .. || exit
+
+# TODO add params
+# TODO add help
+python ./scripts/cleanup_histories.py
+#python ./scripts/cleanup_histories.py >> ./scripts/cleanup_histories.log
diff --git a/test/unit/data/db/__init__.py b/test/unit/data/db/__init__.py
new file mode 100644
index 000000000000..13a615086ebe
--- /dev/null
+++ b/test/unit/data/db/__init__.py
@@ -0,0 +1,24 @@
+from collections import (
+    Counter,
+    namedtuple,
+)
+
+PRIVATE_OBJECT_STORE_ID = "my_private_data"
+
+MockTransaction = namedtuple("MockTransaction", "user")
+
+
+class MockObjectStore:
+
+    def is_private(self, object):
+        return object.object_store_id == PRIVATE_OBJECT_STORE_ID
+
+
+def verify_items(items, expected_items):
+    """
+    Assert that items and expected_items contain the same elements.
+ """ + assert Counter(items) == Counter(expected_items) diff --git a/test/unit/data/db/conftest.py b/test/unit/data/db/conftest.py new file mode 100644 index 000000000000..f4a12e71bff6 --- /dev/null +++ b/test/unit/data/db/conftest.py @@ -0,0 +1,535 @@ +import contextlib +import random +import string + +import pytest +from sqlalchemy import ( + create_engine, + text, +) +from sqlalchemy.orm import Session + +from galaxy import model as m +from galaxy.datatypes.registry import Registry as DatatypesRegistry +from galaxy.model.triggers.update_audit_table import install as install_timestamp_triggers +from . import MockObjectStore + +# utility fixtures + + +@contextlib.contextmanager +def transaction(session): + if not session.in_transaction(): + with session.begin(): + yield + else: + yield + + +@pytest.fixture(scope="module") +def engine(): + db_uri = "sqlite:///:memory:" + return create_engine(db_uri) + + +@pytest.fixture(autouse=True, scope="module") +def setup(engine): + m.mapper_registry.metadata.create_all(engine) + m.Dataset.object_store = MockObjectStore() # type:ignore[assignment] + datatypes_registry = DatatypesRegistry() + datatypes_registry.load_datatypes() + m.set_datatypes_registry(datatypes_registry) + install_timestamp_triggers(engine) + + +@pytest.fixture(autouse=True) +def teardown(engine): + """Delete all rows from all tables. Called after each test.""" + yield + with engine.begin() as conn: + for table in m.mapper_registry.metadata.tables: + stmt = text(f"DELETE FROM {table}") + conn.execute(stmt) + + +@pytest.fixture +def session(engine): + engine = engine + return Session(engine) + + +@pytest.fixture +def make_random_users(session, make_user): + def f(count): + return [make_user() for _ in range(count)] + + return f + + +# utility functions + + +def random_str(): + alphabet = string.ascii_lowercase + string.digits + size = random.randint(5, 10) + return "".join(random.choices(alphabet, k=size)) + + +def random_email(): + text = random_str() + return f"{text}@galaxy.testing" + + +# model fixture factories + + +@pytest.fixture +def make_dataset_collection(session): + def f(**kwd): + dc = m.DatasetCollection(**kwd) + with transaction(session): + session.add(dc) + session.commit() + return dc + + return f + + +@pytest.fixture +def make_dataset_collection_element(session, make_hda): + def f(**kwd): + kwd["element"] = kwd.get("element", make_hda()) + dce = m.DatasetCollectionElement(**kwd) + with transaction(session): + session.add(dce) + session.commit() + return dce + + return f + + +@pytest.fixture +def make_dataset_permissions(session): + def f(**kwd): + dp = m.DatasetPermissions(**kwd) + with transaction(session): + session.add(dp) + session.commit() + return dp + + return f + + +@pytest.fixture +def make_event(session): + def f(**kwd): + e = m.Event(**kwd) + with transaction(session): + session.add(e) + session.commit() + return e + + return f + + +@pytest.fixture +def make_galaxy_session(session): + def f(**kwd): + gs = m.GalaxySession(**kwd) + with transaction(session): + session.add(gs) + session.commit() + return gs + + return f + + +@pytest.fixture +def make_history(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user", make_user()) + history = m.History(**kwd) + with transaction(session): + session.add(history) + session.commit() + return history + + return f + + +@pytest.fixture +def make_history_annotation_association(session): + def f(**kwd): + haa = m.HistoryAnnotationAssociation(**kwd) + with transaction(session): + session.add(haa) + 
+            session.commit()
+        return haa
+
+    return f
+
+
+@pytest.fixture
+def make_history_tag_association(session):
+    def f(**kwd):
+        hta = m.HistoryTagAssociation(**kwd)
+        with transaction(session):
+            session.add(hta)
+            session.commit()
+        return hta
+
+    return f
+
+
+@pytest.fixture
+def make_hda(session, make_history):
+    def f(**kwd):
+        kwd["history"] = kwd.get("history", make_history())
+        hda = m.HistoryDatasetAssociation(**kwd)
+        with transaction(session):
+            session.add(hda)
+            session.commit()
+        return hda
+
+    return f
+
+
+@pytest.fixture
+def make_hdca(session):
+    def f(**kwd):
+        hdca = m.HistoryDatasetCollectionAssociation(**kwd)
+        with transaction(session):
+            session.add(hdca)
+            session.commit()
+        return hdca
+
+    return f
+
+
+@pytest.fixture
+def make_job(session):
+    def f(**kwd):
+        job = m.Job(**kwd)
+        with transaction(session):
+            session.add(job)
+            session.commit()
+        return job
+
+    return f
+
+
+@pytest.fixture
+def make_ldca(session):
+    def f(**kwd):
+        ldca = m.LibraryDatasetCollectionAssociation(**kwd)
+        with transaction(session):
+            session.add(ldca)
+            session.commit()
+        return ldca
+
+    return f
+
+
+@pytest.fixture
+def make_ldda(session):
+    def f(**kwd):
+        ldda = m.LibraryDatasetDatasetAssociation(**kwd)
+        with transaction(session):
+            session.add(ldda)
+            session.commit()
+        return ldda
+
+    return f
+
+
+@pytest.fixture
+def make_library(session):
+    def f(**kwd):
+        lib = m.Library(**kwd)
+        with transaction(session):
+            session.add(lib)
+            session.commit()
+        return lib
+
+    return f
+
+
+@pytest.fixture
+def make_library_folder(session):
+    def f(**kwd):
+        lib_folder = m.LibraryFolder(**kwd)
+        with transaction(session):
+            session.add(lib_folder)
+            session.commit()
+        return lib_folder
+
+    return f
+
+
+@pytest.fixture
+def make_library_permissions(session, make_library, make_role):
+    def f(**kwd):
+        action = kwd.get("action", random_str())
+        library = kwd.get("library", make_library())
+        role = kwd.get("role", make_role())
+        lp = m.LibraryPermissions(action, library, role)
+        with transaction(session):
+            session.add(lp)
+            session.commit()
+        return lp
+
+    return f
+
+
+@pytest.fixture
+def make_page(session, make_user):
+    def f(**kwd):
+        kwd["user"] = kwd.get("user", make_user())
+        page = m.Page(**kwd)
+        with transaction(session):
+            session.add(page)
+            session.commit()
+        return page
+
+    return f
+
+
+@pytest.fixture
+def make_role(session):
+    def f(**kwd):
+        role = m.Role(**kwd)
+        with transaction(session):
+            session.add(role)
+            session.commit()
+        return role
+
+    return f
+
+
+@pytest.fixture
+def make_stored_workflow(session, make_user):
+    def f(**kwd):
+        kwd["user"] = kwd.get("user", make_user())
+        sw = m.StoredWorkflow(**kwd)
+        with transaction(session):
+            session.add(sw)
+            session.commit()
+        return sw
+
+    return f
+
+
+@pytest.fixture
+def make_task(session, make_job):
+    def f(**kwd):
+        kwd["job"] = kwd.get("job", make_job())
+        # Assumption: if the following args are needed, a test should supply them
+        kwd["working_directory"] = kwd.get("working_directory", random_str())
+        kwd["prepare_files_cmd"] = kwd.get("prepare_files_cmd", random_str())
+        task = m.Task(**kwd)
+        with transaction(session):
+            session.add(task)
+            session.commit()
+        return task
+
+    return f
+
+
+@pytest.fixture
+def make_user(session):
+    def f(**kwd):
+        kwd["username"] = kwd.get("username", random_str())
+        kwd["email"] = kwd.get("email", random_email())
+        kwd["password"] = kwd.get("password", random_str())
+        user = m.User(**kwd)
+        with transaction(session):
+            session.add(user)
+            session.commit()
+        return user
+
+    return f
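+
+
+# Usage sketch (illustrative comment, not a fixture): the factories compose,
+# so a test requests only the factory it needs; required related records are
+# created implicitly with random defaults. For example, a test in this package
+# could do:
+#
+#     def test_history_has_owner(make_history):
+#         history = make_history()  # implicitly creates and persists a User
+#         assert history.user is not None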
+
+
+@pytest.fixture
+def make_user_item_rating_association(session):
+    def f(assoc_class, user, item, rating):
+        assoc = assoc_class(user, item, rating)
+        with transaction(session):
+            session.add(assoc)
+            session.commit()
+        return assoc
+
+    return f
+
+
+@pytest.fixture
+def make_user_role_association(session):
+    def f(user, role):
+        assoc = m.UserRoleAssociation(user, role)
+        with transaction(session):
+            session.add(assoc)
+            session.commit()
+        return assoc
+
+    return f
+
+
+@pytest.fixture
+def make_visualization(session, make_user):
+    def f(**kwd):
+        kwd["user"] = kwd.get("user", make_user())
+        vis = m.Visualization(**kwd)
+        with transaction(session):
+            session.add(vis)
+            session.commit()
+        return vis
+
+    return f
+
+
+@pytest.fixture
+def make_history_rating_association(session, make_user, make_history):
+    def f(**kwd):
+        kwd["user"] = kwd.get("user", make_user())
+        kwd["item"] = kwd.get("item", make_history())
+        model = m.HistoryRatingAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_user_share_association(session):
+    def f(**kwd):
+        model = m.HistoryUserShareAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_default_history_permissions(session, make_history, make_role):
+    def f(**kwd):
+        kwd["history"] = kwd.get("history", make_history())
+        kwd["action"] = kwd.get("action", random_str())
+        kwd["role"] = kwd.get("role", make_role())
+        model = m.DefaultHistoryPermissions(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_data_manager_history_association(session):
+    def f(**kwd):
+        model = m.DataManagerHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_cleanup_event_history_association(session):
+    def f(**kwd):
+        model = m.CleanupEventHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_galaxy_session_to_history_association(session, make_history, make_galaxy_session):
+    def f(**kwd):
+        kwd["galaxy_session"] = kwd.get("galaxy_session", make_galaxy_session())
+        kwd["history"] = kwd.get("history", make_history())
+        model = m.GalaxySessionToHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_job_import_history_archive(session):
+    def f(**kwd):
+        model = m.JobImportHistoryArchive(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_job_export_history_archive(session):
+    def f(**kwd):
+        model = m.JobExportHistoryArchive(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_workflow(session):
+    def f(**kwd):
+        model = m.Workflow(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_workflow_invocation(session, make_workflow):
+    def f(**kwd):
+        kwd["workflow"] = kwd.get("workflow", make_workflow())
+        model = m.WorkflowInvocation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_dataset_collection_association(session):
+    def f(**kwd):
+        model = m.HistoryDatasetCollectionAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_dataset_association(session):
+    def f(**kwd):
+        model = m.HistoryDatasetAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
diff --git a/test/unit/data/db/test_histories_cleanup_script.py b/test/unit/data/db/test_histories_cleanup_script.py
new file mode 100644
index 000000000000..9aace1e3f757
--- /dev/null
+++ b/test/unit/data/db/test_histories_cleanup_script.py
@@ -0,0 +1,132 @@
+import pytest
+from sqlalchemy import (
+    func,
+    select,
+    text,
+)
+
+from galaxy import model as m
+
+"""
+steps:
+    1. setup data
+    2. query: verify counts
+    3. call HistoryTablePruner().run()
+    4. query: verify new counts
+"""
+
+
+@pytest.fixture
+def setup_db(
+    session,
+    make_user,
+    make_history,
+    make_event,
+    make_history_tag_association,
+    make_history_annotation_association,
+    make_history_rating_association,
+    make_history_user_share_association,
+    make_default_history_permissions,
+    make_data_manager_history_association,
+    make_cleanup_event_history_association,
+    make_galaxy_session_to_history_association,
+    make_job_import_history_archive,
+    make_job_export_history_archive,
+    make_workflow_invocation,
+    make_history_dataset_collection_association,
+    make_job,
+    make_history_dataset_association,
+):
+    # 1. Load histories
+    histories = []
+    u = make_user()
+    for _ in range(100):
+        histories.append(make_history())
+
+    # 2. Set histories id:1-10 as not prunable: hid_counter != 1
+    for i in range(10):
+        histories[i].user = None
+        histories[i].hid_counter = 42
+
+    # 3. Set histories id:11-20 as not prunable: user not null
+    for i in range(10, 20):
+        histories[i].user = u
+        histories[i].hid_counter = 1
+
+    # 4. Set histories id:21-100 as prunable: hid_counter == 1, user is null
+    for i in range(20, 100):
+        histories[i].user = None
+        histories[i].hid_counter = 1
+
+    # 5. Create associations that must be preserved for histories id:21-26,
+    #    which excludes those histories from deletion
+    make_job_import_history_archive(history=histories[20])
+    make_job_export_history_archive(history=histories[21])
+    make_workflow_invocation(history=histories[22])
+    make_history_dataset_collection_association(history=histories[23])
+    make_job().history = histories[24]
+    make_history_dataset_association(history=histories[25])
+
+    # 6. Create associations to be deleted for histories id:27-36
+    make_event(history=histories[26])
+    make_history_tag_association(history=histories[27])
+    make_history_annotation_association(history=histories[28])
+    make_history_rating_association(item=histories[29])
+    make_history_user_share_association(history=histories[30])
+    make_default_history_permissions().history = histories[31]
+    make_data_manager_history_association().history = histories[32]
+    make_cleanup_event_history_association().history = histories[33]
+    make_galaxy_session_to_history_association().history = histories[34]
+
+    # HistoryAudit is not instantiable, so do this manually
+    stmt = text("INSERT INTO history_audit VALUES (:history_id, :update_time)")
+    params = {"history_id": histories[35].id, "update_time": "2020-01-01"}
+    session.execute(stmt, params)
+
+    session.commit()
+
+
+def test_script(setup_db, session):
+    # 1. Verify history counts
+    stmt = select(m.History).where(m.History.id <= 100)
+    result = session.scalars(stmt).all()
+    assert len(result) == 100
+    for h in result:
+        if h.id <= 10:
+            assert h.hid_counter > 1
+            assert h.user is None
+        elif h.id <= 20:
+            assert h.hid_counter == 1
+            assert h.user is not None
+        else:
+            assert h.hid_counter == 1
+            assert h.user is None
+
+    # 2. Verify association counts
+    assert session.scalar(select(func.count()).select_from(m.JobImportHistoryArchive)) == 1
+    assert session.scalar(select(func.count()).select_from(m.JobExportHistoryArchive)) == 1
+    assert session.scalar(select(func.count()).select_from(m.WorkflowInvocation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryDatasetCollectionAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.Job)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryDatasetAssociation)) == 1
+
+    assert session.scalar(select(func.count()).select_from(m.Event)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryTagAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryAnnotationAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryRatingAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.HistoryUserShareAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.DefaultHistoryPermissions)) == 1
+    assert session.scalar(select(func.count()).select_from(m.DataManagerHistoryAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.CleanupEventHistoryAssociation)) == 1
+    assert session.scalar(select(func.count()).select_from(m.GalaxySessionToHistoryAssociation)) == 1
+
+    # manually check HistoryAudit
+    stmt = text("SELECT count(*) FROM history_audit WHERE history_id <= 100")
+    history_audit_n = session.scalar(stmt)
+    assert history_audit_n > 0
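+
+    # 3. Run pruning script
+    # Hypothetical sketch, not runnable as-is: HistoryTablePruner builds its
+    # engine from the Galaxy config via get_config(sys.argv), so it cannot yet
+    # be pointed at this test's in-memory engine. One option (an assumption,
+    # not current behavior) is to monkeypatch _create_engine, with the test
+    # requesting pytest's monkeypatch fixture:
+    #
+    #     from scripts.cleanup_histories import HistoryTablePruner
+    #
+    #     monkeypatch.setattr(HistoryTablePruner, "_create_engine", lambda self: session.get_bind())
+    #     HistoryTablePruner().run()
+
+    # 4. Verify new counts: histories id:1-26 should survive (ids 1-20 are not
+    # prunable, ids 21-26 have associations that must be preserved); histories
+    # id:27-100 and their prunable associations should be gone.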