From 1432ec4a95bf8546c20e33da9a875e409967e175 Mon Sep 17 00:00:00 2001 From: Giuseppe Steduto Date: Mon, 2 Oct 2023 16:34:29 +0200 Subject: [PATCH] models: replace workflow.run_number with generation and restart number Change the workflow table to split the run_number into two integers: one referring to the generation of the workflows, and the other one referring to the restart number, thus removing the limit of 9 restarts. Closes #186. --- AUTHORS.rst | 1 + CHANGES.rst | 4 + ...e601de4_separate_run_and_restart_number.py | 93 +++++++++++++++++++ reana_db/models.py | 84 ++++++++++------- reana_db/utils.py | 30 ++++-- tests/test_models.py | 16 +++- tests/test_utils.py | 22 ++++- 7 files changed, 206 insertions(+), 44 deletions(-) create mode 100644 reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py diff --git a/AUTHORS.rst b/AUTHORS.rst index 35ea030..008f205 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -7,6 +7,7 @@ The list of contributors in alphabetical order: - `Camila Diaz `_ - `Diego Rodriguez `_ - `Dinos Kousidis `_ +- `Giuseppe Steduto `_ - `Jan Okraska `_ - `Leticia Wanderley `_ - `Marco Donadoni `_ diff --git a/CHANGES.rst b/CHANGES.rst index 9996a3f..3229242 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,10 @@ Changes ======= +Version 0.9.3 (UNRELEASED) +-------------------------- +- Replaces ``run_number`` column of the ``Workflow`` table with two new columns ``generation_number`` and ``restart_number``, to allow for more than 9 restarts. + Version 0.9.2 (2023-09-26) -------------------------- diff --git a/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py b/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py new file mode 100644 index 0000000..564feb9 --- /dev/null +++ b/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py @@ -0,0 +1,93 @@ +"""Separate run number into generation and restart number. 
+ +Revision ID: b85c3e601de4 +Revises: 377cfbfccf75 +Create Date: 2023-10-02 12:08:18.292490 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "b85c3e601de4" +down_revision = "377cfbfccf75" +branch_labels = None +depends_on = None + + +def upgrade(): + """Upgrade to b85c3e601de4 revision.""" + # Add new columns (generation_number, restart_number) + op.add_column( + "workflow", sa.Column("generation_number", sa.Integer()), schema="__reana" + ) + op.add_column( + "workflow", + sa.Column("restart_number", sa.Integer(), default=0), + schema="__reana", + ) + + # Data migration (split run_number into generation_number and restart_number) + op.get_bind().execute( + sa.text( + "UPDATE __reana.workflow" + " SET generation_number = FLOOR(run_number), " + " restart_number = (run_number - FLOOR(run_number)) * 10" + ), + ) + + # Delete old constraint + op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana") + + # Drop old run_number column + op.drop_column("workflow", "run_number", schema="__reana") + + # Add new constraint (uniqueness is no longer on run_number, but on generation and restart number) + op.create_unique_constraint( + "_user_workflow_run_uc", + "workflow", + ["name", "owner_id", "generation_number", "restart_number"], + schema="__reana", + ) + + +def downgrade(): + """Downgrade to 377cfbfccf75 revision.""" + # Revert constraint + op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana") + + # Add old run_number column back + op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana") + + # Check that there are no workflows restarted 10 or more times + # This is because of the way the info about restarts is stored in + # the run_number column (see https://github.com/reanahub/reana-db/issues/186) + restarted_ten_times = ( + op.get_bind() + .execute("SELECT COUNT(*) FROM __reana.workflow WHERE restart_number >= 10") + .fetchone()[0] + ) + if 
restarted_ten_times != 0: + raise ValueError( + "Cannot migrate database because some workflows have been restarted 10 or more times," + " and the previous database revision only supports up to 9 restarts." + " If you want to downgrade, you should manually delete them." + ) + + # Data migration (combine generation_number and restart_number back to run_number) + op.get_bind().execute( + "UPDATE __reana.workflow SET run_number=generation_number+(restart_number * 1.0 /10)" + ) + + # Drop new columns + op.drop_column("workflow", "generation_number", schema="__reana") + op.drop_column("workflow", "restart_number", schema="__reana") + + # Restore old constraint + op.create_unique_constraint( + "_user_workflow_run_uc", + "workflow", + ["name", "owner_id", "run_number"], + schema="__reana", + ) diff --git a/reana_db/models.py b/reana_db/models.py index d62c211..cca3e4f 100644 --- a/reana_db/models.py +++ b/reana_db/models.py @@ -60,6 +60,7 @@ from reana_db.utils import ( build_workspace_path, store_workflow_disk_quota, + split_run_number, update_users_cpu_quota, update_users_disk_quota, update_workflow_cpu_quota, @@ -459,7 +460,8 @@ class Workflow(Base, Timestamp, QuotaBase): run_started_at = Column(DateTime) run_finished_at = Column(DateTime) run_stopped_at = Column(DateTime) - _run_number = Column("run_number", Float) + generation_number = Column("generation_number", Integer) + restart_number = Column("restart_number", Integer, default=0) job_progress = Column(JSONType, default=dict) workspace_path = Column(String) restart = Column(Boolean, default=False) @@ -487,7 +489,11 @@ class Workflow(Base, Timestamp, QuotaBase): __table_args__ = ( UniqueConstraint( - "name", "owner_id", "run_number", name="_user_workflow_run_uc" + "name", + "owner_id", + "generation_number", + "restart_number", + name="_user_workflow_run_uc", ), {"schema": "__reana"}, ) @@ -527,7 +533,8 @@ def __init__( self.git_repo = git_repo self.git_provider = git_provider self.restart = restart - 
self._run_number = self.assign_run_number(run_number) + self.generation_number = self.assign_generation_number(run_number) + self.restart_number = self.assign_restart_number(run_number) self.workspace_path = workspace_path or build_workspace_path( self.owner_id, self.id_ ) @@ -538,53 +545,65 @@ def __repr__(self): return "" % self.id_ @hybrid_property - def run_number(self): + def run_number(self) -> str: """Property of run_number.""" - if self._run_number.is_integer(): - return int(self._run_number) - return self._run_number + if self.restart_number != 0: + return f"{self.generation_number}.{self.restart_number}" + return str(self.generation_number) - @run_number.expression - def run_number(cls): - return func.abs(cls._run_number) - - def assign_run_number(self, run_number): - """Assing run number.""" + def _get_last_workflow(self, run_number): + """Fetch the last workflow restart given a certain run number.""" from .database import Session if run_number: + generation_number, restart_number = split_run_number(run_number) last_workflow = ( Session.query(Workflow) .filter( Workflow.name == self.name, - Workflow.run_number >= int(run_number), - Workflow.run_number < int(run_number) + 1, + Workflow.generation_number == generation_number, Workflow.owner_id == self.owner_id, ) - .order_by(Workflow.run_number.desc()) + .order_by( + Workflow.generation_number.desc(), Workflow.restart_number.desc() + ) .first() ) else: last_workflow = ( Session.query(Workflow) .filter_by(name=self.name, restart=False, owner_id=self.owner_id) - .order_by(Workflow.run_number.desc()) + .order_by( + Workflow.generation_number.desc(), Workflow.restart_number.desc() + ) .first() ) - if last_workflow and self.restart: - # FIXME: remove the limit of nine restarts when we fix the way in which - # we save `run_number` in the DB - num_restarts = round(last_workflow.run_number * 10) % 10 - if num_restarts == LIMIT_RESTARTS: - raise REANAValidationError( - f"Cannot restart a workflow more than 
{LIMIT_RESTARTS} times" - ) - return round(last_workflow.run_number + 0.1, 1) + return last_workflow + + def assign_generation_number(self, run_number): + """Assign generation number.""" + last_workflow = self._get_last_workflow(run_number) + + if not last_workflow: + return 1 else: - if not last_workflow: - return 1 + if not self.restart: + return last_workflow.generation_number + 1 else: - return last_workflow.run_number + 1 + return last_workflow.generation_number + + def assign_restart_number(self, run_number): + """Assign restart number.""" + if not self.restart: + return 0 + last_workflow = self._get_last_workflow(run_number) + + if not last_workflow: + raise REANAValidationError( + "Cannot restart a workflow that has not been run before." + ) + else: + return last_workflow.restart_number + 1 def get_input_parameters(self): """Return workflow parameters.""" @@ -604,7 +623,7 @@ def get_owner_access_token(self): def get_full_workflow_name(self): """Return full workflow name including run number.""" - return "{}.{}".format(self.name, str(self.run_number)) + return "{}.{}".format(self.name, self.run_number) def get_workspace_disk_usage(self, summarize=False, search=None): """Retrieve disk usage information of a workspace.""" @@ -646,12 +665,11 @@ def get_all_restarts(self): the same name and the same run number (up to the dot). This includes the original workflow, as well as all the following restarts. 
""" - run_number = int(self.run_number) + generation_number, _ = split_run_number(self.run_number) restarts = Workflow.query.filter( Workflow.name == self.name, Workflow.owner_id == self.owner_id, - Workflow.run_number >= run_number, - Workflow.run_number < run_number + 1, + Workflow.generation_number == generation_number, ) return restarts diff --git a/reana_db/utils.py b/reana_db/utils.py index e56d91d..dec82ec 100644 --- a/reana_db/utils.py +++ b/reana_db/utils.py @@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None): return workspace_path +def split_run_number(run_number): + """Split run number into generation and restart numbers.""" + run_number = str(run_number) + if "." in run_number: + return tuple(map(int, run_number.split(".", maxsplit=1))) + return int(run_number), 0 + + def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid): """Get Workflow from database with uuid or name. @@ -128,21 +136,31 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid): return _get_workflow_by_name(workflow_name, user_uuid) # `run_number` was specified. - # Check `run_number` is valid. try: - run_number = float(run_number) + generation_number, restart_number = run_number.split(".", maxsplit=1) + except ValueError: + # There were not enough dot-separated substrings, so probably + # the `restart_number` was not specified. + generation_number = run_number + restart_number = 0 + + # Check `run_number` and `restart_number` are valid. + try: + generation_number = int(generation_number) + restart_number = int(restart_number) except ValueError: - # `uuid_or_name` was split, so it is a dot-separated string + # `uuid_or_name` was split, so it is a dot-separated string, # but it didn't contain a valid `run_number`. # Assume that this dot-separated string is the name of # the workflow and search with it. return _get_workflow_by_name(uuid_or_name, user_uuid) # `run_number` is valid. 
- # Search by `run_number` since it is a primary key. + # Search by `generation_number` and `restart_number`, since together with name and owner they form a unique key. workflow = Workflow.query.filter( Workflow.name == workflow_name, - Workflow.run_number == run_number, + Workflow.generation_number == generation_number, + Workflow.restart_number == restart_number, Workflow.owner_id == user_uuid, ).one_or_none() if not workflow: @@ -169,7 +187,7 @@ def _get_workflow_by_name(workflow_name, user_uuid): Workflow.query.filter( Workflow.name == workflow_name, Workflow.owner_id == user_uuid ) - .order_by(Workflow.run_number.desc()) + .order_by(Workflow.generation_number.desc(), Workflow.restart_number.desc()) .first() ) if not workflow: diff --git a/tests/test_models.py b/tests/test_models.py index 0831c6a..b99e618 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -51,7 +51,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(first_workflow) session.commit() - assert first_workflow.run_number == 1 + assert first_workflow.run_number == "1" + assert first_workflow.generation_number == 1 + assert first_workflow.restart_number == 0 second_workflow = Workflow( id_=str(uuid4()), name=workflow_name, @@ -62,7 +64,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(second_workflow) session.commit() - assert second_workflow.run_number == 2 + assert second_workflow.run_number == "2" + assert second_workflow.generation_number == 2 + assert second_workflow.restart_number == 0 first_workflow_restart = Workflow( id_=str(uuid4()), name=workflow_name, @@ -75,7 +79,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(first_workflow_restart) session.commit() - assert first_workflow_restart.run_number == 1.1 + assert first_workflow_restart.run_number == "1.1" + assert first_workflow_restart.generation_number == 1 + assert first_workflow_restart.restart_number == 1 first_workflow_second_restart = Workflow( 
id_=str(uuid4()), name=workflow_name, @@ -88,7 +94,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(first_workflow_second_restart) session.commit() - assert first_workflow_second_restart.run_number == 1.2 + assert first_workflow_second_restart.run_number == "1.2" + assert first_workflow_second_restart.generation_number == 1 + assert first_workflow_second_restart.restart_number == 2 def test_workflow_retention_rules(db, session, new_user): diff --git a/tests/test_utils.py b/tests/test_utils.py index 558b4ca..3a66d88 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -16,7 +16,27 @@ from reana_commons.config import SHARED_VOLUME_PATH from reana_db.models import Workflow -from reana_db.utils import _get_workflow_with_uuid_or_name +from reana_db.utils import _get_workflow_with_uuid_or_name, split_run_number + + +@pytest.mark.parametrize( + "run_number, generation_number, restart_number", + [ + ("1", 1, 0), + ("156.12", 156, 12), + ("2.4", 2, 4), + (3.22, 3, 22), + pytest.param( + "1.2.3", + None, + None, + marks=pytest.mark.xfail(raises=ValueError, strict=True), + ), + ], +) +def test_split_run_number(run_number, generation_number, restart_number): + """Tests for split_run_number().""" + assert split_run_number(run_number) == (generation_number, restart_number) @pytest.mark.parametrize(