Commit

[wip]

jdavcs committed Apr 24, 2024
1 parent 625cd8a commit c1ee9e9
Showing 5 changed files with 909 additions and 0 deletions.
203 changes: 203 additions & 0 deletions scripts/cleanup_histories.py
@@ -0,0 +1,203 @@
# Identify a set of unused histories (by whichever process is fastest)
# NEW: Mark those histories as deleted and purged to render them unusable
# Delete records from association tables we don't care about (currently, all the delete statements in the draft script)
# Update galaxy_session (set current_history_id to null for affected rows)
# Delete the histories themselves

# TODO: add "step i of n" to logs (n should be correct)

"""
args:
- max history update time (histories updated after this cutoff are kept; recommended to be at least a month in the past)
- history batch size
"""

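# Hypothetical usage sketch (argument parsing is still a TODO; the option names below
# are illustrative assumptions, not an implemented CLI):
#   python ./scripts/cleanup_histories.py --updated-before 2025-01-01 --batch-size 100000
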
import os
import sys

sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "lib")))

import logging

logging.basicConfig(level=logging.INFO)  # without this, the INFO progress messages below would not be displayed

from sqlalchemy import text, create_engine

from galaxy.model.orm.scripts import get_config

TMP_TABLE = "tmp_unused_history"

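# Tables with a history_id column whose rows are deleted together with a history.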
ASSOC_TABLES = (
"event",
"history_audit",
"history_tag_association",
"history_annotation_association",
"history_rating_association",
"history_user_share_association",
"default_history_permissions",
"data_manager_history_association",
"cleanup_event_history_association",
"galaxy_session_to_history",
)

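# Histories referenced from these tables must be preserved, so they are excluded
# from deletion (see _get_histories_to_exclude).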
EXCLUDED_ASSOC_TABLES = (
"job_import_history_archive",
"job_export_history_archive",
"workflow_invocation",
"history_dataset_collection_association",
"job",
"history_dataset_association",
)


class HistoryTablePruner:
""" Removes unused histories (user_id is null, hid_counter == 1). """

def __init__(self):
# TODO add option to pass min id and max id
self.max_update_time = "01-01-2025" # TODO read from args
self.batch_size = 3 # TODO read from args
self.engine = self._create_engine()
self.min_id, self.max_id = self._get_min_max_ids()

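# Illustration of the batching in run(): with min_id=1, max_id=10, batch_size=3 the
# (low, high) pairs are (1, 4), (4, 7), (7, 10), (10, 13); the queries use
# id >= low AND id < high, so every id up to max_id is eventually covered.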
def run(self):
"""
Due to the very large size of some tables, we run operations in batches, using low/high history id as boundaries.
"""
low = self.min_id
high = min(self.max_id, low + self.batch_size)
while low <= self.max_id:
self._run_batch(low, high)
low = high
high = high + self.batch_size

def _run_batch(self, low, high):
self._mark_histories_as_deleted_and_purged(low, high)
histories = self._get_histories(low, high)
exclude = self._get_histories_to_exclude(low, high)

# Calculate set of histories to delete.
to_delete = set(histories) - exclude
if not to_delete:
logging.info(f"No histories to delete in the id range {low} - {high}")
return

self._create_tmp_table()
try:
self._populate_tmp_table(to_delete)
self._delete_associations()
self._set_references_to_null()
self._delete_histories()
finally:
self._drop_tmp_table()

def _get_min_max_ids(self):
stmt = text("SELECT min(id), max(id) FROM history")
with self.engine.begin() as conn:
minmax = conn.execute(stmt).all()
return minmax[0][0], minmax[0][1]

def _create_engine(self):
db_url = get_config(sys.argv)["db_url"]
return create_engine(db_url)

def _mark_histories_as_deleted_and_purged(self, low, high):
""" Mark target histories as deleted and purged to prevent their further usage."""
logging.info(f"STEP 1 OF 10: Marking histories {low}-{high} as deleted and purged")
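# Selection criteria: user_id IS NULL targets anonymous histories, and hid_counter = 1
# means no items were ever added (assuming hid_counter starts at 1 and is incremented
# for each item added to the history).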
stmt = text(f"""
UPDATE history
SET deleted = TRUE, purged = TRUE
WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high
""")
params = {
"update_time": self.max_update_time,
"low": low,
"high": high,
}
with self.engine.begin() as conn:
return conn.execute(stmt, params)

def _get_histories(self, low, high):
""" Return ids of histories to delete."""
logging.info(f"STEP 2 OF 10: Collecting history ids between {low}-{high}")
stmt = text("SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high")
params = {
"update_time": self.max_update_time,
"low": low,
"high": high,
}
with self.engine.begin() as conn:
return conn.scalars(stmt, params).all()

def _get_histories_to_exclude(self, low, high):
""" Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
logging.info(f"STEP 3 OF 10: Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
statements = []
for table in EXCLUDED_ASSOC_TABLES:
statements.append((table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high")))

params = {
"low": low,
"high": high,
}

ids = []
with self.engine.begin() as conn: # TODO or maybe 1 transaction per table?
for table, stmt in statements:
logging.info(f"\tCollecting history_id from {table}")
ids += conn.scalars(stmt, params).all()

excluded = set(ids)
if None in excluded:
excluded.remove(None)
return excluded

def _create_tmp_table(self):
""" Create temporary table to hold history ids."""
stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
with self.engine.begin() as conn:
conn.execute(stmt)

def _drop_tmp_table(self):
""" Drop the temporary table holding history ids."""
stmt = text(f"DROP TABLE {TMP_TABLE}")
with self.engine.begin() as conn:
conn.execute(stmt)

def _populate_tmp_table(self, to_delete):
""" Load ids of histories to delete into temporary table."""
assert to_delete
logging.info("STEP 4 OF 10: Populating temporary table")
sql_values = ",".join([f"({id})" for id in to_delete])
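# e.g. to_delete = {5, 7, 9} yields "INSERT INTO tmp_unused_history VALUES (5),(7),(9)" (set order may vary)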
stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
with self.engine.begin() as conn:
conn.execute(stmt)

def _delete_associations(self):
""" Delete records associated with histories to be deleted."""
logging.info("STEP 5 OF 10: Deleting associated records from ...")

for table in ASSOC_TABLES:
stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
with self.engine.begin() as conn:
conn.execute(stmt)

def _set_references_to_null(self):
""" Set current_history_id to null in galaxy_session for records referring to histories to be deleted."""
logging.info("STEP 6 OF 10: Setting current_history_id to null in galaxy_session")
stmt = text(f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})")
with self.engine.begin() as conn:
conn.execute(stmt)

def _delete_histories(self):
""" Last step: delete histories that are safe to delete."""
logging.info("STEP 7 OF 10: Deleting histories")
stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
with self.engine.begin() as conn:
conn.execute(stmt)


if __name__ == "__main__":
htp = HistoryTablePruner()
htp.run()
15 changes: 15 additions & 0 deletions scripts/cleanup_histories.sh
@@ -0,0 +1,15 @@
#!/bin/sh

#######
# TODO add description
#######


cd "$(dirname "$0")" || exit

cd ..

# TODO add params
# TODO add help
python ./scripts/cleanup_histories.py
#python ./scripts/cleanup_histories.py >> ./scripts/cleanup_histories.log
24 changes: 24 additions & 0 deletions test/unit/data/db/__init__.py
@@ -0,0 +1,24 @@
from collections import (
Counter,
namedtuple,
)

PRIVATE_OBJECT_STORE_ID = "my_private_data"

MockTransaction = namedtuple("MockTransaction", "user")


class MockObjectStore:

def is_private(self, object):
return object.object_store_id == PRIVATE_OBJECT_STORE_ID


def verify_items(items, expected_items):
"""
Assert that items and expected_items contain the same elements.
"""
assert Counter(items) == Counter(expected_items)
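# Usage illustration (hedged example, not part of the commit): the Counter comparison
# ignores order but respects multiplicity:
#   verify_items([1, 2, 2], [2, 1, 2])  # passes
#   verify_items([1, 2], [1, 2, 2])     # raises AssertionError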