Skip to content

Commit

Permalink
models: replace workflow.run_number with generation and restart number
Browse files Browse the repository at this point in the history
Change the workflow table to split the run_number into two integers: one
referring to the generation of the workflows, and the other one
referring to the restart number, thus removing the limit of 9 restarts.

Closes reanahub#186.
  • Loading branch information
giuseppe-steduto committed Oct 2, 2023
1 parent 6271ecd commit 1432ec4
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 44 deletions.
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The list of contributors in alphabetical order:
- `Camila Diaz <https://orcid.org/0000-0001-5543-797X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Giuseppe Steduto <https://orcid.org/0009-0002-1258-8553>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
- `Leticia Wanderley <https://orcid.org/0000-0003-4649-6630>`_
- `Marco Donadoni <https://orcid.org/0000-0003-2922-5505>`_
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changes
=======

Version 0.9.3 (UNRELEASED)
--------------------------
- Replaces ``run_number`` column of the ``Workflow`` table with two new columns ``generation_number`` and ``restart_number``, to allow for more than 9 restarts.

Version 0.9.2 (2023-09-26)
--------------------------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Separate run number into generation and restart number.
Revision ID: b85c3e601de4
Revises: 377cfbfccf75
Create Date: 2023-10-02 12:08:18.292490
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "b85c3e601de4"
down_revision = "377cfbfccf75"
branch_labels = None
depends_on = None


def upgrade():
"""Upgrade to b85c3e601de4 revision."""
# Add new columns (generation_number, restart_number)
op.add_column(
"workflow", sa.Column("generation_number", sa.Integer()), schema="__reana"
)
op.add_column(
"workflow",
sa.Column("restart_number", sa.Integer(), default=0),
schema="__reana",
)

# Data migration (split run_number into generation_number and restart_number)
op.get_bind().execute(
sa.text(
"UPDATE __reana.workflow"
" SET generation_number = FLOOR(run_number), "
" restart_number = (run_number - FLOOR(run_number)) * 10"
),
)

# Delete old constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Drop old run_number column
op.drop_column("workflow", "run_number", schema="__reana")

# Add new constraint (the primary key is not run_number anymore, but with generation and restart number
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "generation_number", "restart_number"],
schema="__reana",
)


def downgrade():
"""Downgrade to 377cfbfccf75 revision."""
# Revert constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Add old run_number column back
op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana")

# Check that there are no workflows discarded more than 10 times
# This is because of the way the info about restarts is stored in
# the run_number column (see https://github.com/reanahub/reana-db/issues/186)
restarted_ten_times = (
op.get_bind()
.execute("SELECT COUNT(*) FROM __reana.workflow WHERE restart_number >= 10")
.fetchone()[0]
)
if restarted_ten_times != 0:
raise ValueError(
"Cannot migrate database because some workflows have been restarted 10 or more times,"
" and the previous database revision only supports up to 9 restarts."
" If you want to downgrade, you should manually delete them."
)

# Data migration (combine generation_number and restart_number back to run_number)
op.get_bind().execute(
"UPDATE __reana.workflow SET run_number=generation_number+(restart_number * 1.0 /10)"
)

# Drop new columns
op.drop_column("workflow", "generation_number", schema="__reana")
op.drop_column("workflow", "restart_number", schema="__reana")

# Restore old constraint
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "run_number"],
schema="__reana",
)
84 changes: 51 additions & 33 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
split_run_number,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
Expand Down Expand Up @@ -459,7 +460,8 @@ class Workflow(Base, Timestamp, QuotaBase):
run_started_at = Column(DateTime)
run_finished_at = Column(DateTime)
run_stopped_at = Column(DateTime)
_run_number = Column("run_number", Float)
generation_number = Column("generation_number", Integer)
restart_number = Column("restart_number", Integer, default=0)
job_progress = Column(JSONType, default=dict)
workspace_path = Column(String)
restart = Column(Boolean, default=False)
Expand Down Expand Up @@ -487,7 +489,11 @@ class Workflow(Base, Timestamp, QuotaBase):

