Skip to content

Commit

Permalink
Add history pruner + test
Browse files Browse the repository at this point in the history
Add __init__.py to test.unit.data.model.db
  • Loading branch information
jdavcs committed May 2, 2024
1 parent a3387ec commit a1b3d39
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 0 deletions.
191 changes: 191 additions & 0 deletions lib/galaxy/model/scripts/history_table_pruner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import datetime
import logging

from sqlalchemy import text

# Name of the temporary table used to hold the ids of histories selected for deletion.
TMP_TABLE = "tmp_unused_history"

# Tables with a history_id foreign key whose rows are deleted together with the history.
ASSOC_TABLES = (
    "event",
    "history_audit",
    "history_tag_association",
    "history_annotation_association",
    "history_rating_association",
    "history_user_share_association",
    "default_history_permissions",
    "data_manager_history_association",
    "cleanup_event_history_association",
    "galaxy_session_to_history",
)

# Tables with a history_id foreign key whose rows must be preserved:
# a history referenced from any of these tables is excluded from deletion.
EXCLUDED_ASSOC_TABLES = (
    "job_import_history_archive",
    "job_export_history_archive",
    "workflow_invocation",
    "history_dataset_collection_association",
    "job",
    "history_dataset_association",
)

# Number of history ids processed per batch; overridable via HistoryTablePruner(batch_size=...).
DEFAULT_BATCH_SIZE = 1000

logging.basicConfig()
log = logging.getLogger(__name__)