__table_args__ = (
UniqueConstraint(
"name", "owner_id", "run_number", name="_user_workflow_run_uc"
"name",
"owner_id",
"generation_number",
"restart_number",
name="_user_workflow_run_uc",
),
{"schema": "__reana"},
)
Expand Down Expand Up @@ -527,7 +533,8 @@ def __init__(
self.git_repo = git_repo
self.git_provider = git_provider
self.restart = restart
self._run_number = self.assign_run_number(run_number)
self.generation_number = self.assign_generation_number(run_number)
self.restart_number = self.assign_restart_number(run_number)
self.workspace_path = workspace_path or build_workspace_path(
self.owner_id, self.id_
)
Expand All @@ -538,53 +545,65 @@ def __repr__(self):
return "<Workflow %r>" % self.id_

@hybrid_property
def run_number(self):
def run_number(self) -> str:
"""Property of run_number."""
if self._run_number.is_integer():
return int(self._run_number)
return self._run_number
if self.restart_number != 0:
return f"{self.generation_number}.{self.restart_number}"
return str(self.generation_number)

@run_number.expression
def run_number(cls):
return func.abs(cls._run_number)

def assign_run_number(self, run_number):
"""Assing run number."""
def _get_last_workflow(self, run_number):
"""Fetch the last workflow restart given a certain run number."""
from .database import Session

if run_number:
generation_number, restart_number = split_run_number(run_number)
last_workflow = (
Session.query(Workflow)
.filter(
Workflow.name == self.name,
Workflow.run_number >= int(run_number),
Workflow.run_number < int(run_number) + 1,
Workflow.generation_number == generation_number,
Workflow.owner_id == self.owner_id,
)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.generation_number.desc(), Workflow.restart_number.desc()
)
.first()
)
else:
last_workflow = (
Session.query(Workflow)
.filter_by(name=self.name, restart=False, owner_id=self.owner_id)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.generation_number.desc(), Workflow.restart_number.desc()
)
.first()
)
if last_workflow and self.restart:
# FIXME: remove the limit of nine restarts when we fix the way in which
# we save `run_number` in the DB
num_restarts = round(last_workflow.run_number * 10) % 10
if num_restarts == LIMIT_RESTARTS:
raise REANAValidationError(
f"Cannot restart a workflow more than {LIMIT_RESTARTS} times"
)
return round(last_workflow.run_number + 0.1, 1)
return last_workflow

def assign_generation_number(self, run_number):
"""Assign generation number."""
last_workflow = self._get_last_workflow(run_number)

if not last_workflow:
return 1
else:
if not last_workflow:
return 1
if not self.restart:
return last_workflow.generation_number + 1
else:
return last_workflow.run_number + 1
return last_workflow.generation_number

def assign_restart_number(self, run_number):
"""Assign restart number."""
if not self.restart:
return 0
last_workflow = self._get_last_workflow(run_number)

if not last_workflow:
raise REANAValidationError(
"Cannot restart a workflow that has not been run before."
)
else:
return last_workflow.restart_number + 1

def get_input_parameters(self):
"""Return workflow parameters."""
Expand All @@ -604,7 +623,7 @@ def get_owner_access_token(self):

def get_full_workflow_name(self):
"""Return full workflow name including run number."""
return "{}.{}".format(self.name, str(self.run_number))
return "{}.{}".format(self.name, self.run_number)

def get_workspace_disk_usage(self, summarize=False, search=None):
"""Retrieve disk usage information of a workspace."""
Expand Down Expand Up @@ -646,12 +665,11 @@ def get_all_restarts(self):
the same name and the same run number (up to the dot). This includes the
original workflow, as well as all the following restarts.
"""
run_number = int(self.run_number)
generation_number, _ = split_run_number(self.run_number)
restarts = Workflow.query.filter(
Workflow.name == self.name,
Workflow.owner_id == self.owner_id,
Workflow.run_number >= run_number,
Workflow.run_number < run_number + 1,
Workflow.generation_number == generation_number,
)
return restarts

Expand Down
30 changes: 24 additions & 6 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None):
return workspace_path


def split_run_number(run_number):
"""Split run number into generation and restart numbers."""
run_number = str(run_number)
if "." in run_number:
return tuple(map(int, run_number.split(".", maxsplit=1)))
return int(run_number), 0


def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
"""Get Workflow from database with uuid or name.
Expand Down Expand Up @@ -128,21 +136,31 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
return _get_workflow_by_name(workflow_name, user_uuid)

# `run_number` was specified.
# Check `run_number` is valid.
try:
run_number = float(run_number)
generation_number, restart_number = run_number.split(".", maxsplit=1)
except ValueError:
# There were not enough dot-separated substrings, so probably
# the `restart_number` was not specified.
generation_number = run_number
restart_number = 0

# Check `run_number` and `restart_number` are valid.
try:
generation_number = int(generation_number)
restart_number = int(restart_number)
except ValueError:
# `uuid_or_name` was split, so it is a dot-separated string
# `uuid_or_name` was split, so it is a dot-separated string,
# but it didn't contain a valid `run_number`.
# Assume that this dot-separated string is the name of
# the workflow and search with it.
return _get_workflow_by_name(uuid_or_name, user_uuid)

# `run_number` is valid.
# Search by `run_number` since it is a primary key.
# Search by `generation_number` and `restart_number`, since it is a primary key.
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number == run_number,
Workflow.generation_number == generation_number,
Workflow.restart_number == restart_number,
Workflow.owner_id == user_uuid,
).one_or_none()
if not workflow:
Expand All @@ -169,7 +187,7 @@ def _get_workflow_by_name(workflow_name, user_uuid):
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
)
.order_by(Workflow.run_number.desc())
.order_by(Workflow.generation_number.desc(), Workflow.restart_number.desc())
.first()
)
if not workflow:
Expand Down
16 changes: 12 additions & 4 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def test_workflow_run_number_assignment(db, session, new_user):
)
session.add(first_workflow)
session.commit()
assert first_workflow.run_number == 1
assert first_workflow.run_number == "1"
assert first_workflow.generation_number == 1
assert first_workflow.restart_number == 0
second_workflow = Workflow(
id_=str(uuid4()),
name=workflow_name,
Expand All @@ -62,7 +64,9 @@ def test_workflow_run_number_assignment(db, session, new_user):
)
session.add(second_workflow)
session.commit()
assert second_workflow.run_number == 2
assert second_workflow.run_number == "2"
assert second_workflow.generation_number == 2
assert second_workflow.restart_number == 0
first_workflow_restart = Workflow(
id_=str(uuid4()),
name=workflow_name,
Expand All @@ -75,7 +79,9 @@ def test_workflow_run_number_assignment(db, session, new_user):
)
session.add(first_workflow_restart)
session.commit()
assert first_workflow_restart.run_number == 1.1
assert first_workflow_restart.run_number == "1.1"
assert first_workflow_restart.generation_number == 1
assert first_workflow_restart.restart_number == 1
first_workflow_second_restart = Workflow(
id_=str(uuid4()),
name=workflow_name,
Expand All @@ -88,7 +94,9 @@ def test_workflow_run_number_assignment(db, session, new_user):
)
session.add(first_workflow_second_restart)
session.commit()
assert first_workflow_second_restart.run_number == 1.2
assert first_workflow_second_restart.run_number == "1.2"
assert first_workflow_second_restart.generation_number == 1
assert first_workflow_second_restart.restart_number == 2


def test_workflow_retention_rules(db, session, new_user):
Expand Down
Loading

0 comments on commit 1432ec4

Please sign in to comment.