class HistoryTablePruner:
    """Removes unused histories (user is null, hid == 1)."""

    def __init__(self, engine, batch_size=None, max_create_time=None):
        """
        :param engine: SQLAlchemy engine connected to the galaxy database.
        :param batch_size: number of history ids processed per batch
            (defaults to DEFAULT_BATCH_SIZE).
        :param max_create_time: only histories created before this date are
            considered (defaults to roughly one month before today).
        """
        self.engine = engine
        self.batch_size = batch_size or DEFAULT_BATCH_SIZE
        self.max_create_time = max_create_time or self._get_default_max_create_time()
        self.min_id, self.max_id = self._get_min_max_ids()

    def run(self):
        """
        Due to the very large size of some tables, we run operations in batches,
        using low/high history id as boundaries.
        """
        if self.min_id is None:
            log.info("No histories exist")
            return

        low = self.min_id
        # Batches cover the half-open id range [low, high); letting high exceed
        # max_id is harmless because every statement filters on id anyway.
        high = low + self.batch_size
        while low <= self.max_id:
            self._run_batch(low, high)
            low = high
            high = high + self.batch_size

    def _get_default_max_create_time(self):
        """By default, do not delete histories created less than a month ago."""
        # A fixed 30-day delta avoids the ValueError raised by
        # date.replace(month=month - 1) in January (month would be 0) or when
        # the previous month is shorter (e.g. March 31 -> "February 31").
        return datetime.date.today() - datetime.timedelta(days=30)

    def _run_batch(self, low, high):
        """Process one batch of history ids in the half-open range [low, high)."""
        self._mark_histories_as_deleted_and_purged(low, high)
        histories = self._get_histories(low, high)
        exclude = self._get_histories_to_exclude(low, high)

        # Calculate set of histories to delete.
        to_delete = set(histories) - exclude
        if not to_delete:
            log.info(f"No histories to delete in the id range {low}-{high}")
            return

        self._create_tmp_table()
        try:
            self._populate_tmp_table(to_delete)
            self._delete_associations()
            self._set_references_to_null()
            self._delete_histories(low, high)
        finally:
            # Always clean up the temporary table, even if a step above fails.
            self._drop_tmp_table()

    def _get_min_max_ids(self):
        """Return (min, max) ids of candidate histories; (None, None) if none exist."""
        stmt = text(
            "SELECT min(id), max(id) FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time"
        )
        params = {"create_time": self.max_create_time}
        with self.engine.begin() as conn:
            minmax = conn.execute(stmt, params).all()
        return minmax[0][0], minmax[0][1]

    def _mark_histories_as_deleted_and_purged(self, low, high):
        """Mark target histories as deleted and purged to prevent their further usage."""
        log.info(f"Marking histories {low}-{high} as deleted and purged")
        stmt = text(
            """
            UPDATE history
            SET deleted = TRUE, purged = TRUE
            WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high
            """
        )
        params = self._get_stmt_params(low, high)
        with self.engine.begin() as conn:
            return conn.execute(stmt, params)

    def _get_histories(self, low, high):
        """Return ids of histories to delete."""
        log.info(f"Collecting history ids between {low}-{high}")
        stmt = text(
            "SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high"
        )
        params = self._get_stmt_params(low, high)
        with self.engine.begin() as conn:
            return conn.scalars(stmt, params).all()

    def _get_histories_to_exclude(self, low, high):
        """Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
        log.info(f"Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
        statements = []
        for table in EXCLUDED_ASSOC_TABLES:
            # Both bounds must be applied to history_id: filtering the upper
            # bound on the association table's own id would select wrong rows.
            statements.append(
                (table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high"))
            )

        params = self._get_stmt_params(low, high)
        ids = []
        for table, stmt in statements:
            with self.engine.begin() as conn:
                log.info(f"\tCollecting history_id from {table}")
                ids += conn.scalars(stmt, params).all()

        excluded = set(ids)
        # Some tables allow a NULL history_id; a null is not a real reference.
        if None in excluded:
            excluded.remove(None)
        return excluded

    def _create_tmp_table(self):
        """Create temporary table to hold history ids."""
        stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _drop_tmp_table(self):
        """Drop the temporary table created by _create_tmp_table."""
        stmt = text(f"DROP TABLE {TMP_TABLE}")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _populate_tmp_table(self, to_delete):
        """Load ids of histories to delete into temporary table."""
        assert to_delete
        log.info("Populating temporary table")
        # Interpolation is safe here: ids are integers read from the database.
        sql_values = ",".join([f"({id})" for id in to_delete])
        stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _delete_associations(self):
        """Delete records associated with histories to be deleted."""
        log.info("Deleting associated records from ...")

        for table in ASSOC_TABLES:
            stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
            with self.engine.begin() as conn:
                conn.execute(stmt)

    def _set_references_to_null(self):
        """Set history_id to null in galaxy_session table for records referring to histories to be deleted."""
        log.info("Set history_id to null in galaxy_session")
        stmt = text(
            f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})"
        )
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _delete_histories(self, low, high):
        """Last step: delete histories that are safe to delete."""
        log.info(f"Delete histories in the id range {low} - {high}")
        stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _get_stmt_params(self, low, high):
        """Build the bound-parameter dict shared by the batch statements."""
        params = {
            "create_time": self.max_create_time,
            "low": low,
            "high": high,
        }
        return params
Empty file.
167 changes: 167 additions & 0 deletions test/unit/data/model/db/test_history_table_pruner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import datetime

import pytest
from sqlalchemy import (
func,
select,
text,
)

from galaxy import model as m
from galaxy.model.scripts.history_table_pruner import HistoryTablePruner


@pytest.fixture()
def setup_db(
    db_url,
    session,
    make_user,
    make_history,
    make_event,
    make_history_tag_association,
    make_history_annotation_association,
    make_history_rating_association,
    make_history_user_share_association,
    make_default_history_permissions,
    make_data_manager_history_association,
    make_cleanup_event_history_association,
    make_galaxy_session_to_history_association,
    make_job_import_history_archive,
    make_job_export_history_archive,
    make_workflow_invocation,
    make_history_dataset_collection_association,
    make_job,
    make_history_dataset_association,
    make_galaxy_session,
):
    """Populate the database with 100 histories partitioned into deletable and
    non-deletable groups, plus associated records (see numbered steps below).
    """
    # 1. Create 100 histories; make them deletable: user = null, hid_counter = 1.
    histories = []
    for id in range(100):
        h = make_history(id=id)
        h.user = None
        h.hid_counter = 1
        histories.append(h)

    # 2. Set 10 histories as not deletable: hid_counter != 1.
    for i in range(10):
        histories[i].hid_counter = 42

    # 3. Set next 10 histories as not deletable: user not null.
    u = make_user()
    for i in range(10, 20):
        histories[i].user = u

    # 4. For the next 6 histories create associations that cannot be deleted
    #    (these correspond to EXCLUDED_ASSOC_TABLES in the pruner).
    make_job_import_history_archive(history=histories[20])
    make_job_export_history_archive(history=histories[21])
    make_workflow_invocation(history=histories[22])
    make_history_dataset_collection_association(history=histories[23])
    make_history_dataset_association(history=histories[25])
    # Job has no history kwarg in its factory, so assign the history directly.
    make_job().history = histories[24]

    # 5. For the next 10 histories create associations that can be deleted
    #    (these correspond to ASSOC_TABLES in the pruner).
    make_event(history=histories[26])
    make_history_tag_association(history=histories[27])
    make_history_annotation_association(history=histories[28])
    make_history_rating_association(item=histories[29])
    make_history_user_share_association(history=histories[30])
    make_default_history_permissions(history=histories[31])
    make_data_manager_history_association(history=histories[32])
    make_cleanup_event_history_association(history_id=histories[33].id)
    make_galaxy_session_to_history_association(history=histories[34])
    # HistoryAudit is not instantiable, so create the association manually via raw SQL.
    stmt = text("insert into history_audit values(:history_id, :update_time)")
    params = {"history_id": histories[35].id, "update_time": datetime.date.today()}
    session.execute(stmt, params)

    # 6. Create a galaxy_session record referring to a history.
    # This cannot be deleted, but the history reference can be set to null.
    make_galaxy_session(current_history=histories[36])

    session.commit()

    # TOTAL counts of loaded histories:
    # histories that should NOT be deleted: 10 + 10 + 6 = 26
    # histories that SHOULD be deleted: 100 - 26 = 74


def test_run(setup_db, session, db_url, engine):
    """End-to-end check of HistoryTablePruner against the fixture data."""

    def assert_count(model, expected):
        assert session.scalar(select(func.count()).select_from(model)) == expected

    # Association models whose rows must survive pruning (preserve their histories).
    preserved_models = (
        m.JobImportHistoryArchive,
        m.JobExportHistoryArchive,
        m.WorkflowInvocation,
        m.HistoryDatasetCollectionAssociation,
        m.Job,
        m.HistoryDatasetAssociation,
    )
    # Association models whose rows are deleted together with their histories.
    deletable_models = (
        m.Event,
        m.HistoryTagAssociation,
        m.HistoryAnnotationAssociation,
        m.HistoryRatingAssociation,
        m.HistoryUserShareAssociation,
        m.DefaultHistoryPermissions,
        m.DataManagerHistoryAssociation,
        m.CleanupEventHistoryAssociation,
        m.GalaxySessionToHistoryAssociation,
        m.HistoryAudit,
    )

    # 1. Verify history counts
    loaded = session.scalars(select(m.History).order_by(m.History.id)).all()
    assert len(loaded) == 100
    for position, history in enumerate(loaded):
        if position < 10:  # first 10: blocked by hid_counter
            assert history.hid_counter > 1
            assert history.user is None
        elif position < 20:  # next 10: blocked by ownership
            assert history.hid_counter == 1
            assert history.user is not None
        else:  # the rest: candidates for deletion
            assert history.hid_counter == 1
            assert history.user is None

    # 2. Verify association counts
    for model in preserved_models:
        assert_count(model, 1)
    for model in deletable_models:
        assert_count(model, 1)
    # one extra session was automatically created for GalaxySessionToHistoryAssociation
    assert_count(m.GalaxySession, 2)

    # 3. Run pruning script with a cutoff date one year in the future, so
    # every qualifying history falls inside the pruning window.
    today = datetime.date.today()
    cutoff = today.replace(year=today.year + 1)
    HistoryTablePruner(engine, max_create_time=cutoff).run()

    # 4 Verify new counts (for details on expected counts see comments in setup_db)

    # 4.1 Verify new history counts
    assert_count(m.History, 26)

    # 4.2 Verify new association counts: no change (these associations should NOT be deleted)
    for model in preserved_models:
        assert_count(model, 1)
    assert_count(m.GalaxySession, 2)

    # 4.3 Verify new association counts: deleted (these associations SHOULD be deleted)
    for model in deletable_models:
        assert_count(model, 0)

0 comments on commit a1b3d39

Please sign in to comment.