From fec56fc8e78bf31599c680ecff6d18f2e7704b74 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Mon, 6 Mar 2023 10:06:00 +0100 Subject: [PATCH 01/27] Make "pi" and "description" mandatory This is because they are required by DDS when creating a project. --- delivery/handlers/dds_handlers.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/delivery/handlers/dds_handlers.py b/delivery/handlers/dds_handlers.py index ff31e2e..3dc87f9 100644 --- a/delivery/handlers/dds_handlers.py +++ b/delivery/handlers/dds_handlers.py @@ -46,7 +46,11 @@ async def post(self, project_name): response = requests.request("POST", url, json=payload) """ - required_members = ["auth_token"] + required_members = [ + "auth_token", + "pi", + "description", + ] project_metadata = self.body_as_object( required_members=required_members) From f553cf668aa855b3c282bffa83a4a543cc3c6066 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Mon, 6 Mar 2023 10:09:09 +0100 Subject: [PATCH 02/27] Rename `put` into `deliver` --- delivery/handlers/delivery_handlers.py | 2 +- delivery/models/project.py | 4 ++-- tests/unit_tests/services/test_dds.py | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index b3a203d..314a709 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -50,7 +50,7 @@ def post(self, staging_id): auth_token, delivery_project_id) - delivery_id = yield dds_project.put( + delivery_id = yield dds_project.deliver( staging_id, skip_delivery=skip_delivery, deadline=deadline, diff --git a/delivery/models/project.py b/delivery/models/project.py index 47a69c0..d5267dc 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -254,7 +254,7 @@ def get_ngi_project_name(self): return self._ngi_project_name @gen.coroutine - def put( + def deliver( self, staging_id, skip_delivery=False, @@ -263,7 +263,7 @@ def put( email=True, ): """ - Upload staged data to DDS + Deliver staged data to DDS Parameters ---------- diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index 8e58c62..b0de8cf 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -97,7 +97,7 @@ def test_dds_put(self): '.DDSProject.get_ngi_project_name', new_callable=AsyncMock, return_value='AB-1234'): - yield dds_project.put( + yield dds_project.deliver( staging_id=1, deadline=deadline, ) @@ -161,7 +161,7 @@ def test_dds_put_no_release(self): '.DDSProject.get_ngi_project_name', new_callable=AsyncMock, return_value='AB-1234'): - yield dds_project.put( + yield dds_project.deliver( staging_id=1, deadline=deadline, release=False, @@ -220,7 +220,7 @@ def test_dds_put_raises_on_non_existent_stage_id(self): '.DDSProject.get_ngi_project_name', new_callable=AsyncMock, return_value='AB-1234'): - yield dds_project.put(staging_id=1) + yield dds_project.deliver(staging_id=1) @gen_test def test_dds_put_raises_on_non_successful_stage_id(self): @@ -240,7 +240,7 @@ def test_dds_put_raises_on_non_successful_stage_id(self): '.DDSProject.get_ngi_project_name', new_callable=AsyncMock, return_value='AB-1234'): - yield dds_project.put(staging_id=1) + yield dds_project.deliver(staging_id=1) def test_delivery_order_by_id(self): delivery_order = DeliveryOrder( @@ -276,7 +276,7 @@ def test_possible_to_delivery_by_staging_id_and_skip_delivery(self): '.DDSProject.get_ngi_project_name', new_callable=AsyncMock, return_value='AB-1234'): - yield dds_project.put(staging_id=1, skip_delivery=True) + yield dds_project.deliver(staging_id=1, skip_delivery=True) def _get_delivery_order(): return self.delivery_order.delivery_status From 9f626a5d83e2c6a994c8a479d7a8bb7daa0675d6 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 8 Mar 2023 12:27:06 +0100 Subject: [PATCH 03/27] Implement database models for DDSPut and DDSDelivery --- delivery/models/db_models.py | 59 ++++++++++++- delivery/repositories/dds_repository.py | 107 ++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 delivery/repositories/dds_repository.py diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index 52577c0..dbb6470 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -2,7 +2,8 @@ import os import enum as base_enum -from sqlalchemy import Column, Integer, BigInteger, String, Enum +from sqlalchemy import Column, ForeignKey +from sqlalchemy import Integer, BigInteger, String, Enum, DateTime from sqlalchemy.ext.declarative import declarative_base """ @@ -138,3 +139,59 @@ def __repr__(self): f"status: {self.delivery_status}, " " }" ) + +class DDSDelivery(SQLAlchemyBase): + + __tablename__ = "dds_deliveries" + + dds_project_id = Column(String, primary_key=True) + ngi_project_name = Column(String) + + date_started = Column(DateTime, nullable=False) + date_completed = Column(DateTime) + delivery_status = Column(Enum(DeliveryStatus), nullable=False) + + def __repr__(self): + return ( + "DDS Delivery: {" + f"dds_project: '{self.dds_project}', " + f"ngi_project_name: '{self.ngi_project_name}', " + f"date_started: {self.date_started}, " + f"date_completed: {self.date_completed}, " + f"delivery_status: {self.delivery_status}, " + " }" + ) + +class DDSPut(SQLAlchemyBase): + + __tablename__ = "dds_puts" + + id = Column(Integer, primary_key=True, autoincrement=True) + dds_project_id = Column( + String, + ForeignKey("dds_deliveries.dds_project_id"), + nullable=False) + dds_pid = Column(Integer, nullable=False) + + delivery_source = Column(String, nullable=False) + delivery_path = Column(String, nullable=False) + destination = Column(String) + + date_started = Column(DateTime, nullable=False) + date_completed = Column(DateTime) + delivery_status = Column(Enum(DeliveryStatus), nullable=False) + + def __repr__(self): + return ( + "DDS Put: {" + f"id: {self.id}," + f"dds_project: '{self.dds_project}', " + f"dds_pid: {self.dds_pid}, " + f"delivery_source: '{self.delivery_source}', " + f"delivery_path: '{self.delivery_path}', " + f"destination: '{self.destination}', " + f"date_started: {self.date_started}, " + f"date_completed: {self.date_completed}, " + f"delivery_status: {self.delivery_status}, " + " }" + ) diff --git a/delivery/repositories/dds_repository.py b/delivery/repositories/dds_repository.py new file mode 100644 index 0000000..76fa750 --- /dev/null +++ b/delivery/repositories/dds_repository.py @@ -0,0 +1,107 @@ +import datetime + +from delivery.models.db_models import DDSDelivery, DDSPut, DeliveryStatus + + +class DatabaseBasedDDSRepository: + def __init__(self, session_factory): + """ + Instantiate a new DatabaseBasedDDSRepository + :param session_factory: a factory method that can create a new + sqlalchemy Session object. + """ + self.session = session_factory() + + def _get_row(self, primary_key): + raise NotImplementedError + + def set_to_completed(self, primary_key, date_completed=None): + if not date_completed: + date_completed = datetime.datetime.now() + + row = self._get_row(primary_key) + + row.delivery_status = DeliveryStatus.delivery_successful + row.date_completed = date_completed + + self.session.commit() + + def update_status(self, primary_key, new_status): + row = self._get_row(primary_key) + row.delivery_status = new_status + self.session.commit() + + +class DatabaseBasedDDSDeliveryRepository(DatabaseBasedDDSRepository): + """ + Class to manipulate the DDSDelivery database. + """ + def _get_row(self, primary_key): + return self.session.get(DDSDelivery, primary_key) + + def register_dds_delivery( + self, + dds_project_id, + ngi_project_name, + date_started=None, + delivery_status=None, + ): + if not date_started: + date_started = datetime.datetime.now() + + if not delivery_status: + delivery_status = DeliveryStatus.delivery_in_progress + + dds_delivery = DDSDelivery( + dds_project_id=dds_project_id, + ngi_project_name=ngi_project_name, + date_started=date_started, + delivery_status=delivery_status, + ) + + self.session.add(dds_delivery) + self.session.commit() + + return dds_delivery + + def get_dds_delivery(self, dds_project_id): + return self._get_row(dds_project_id) + + +class DatabaseBasedDDSPutRepository(DatabaseBasedDDSRepository): + def _get_row(self, primary_key): + return self.session.get(DDSPut, primary_key) + + def register_dds_put( + self, + dds_project_id, + dds_pid, + delivery_source, + delivery_path, + destination=None, + date_started=None, + delivery_status=None, + ): + if not date_started: + date_started = datetime.datetime.now() + + if not delivery_status: + delivery_status = DeliveryStatus.delivery_in_progress + + dds_put = DDSPut( + dds_project_id=dds_project_id, + dds_pid=dds_pid, + delivery_source=delivery_source, + delivery_path=delivery_path, + destination=destination, + date_started=date_started, + delivery_status=delivery_status, + ) + + self.session.add(dds_put) + self.session.commit() + + return dds_put + + def get_dds_put(self, row_id): + return self._get_row(row_id) From fb3e68468911b607bb7bec3fbb812e3931dbf961 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 9 Mar 2023 11:01:05 +0100 Subject: [PATCH 04/27] Connect dds repos to dds_service --- delivery/app.py | 8 ++++++++ delivery/services/dds_service.py | 7 +++++++ 2 files changed, 15 insertions(+) diff --git a/delivery/app.py b/delivery/app.py index 6b4aa7a..3e793d2 100644 --- a/delivery/app.py +++ b/delivery/app.py @@ -25,6 +25,8 @@ FileSystemBasedUnorganisedRunfolderRepository from delivery.repositories.staging_repository import DatabaseBasedStagingRepository from delivery.repositories.deliveries_repository import DatabaseBasedDeliveriesRepository +from delivery.repositories.dds_repository import DatabaseBasedDDSDeliveryRepository, \ + DatabaseBasedDDSPutRepository from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository from delivery.repositories.delivery_sources_repository import DatabaseBasedDeliverySourcesRepository from delivery.repositories.sample_repository import RunfolderProjectBasedSampleRepository @@ -159,6 +161,10 @@ def _assert_is_dir(directory): delivery_repo = DatabaseBasedDeliveriesRepository( session_factory=session_factory) + dds_delivery_repo = DatabaseBasedDDSDeliveryRepository( + session_factory=session_factory) + dds_put_repo = DatabaseBasedDDSPutRepository( + session_factory=session_factory) dds_conf = config['dds_conf'] dds_service = DDSService( @@ -166,6 +172,8 @@ def _assert_is_dir(directory): staging_service=staging_service, staging_dir=staging_dir, delivery_repo=delivery_repo, + dds_delivery_repo=dds_delivery_repo, + dds_put_repo=dds_put_repo, session_factory=session_factory, dds_conf=dds_conf) diff --git a/delivery/services/dds_service.py b/delivery/services/dds_service.py index bc32384..1f5d231 100644 --- a/delivery/services/dds_service.py +++ b/delivery/services/dds_service.py @@ -12,13 +12,20 @@ def __init__( staging_service, staging_dir, delivery_repo, + dds_delivery_repo, + dds_put_repo, session_factory, dds_conf): self.external_program_service = external_program_service self.dds_external_program_service = self.external_program_service self.staging_service = staging_service self.staging_dir = staging_dir + # `delivery_repo` and `dds_delivery_repo` have similar names here but + # `delivery_repo` will be removed once we decommission the old API + # version /AC 2023-03-09 self.delivery_repo = delivery_repo + self.dds_delivery_repo = dds_delivery_repo + self.dds_put_repo = dds_put_repo self.session_factory = session_factory self.dds_conf = dds_conf From 16288e79ad5f74adcb717470dee26ff072e231cc Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 9 Mar 2023 11:03:47 +0100 Subject: [PATCH 05/27] Register dds project in db upon creation --- delivery/models/project.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/delivery/models/project.py b/delivery/models/project.py index d5267dc..8b0894b 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -214,6 +214,11 @@ def new( stdout = yield self._run(cmd) self.project_id = cls._parse_dds_project_id(stdout) + self.dds_service.dds_delivery_repo.register_dds_delivery( + self.project_id, + ngi_project_name, + ) + self._ngi_project_name = ngi_project_name return self From ebda4a04e5df3797cbedf490cd173ff123162889 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 9 Mar 2023 11:06:53 +0100 Subject: [PATCH 06/27] Update `get_ngi_project_name` to call db instead --- delivery/models/project.py | 36 ++++----------------------- tests/unit_tests/services/test_dds.py | 7 +----- 2 files changed, 6 insertions(+), 37 deletions(-) diff --git a/delivery/models/project.py b/delivery/models/project.py index 8b0894b..2e05b8c 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -223,40 +223,14 @@ def new( return self - @gen.coroutine def get_ngi_project_name(self): """ NGI project name (e.g. AB-1234). - - If the attribute is not set, it will fetched from DDS. """ - try: - return self._ngi_project_name - except AttributeError: - cmd = self._base_cmd[:] - cmd += [ - 'ls', - '--json', - ] - - dds_output = yield self._run(cmd) - try: - dds_project_title = next( - project["Title"] - for project in json.loads(dds_output) - if project["Project ID"] == self.project_id - ) - - self._ngi_project_name = re.sub( - r"(\D{2})(\d{4})", - r"\1-\2", - dds_project_title) - except StopIteration: - err_msg = "Project {self.project_id} not found in DDS." - log.error(err_msg) - raise ProjectNotFoundException(err_msg) - - return self._ngi_project_name + + dds_delivery = self.dds_service.dds_delivery_repo \ + .get_dds_delivery(self.project_id) + return dds_delivery.ngi_project_name @gen.coroutine def deliver( @@ -295,7 +269,7 @@ def deliver( "Only deliver by staging_id if it has a successful status!" "Staging order was: {}".format(staging_order)) - ngi_project_name = yield self.get_ngi_project_name() + ngi_project_name = self.get_ngi_project_name() delivery_order = self.dds_service.delivery_repo.create_delivery_order( delivery_source=staging_order.get_staging_path(), diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index b0de8cf..d489105 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -95,7 +95,6 @@ def test_dds_put(self): with patch( 'delivery.models.project' '.DDSProject.get_ngi_project_name', - new_callable=AsyncMock, return_value='AB-1234'): yield dds_project.deliver( staging_id=1, @@ -159,7 +158,6 @@ def test_dds_put_no_release(self): with patch( 'delivery.models.project' '.DDSProject.get_ngi_project_name', - new_callable=AsyncMock, return_value='AB-1234'): yield dds_project.deliver( staging_id=1, @@ -218,7 +216,6 @@ def test_dds_put_raises_on_non_existent_stage_id(self): with patch( 'delivery.models.project' '.DDSProject.get_ngi_project_name', - new_callable=AsyncMock, return_value='AB-1234'): yield dds_project.deliver(staging_id=1) @@ -238,7 +235,6 @@ def test_dds_put_raises_on_non_successful_stage_id(self): with patch( 'delivery.models.project' '.DDSProject.get_ngi_project_name', - new_callable=AsyncMock, return_value='AB-1234'): yield dds_project.deliver(staging_id=1) @@ -274,7 +270,6 @@ def test_possible_to_delivery_by_staging_id_and_skip_delivery(self): with patch( 'delivery.models.project' '.DDSProject.get_ngi_project_name', - new_callable=AsyncMock, return_value='AB-1234'): yield dds_project.deliver(staging_id=1, skip_delivery=True) @@ -408,5 +403,5 @@ def test_get_dds_project_title(self): dds_project_id=mock_dds_project[0]["Project ID"], ) - ngi_project_name = yield dds_project.get_ngi_project_name() + ngi_project_name = dds_project.get_ngi_project_name() self.assertEqual(ngi_project_name, "AB-1234") From 641667cbeb0931be24df0bbd96ddd74bd3851af3 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 10 Mar 2023 13:41:19 +0100 Subject: [PATCH 07/27] Use `external_program_service` instead of private method _run --- delivery/models/project.py | 29 ++----------------- delivery/services/external_program_service.py | 11 ++++++- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/delivery/models/project.py b/delivery/models/project.py index 2e05b8c..ce0c88d 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -211,7 +211,8 @@ def new( for args in ['--researcher', researcher] ] - stdout = yield self._run(cmd) + stdout = yield self.dds_service.external_program_service \ + .run_and_wait(cmd).stdout self.project_id = cls._parse_dds_project_id(stdout) self.dds_service.dds_delivery_repo.register_dds_delivery( @@ -329,31 +330,7 @@ def release(self, deadline=None, email=True): if not email: cmd.append('--no-mail') - yield self._run(cmd) - - @gen.coroutine - def _run(self, cmd): - """ - Run a dds command and wait for result. - - Parameters - ---------- - cmd: str - shell command to run. - """ - log.debug(f"Running dds with command: {' '.join(cmd)}") - execution = self.dds_service.external_program_service.run(cmd) - execution_result = yield self.dds_service.external_program_service \ - .wait_for_execution(execution) - - if execution_result.status_code != 0: - error_msg = ( - f"Failed to run DDS command: {execution_result.stderr}." - f" DDS returned status code: {execution_result.status_code}") - log.error(error_msg) - raise RuntimeError(error_msg) - - return execution_result.stdout + yield self.dds_service.external_program_service.run_and_wait(cmd) @gen.coroutine def _run_delivery( diff --git a/delivery/services/external_program_service.py b/delivery/services/external_program_service.py index c0c0cf3..2d2116a 100644 --- a/delivery/services/external_program_service.py +++ b/delivery/services/external_program_service.py @@ -2,11 +2,13 @@ from tornado.process import Subprocess from tornado import gen +import logging from subprocess import PIPE from delivery.models.execution import ExecutionResult, Execution +log = logging.getLogger(__name__) class ExternalProgramService(object): """ @@ -20,6 +22,7 @@ def run(cmd): :param cmd: the command to run as a list, i.e. ['ls','-l', '/'] :return: A instance of Execution """ + log.info(f"Running external command: {' '.join(cmd)}") p = Subprocess(cmd, stdout=PIPE, stderr=PIPE, @@ -38,6 +41,13 @@ def wait_for_execution(execution): out = execution.process_obj.stdout.read().decode('UTF-8') err = execution.process_obj.stderr.read().decode('UTF-8') + if status_code != 0: + error_msg = ( + f"Failed to run external command: {err}." + f" Program returned status code: {status_code}") + log.error(error_msg) + raise RuntimeError(error_msg) + return ExecutionResult(out, err, status_code) @staticmethod @@ -49,4 +59,3 @@ def run_and_wait(cmd): """ execution = ExternalProgramService.run(cmd) return ExternalProgramService.wait_for_execution(execution) - From d5f210e3c9a47b9df9928ff0ceadb95e4d633c66 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 10 Mar 2023 13:58:05 +0100 Subject: [PATCH 08/27] Implement DDSProject.put --- delivery/models/project.py | 41 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/delivery/models/project.py b/delivery/models/project.py index ce0c88d..33f9d33 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -233,6 +233,47 @@ def get_ngi_project_name(self): .get_dds_delivery(self.project_id) return dds_delivery.ngi_project_name + @gen.coroutine + def put( + self, + source, + source_path, + destination=None, + ): + cmd = self._base_cmd[:] + + cmd += [ + 'data', 'put', + '--mount-dir', self.dds_service.staging_dir, + '--source', source_path, + '--project', self.project_id, + '--break-on-fail', + '--silent', + ] + + if destination: + cmd += ['--destination', destination] + + execution = self.dds_service.external_program_service.run(cmd) + + dds_put = self.dds_service.dds_put_repo.register_dds_put( + self.project_id, + execution.pid, + source, + source_path, + destination, + ) + + try: + yield self.dds_service.external_program_service \ + .wait_for_execution(execution) + dds_put.delivery_status = DeliveryStatus.delivery_successful + except RuntimeError: + dds_put.delivery_status = DeliveryStatus.delivery_failed + raise + finally: + self.dds_service.dds_put_repo.session.commit() + @gen.coroutine def deliver( self, From e7a735b34ed1433f510a1b1bb168c26f123af287 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 10 Mar 2023 15:50:59 +0100 Subject: [PATCH 09/27] Implement DDSPut.is_running --- delivery/models/db_models.py | 17 +++++++++++++++++ requirements/prod | 1 + 2 files changed, 18 insertions(+) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index dbb6470..a32ee6f 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -1,6 +1,8 @@ import os +import datetime import enum as base_enum +import psutil from sqlalchemy import Column, ForeignKey from sqlalchemy import Integer, BigInteger, String, Enum, DateTime @@ -195,3 +197,18 @@ def __repr__(self): f"delivery_status: {self.delivery_status}, " " }" ) + + def is_running(self): + try: + process = psutil.Process(self.dds_pid) + date_created = datetime.datetime.fromtimestamp( + process.create_time()) + return ( + process.name() == 'dds' + and ( + abs(date_created - self.date_started) + < datetime.timedelta(minutes=1) + ) + ) + except psutil.NoSuchProcess: + return False diff --git a/requirements/prod b/requirements/prod index fbec63a..bbb00cd 100644 --- a/requirements/prod +++ b/requirements/prod @@ -5,4 +5,5 @@ sqlalchemy==1.4.35 alembic==1.7.7 enum34==1.1.10 arteria==1.1.4 +psutil==5.9.4 dds-cli From 324a7b1ee18b971c33c476f41075398db895bcb5 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Mon, 20 Mar 2023 13:47:02 +0100 Subject: [PATCH 10/27] Remove `run_and_wait` method It seems this method confuses pytest too much and makes mocking impossible. --- delivery/models/project.py | 12 ++++++++---- delivery/services/external_program_service.py | 10 ---------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/delivery/models/project.py b/delivery/models/project.py index 33f9d33..d5aec6e 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -211,9 +211,11 @@ def new( for args in ['--researcher', researcher] ] - stdout = yield self.dds_service.external_program_service \ - .run_and_wait(cmd).stdout - self.project_id = cls._parse_dds_project_id(stdout) + execution = self.dds_service.external_program_service \ + .run(cmd) + result = yield self.dds_service.external_program_service \ + .wait_for_execution(execution) + self.project_id = cls._parse_dds_project_id(result.stdout) self.dds_service.dds_delivery_repo.register_dds_delivery( self.project_id, @@ -371,7 +373,9 @@ def release(self, deadline=None, email=True): if not email: cmd.append('--no-mail') - yield self.dds_service.external_program_service.run_and_wait(cmd) + execution = self.dds_service.external_program_service.run(cmd) + yield self.dds_service.external_program_service. \ + wait_for_execution(execution) @gen.coroutine def _run_delivery( diff --git a/delivery/services/external_program_service.py b/delivery/services/external_program_service.py index 2d2116a..7c05866 100644 --- a/delivery/services/external_program_service.py +++ b/delivery/services/external_program_service.py @@ -49,13 +49,3 @@ def wait_for_execution(execution): raise RuntimeError(error_msg) return ExecutionResult(out, err, status_code) - - @staticmethod - def run_and_wait(cmd): - """ - Run an external command and wait for it to finish - :param cmd: the command to run as a list, i.e. ['ls','-l', '/'] - :return: an ExecutionResult for the execution - """ - execution = ExternalProgramService.run(cmd) - return ExternalProgramService.wait_for_execution(execution) From 6c554a9a23fae84367fa9e426dd8d301b67704d7 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 21 Mar 2023 16:59:58 +0100 Subject: [PATCH 11/27] Fix integration tests OBS: there is a lot of code repetition in there and it's probably possible to make these tests more concise by writting some helper functions in the base class, but these tests are soon going to be obsolete and are going to be removed once the new delivery endpoint is released and the old ones are decommissioned. --- tests/integration_tests/base.py | 4 +- .../integration_tests/test_integration_dds.py | 104 ++++++++++++++++-- tests/unit_tests/services/test_dds.py | 31 +----- 3 files changed, 101 insertions(+), 38 deletions(-) diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index afdccdc..0a96663 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -93,9 +93,8 @@ def mock_delivery(cmd): log.debug(f"Mock is called with {cmd}") shell = False if cmd[0].endswith('dds'): - new_cmd = ['sleep', str(self.mock_duration)] - if 'project' in cmd: + new_cmd = ['sleep', str(0.01)] dds_output = f"""Current user: bio Project created with id: {project_id} User forskare was associated with Project {project_id} as Owner=True. An e-mail notification has not been sent. @@ -119,6 +118,7 @@ def mock_delivery(cmd): new_cmd = " ".join(new_cmd) shell = True elif 'put' in cmd: + new_cmd = ['sleep', str(self.mock_duration)] source_file = cmd[cmd.index("--source") + 1] auth_token = cmd[cmd.index("--token-path") + 1] new_cmd += ['&&', 'test', '-e', source_file] diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 7527642..5bdaced 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -23,6 +23,23 @@ def test_can_stage_and_delivery_runfolder(self): self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) + project_name = "AB-1234" + url = "/".join([self.API_BASE, "dds_project", "create", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + dds_project_id = json.loads(response.body)["dds_project_id"] + url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) response = yield self.http_client.fetch(self.get_url(url), method='POST', body='') self.assertEqual(response.code, 202) @@ -48,8 +65,8 @@ def test_can_stage_and_delivery_runfolder(self): self.assertTrue(os.path.exists(f"/tmp/{staging_id}/{project}")) delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'snpseq00025', - 'ngi_project_name': 'AB-1234', + 'delivery_project_id': dds_project_id, + 'ngi_project_name': project_name, 'auth_token': '1234', 'skip_delivery': True, } @@ -69,6 +86,22 @@ def test_can_stage_and_delivery_project_dir(self): # expected to be installed on the system where this runs) with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = "AB-1234" + url = "/".join([self.API_BASE, "dds_project", "create", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + dds_project_id = json.loads(response.body)["dds_project_id"] dir_name = os.path.basename(tmp_dir) url = "/".join([self.API_BASE, "stage", "project", dir_name]) @@ -90,8 +123,8 @@ def test_can_stage_and_delivery_project_dir(self): for project, staging_id in staging_order_project_and_id.items(): delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'snpseq00025', - 'ngi_project_name': 'AB-1234', + 'delivery_project_id': dds_project_id, + 'ngi_project_name': project_name, 'skip_delivery': True, 'dds': True, 'auth_token': '1234', @@ -258,6 +291,23 @@ def test_mock_duration_is_2(self): self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) + project_name = "AB-1234" + url = "/".join([self.API_BASE, "dds_project", "create", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + dds_project_id = json.loads(response.body)["dds_project_id"] + url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) response = yield self.http_client.fetch( self.get_url(url), @@ -272,8 +322,8 @@ def test_mock_duration_is_2(self): delivery_url = '/'.join([ self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'snpseq00025', - 'ngi_project_name': 'AB-1234', + 'delivery_project_id': dds_project_id, + 'ngi_project_name': project_name, 'dds': True, 'auth_token': '1234', 'skip_delivery': False, @@ -309,6 +359,23 @@ def test_can_delivery_data_asynchronously(self): self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) + project_name = "AB-1234" + url = "/".join([self.API_BASE, "dds_project", "create", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + dds_project_id = json.loads(response.body)["dds_project_id"] + url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) response = yield self.http_client.fetch( self.get_url(url), @@ -323,8 +390,8 @@ def test_can_delivery_data_asynchronously(self): delivery_url = '/'.join([ self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'snpseq00025', - 'ngi_project_name': 'AB-1234', + 'delivery_project_id': dds_project_id, + 'ngi_project_name': project_name, 'dds': True, 'auth_token': '1234', 'skip_delivery': False, @@ -366,6 +433,23 @@ def test_can_deliver_and_not_timeout(self): dir='./tests/resources/runfolders/', prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: + project_name = "CD-1234" + url = "/".join([self.API_BASE, "dds_project", "create", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + dds_project_id = json.loads(response.body)["dds_project_id"] + dir_name = os.path.basename(tmp_dir) self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) @@ -383,8 +467,8 @@ def test_can_deliver_and_not_timeout(self): delivery_url = '/'.join([ self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'snpseq00025', - 'ngi_project_name': 'AB-1234', + 'delivery_project_id': dds_project_id, + 'ngi_project_name': project_name, 'dds': True, 'auth_token': '1234', 'skip_delivery': False, diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index d489105..fe98cef 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -38,6 +38,9 @@ def wait_as_coroutine(x): self.mock_staging_service = MagicMock() self.mock_delivery_repo = MagicMock() + self.mock_dds_delivery_repo = MagicMock() + self.mock_dds_put_repo = MagicMock() + self.delivery_order = DeliveryOrder( id=1, delivery_source="/staging/dir/bar", @@ -56,6 +59,8 @@ def wait_as_coroutine(x): staging_service=self.mock_staging_service, staging_dir='/foo/bar/staging_dir', delivery_repo=self.mock_delivery_repo, + dds_delivery_repo=self.mock_dds_delivery_repo, + dds_put_repo=self.mock_dds_put_repo, session_factory=self.mock_session_factory, dds_conf=self.mock_dds_config, ) @@ -379,29 +384,3 @@ def test_release_project_nomail(self): '--deadline', deadline, '--no-mail', ]) - - @gen_test - def test_get_dds_project_title(self): - mock_dds_project = [{ - "Access": True, - "Last updated": "Fri, 01 Jul 2022 14:31:13 CEST", - "PI": "pi@email.com", - "Project ID": "snpseq00025", - "Size": 25856185058, - "Status": "In Progress", - "Title": "AB1234" - }] - - with patch( - 'delivery.models.project.DDSProject._run', - new_callable=AsyncMock, - return_value=json.dumps(mock_dds_project), - ): - dds_project = DDSProject( - dds_service=self.dds_service, - auth_token=self.token_file.name, - dds_project_id=mock_dds_project[0]["Project ID"], - ) - - ngi_project_name = dds_project.get_ngi_project_name() - self.assertEqual(ngi_project_name, "AB-1234") From 564011d99be8acc4baa1f71af9bf7db2ba67a4af Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 21 Mar 2023 17:01:59 +0100 Subject: [PATCH 12/27] Add alembic db migration script --- ...e3ecb_add_ddsdelivery_and_ddsput_models.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py diff --git a/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py b/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py new file mode 100644 index 0000000..2ac482c --- /dev/null +++ b/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py @@ -0,0 +1,61 @@ +"""Add DDSDelivery and DDSPut Models + +Revision ID: 69ebc95e3ecb +Revises: 74b309c44134 +Create Date: 2023-03-10 16:10:37.844346 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '69ebc95e3ecb' +down_revision = '74b309c44134' +branch_labels = None +depends_on = None + +def upgrade(): + op.create_table( + 'dds_deliveries', + sa.Column('dds_project_id', sa.String, primary_key=True), + sa.Column('ngi_project_name', sa.String), + sa.Column('date_started', sa.DateTime, nullable=False), + sa.Column('date_completed', sa.DateTime), + sa.Column('delivery_status', sa.Enum( + 'pending', + 'delivery_in_progress', + 'delivery_successful', + 'delivery_failed', + 'delivery_skipped', + name='deliverystatus' + )), + ) + + op.create_table( + 'dds_puts', + sa.Column('id', sa.Integer, primary_key=True, autoincrement=True), + sa.Column( + 'dds_project_id', + sa.String, + sa.ForeignKey("dds_deliveries.dds_project_id"), + nullable=False), + sa.Column('dds_pid', sa.Integer, nullable=False), + sa.Column('delivery_source', sa.String, nullable=False), + sa.Column('delivery_path', sa.String, nullable=False), + sa.Column('destination', sa.String), + sa.Column('date_started', sa.DateTime, nullable=False), + sa.Column('date_completed', sa.DateTime), + sa.Column('delivery_status', sa.Enum( + 'pending', + 'delivery_in_progress', + 'delivery_successful', + 'delivery_failed', + 'delivery_skipped', + name='deliverystatus' + )), + ) + +def downgrade(): + op.drop_table('dds_puts') + op.drop_table('dds_deliveries') From 156530a20486834439e01196e2854f6dbe572efc Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 22 Mar 2023 15:21:02 +0100 Subject: [PATCH 13/27] Implement new project delivery handler --- delivery/handlers/delivery_handlers.py | 86 +++++++++++++++++++++++++ delivery/repositories/dds_repository.py | 17 +++++ 2 files changed, 103 insertions(+) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 314a709..5931e43 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -12,6 +12,92 @@ log = logging.getLogger(__name__) + +class DeliverProjectHandler(ArteriaDeliveryBaseHandler): + """ + Handler for delivering a project a project (i.e. a directory placed in the + directory defined by the arteria delivery service + `general_project_directory` configuration). + """ + def initialize(self, **kwargs): + self.dds_service = kwargs["dds_service"] + self.general_project_repo = kwargs["general_project_repo"] + super().initialize(kwargs) + + async def post(self, project_name): + required_members = [ + "auth_token", + "pi", + "description", + ] + + request_data = self.body_as_object( + required_members=required_members) + + project_metadata = { + key: request_data[key] + for key in [ + "pi", + "description", + "owners", + "researchers", + "project_alias", + ] + if key in request_data + } + + force_delivery = request_data.get("force_delivery", False) + + if ( + self.dds_service.dds_put_repo + .was_delivered_before(project_name, project_name) + and not force_delivery + ): + self.set_status( + FORBIDDEN, + f"The project {project_name} has already been delivered. " + "Use the force to bypass and deliver anyway." + ) + return + + project_path = self.general_project_repo.get_project( + project_metadata.get("project_alias", project_name)).path + + dds_project = await DDSProject.new( + project_name, + project_metadata, + request_data["auth_token"], + self.dds_service) + + log.info( + f"New dds project created for project {project_name} " + f"with id {dds_project.project_id}" + ) + + self.set_status(ACCEPTED) + self.write_json({ + 'dds_project_id': dds_project.project_id, + 'status_link': "{0}://{1}{2}".format( + self.request.protocol, + self.request.host, + self.reverse_url("delivery_status", dds_project.project_id) + ) + }) + self.finish() + + await dds_project.put(project_name, project_path) + log.info(f"Uploaded {project_path} to {dds_project.project_id}") + + if request_data.get("release"): + await dds_project.release( + deadline=request_data.get("deadline", None), + email=request_data.get("email", True), + ) + log.info(f"Released project {dds_project.project_id}") + + dds_project.complete() + + class DeliverByStageIdHandler(ArteriaDeliveryBaseHandler): """ Handler for starting deliveries based on a previously staged directory/file diff --git a/delivery/repositories/dds_repository.py b/delivery/repositories/dds_repository.py index 76fa750..db4b6eb 100644 --- a/delivery/repositories/dds_repository.py +++ b/delivery/repositories/dds_repository.py @@ -4,6 +4,9 @@ class DatabaseBasedDDSRepository: + """ + Base class for DDSDelivery and DDSPut + """ def __init__(self, session_factory): """ Instantiate a new DatabaseBasedDDSRepository @@ -105,3 +108,17 @@ def register_dds_put( def get_dds_put(self, row_id): return self._get_row(row_id) + + def was_delivered_before(self, ngi_project_name, source): + return self.session.query(DDSPut) \ + .join(DDSDelivery) \ + .filter(DDSDelivery.ngi_project_name == ngi_project_name) \ + .filter(DDSPut.delivery_source == source) \ + .first() is not None + + def get_dds_put_by_status(self, dds_project_id, delivery_status): + return self.session.query(DDSPut) \ + .join(DDSDelivery) \ + .filter(DDSDelivery.dds_project_id == dds_project_id) \ + .filter(DDSPut.delivery_status == delivery_status) \ + .all() From 2905775beaf3ecacb05cbc2d4d94cfc6a8839c8f Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 22 Mar 2023 15:21:19 +0100 Subject: [PATCH 14/27] Reorder methods in DDSProject This is in preparation for the big clean-up --- delivery/models/project.py | 59 +++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/delivery/models/project.py b/delivery/models/project.py index d5aec6e..1e7029d 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -276,6 +276,36 @@ def put( finally: self.dds_service.dds_put_repo.session.commit() + @gen.coroutine + def release(self, deadline=None, email=True): + """ + Release the project in DDS + + Parameters + ---------- + deadline: int + project deadline in days. + """ + cmd = self._base_cmd[:] + + cmd += [ + 'project', 'status', 'release', + '--project', self.project_id, + ] + + if deadline: + cmd += [ + '--deadline', str(deadline), + ] + + if not email: + cmd.append('--no-mail') + + execution = self.dds_service.external_program_service.run(cmd) + yield self.dds_service.external_program_service. \ + wait_for_execution(execution) + +# The code below will be removed once the new delivery flow is in place @gen.coroutine def deliver( self, @@ -348,35 +378,6 @@ def deliver( return delivery_order.id - @gen.coroutine - def release(self, deadline=None, email=True): - """ - Release the project in DDS - - Parameters - ---------- - deadline: int - project deadline in days. - """ - cmd = self._base_cmd[:] - - cmd += [ - 'project', 'status', 'release', - '--project', self.project_id, - ] - - if deadline: - cmd += [ - '--deadline', str(deadline), - ] - - if not email: - cmd.append('--no-mail') - - execution = self.dds_service.external_program_service.run(cmd) - yield self.dds_service.external_program_service. \ - wait_for_execution(execution) - @gen.coroutine def _run_delivery( self, From 50a62c09fd6112c10167548047f5e6559ce80522 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 8 Aug 2023 13:57:59 +0200 Subject: [PATCH 15/27] Add route to new delivery handler --- delivery/app.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/delivery/app.py b/delivery/app.py index 3e793d2..9229ed1 100644 --- a/delivery/app.py +++ b/delivery/app.py @@ -16,7 +16,8 @@ from delivery.handlers.project_handlers import ProjectHandler, ProjectsForRunfolderHandler, \ BestPracticeProjectSampleHandler from delivery.handlers.dds_handlers import DDSCreateProjectHandler -from delivery.handlers.delivery_handlers import DeliverByStageIdHandler, DeliveryStatusHandler +from delivery.handlers.delivery_handlers import DeliverByStageIdHandler, \ + DeliveryStatusHandler, DeliverProjectHandler from delivery.handlers.staging_handlers import StagingRunfolderHandler, StagingHandler,\ StageGeneralDirectoryHandler, StagingProjectRunfoldersHandler from delivery.handlers.organise_handlers import OrganiseRunfolderHandler @@ -76,6 +77,8 @@ def routes(**kwargs): url(r"/api/1.0/deliver/status/(.+)", DeliveryStatusHandler, name="delivery_status", kwargs=kwargs), + url(r"/api/1.0/deliver/project/(.+)", DeliverProjectHandler, + name="deliver_project", kwargs=kwargs), url(r"/api/1.0/dds_project/create/(.+)", DDSCreateProjectHandler, name="create_dds_project", kwargs=kwargs), From 18df8ab328eab48a2e2dc6a3d68414ef0402dc4d Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 8 Aug 2023 14:00:33 +0200 Subject: [PATCH 16/27] Update status to support new delivery method --- delivery/handlers/delivery_handlers.py | 34 +++++++++++++++++++++----- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 5931e43..f10eff8 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -2,6 +2,7 @@ import os import json import logging +import re import tempfile from tornado.gen import coroutine @@ -162,16 +163,37 @@ def initialize(self, **kwargs): @coroutine def get(self, delivery_order_id): - delivery_order = self.delivery_service\ - .get_delivery_order_by_id(delivery_order_id) + """ + Returns project status. + """ - delivery_order = yield self.delivery_service.update_delivery_status( + pattern_dds_project = re.compile(r"snpseq\d+") + + if pattern_dds_project.fullmatch(delivery_order_id): + delivery_project = DDSProject( + self.delivery_service, + "", delivery_order_id) - body = { - 'id': delivery_order.id, - 'status': delivery_order.delivery_status.name, + try: + body = { + 'id': delivery_order_id, + 'status': delivery_project.get_db_entry().delivery_status.name, } + except AttributeError: + self.set_status(NOT_FOUND) + return + else: + delivery_order = self.delivery_service\ + .get_delivery_order_by_id(delivery_order_id) + + delivery_order = yield self.delivery_service.update_delivery_status( + delivery_order_id) + + body = { + 'id': delivery_order.id, + 'status': delivery_order.delivery_status.name, + } self.write_json(body) self.set_status(OK) From 1db11d18373b95a73dc7029a91221e3f7e88b1ab Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 8 Aug 2023 14:01:50 +0200 Subject: [PATCH 17/27] Check only one dds put is running at a time --- delivery/models/project.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/delivery/models/project.py b/delivery/models/project.py index 1e7029d..86bc7ef 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -235,6 +235,26 @@ def get_ngi_project_name(self): .get_dds_delivery(self.project_id) return dds_delivery.ngi_project_name + def get_db_entry(self): + """ + Returns project entry from the delivery database, or None if project is + not found. + + Returns + ------- + db_models.DDSDelivery + """ + return self.dds_service.dds_delivery_repo \ + .get_dds_delivery(self.project_id) + + def has_ongoing_puts(self): + """ + Returns if there are any ongoing uploads with this project. + """ + return bool(self.dds_service.dds_put_repo.get_dds_put_by_status( + self.project_id, + DeliveryStatus.delivery_in_progress)) + @gen.coroutine def put( self, @@ -242,6 +262,9 @@ def put( source_path, destination=None, ): + assert not self.has_ongoing_puts(), \ + "Only one upload is permitted at a time" + cmd = self._base_cmd[:] cmd += [ @@ -286,6 +309,9 @@ def release(self, deadline=None, email=True): deadline: int project deadline in days. """ + assert not self.has_ongoing_puts(), \ + "Cannot release project while uploads are ongoing" + cmd = self._base_cmd[:] cmd += [ From 668588cbed7c0be383ca74fd2900d2de3081f841 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 8 Aug 2023 14:02:22 +0200 Subject: [PATCH 18/27] Add complete method This set a project as completed in the database --- delivery/models/project.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/delivery/models/project.py b/delivery/models/project.py index 86bc7ef..dc36b85 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -331,6 +331,18 @@ def release(self, deadline=None, email=True): yield self.dds_service.external_program_service. \ wait_for_execution(execution) + def complete(self): + """ + Set project status to completed in the database + """ + assert not self.has_ongoing_puts(), \ + "Cannot complete project while uploads are ongoing" + + dds_delivery = self.dds_service.dds_delivery_repo \ + .get_dds_delivery(self.project_id) + dds_delivery.delivery_status = DeliveryStatus.delivery_successful + self.dds_service.dds_delivery_repo.session.commit() + # The code below will be removed once the new delivery flow is in place @gen.coroutine def deliver( From b964d2c84d5f793e2bee1b8dcf9a7f2d883a4cac Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 8 Aug 2023 14:03:13 +0200 Subject: [PATCH 19/27] Add integration tests --- .../integration_tests/test_integration_dds.py | 141 ++++++++++++++++++ tests/unit_tests/services/test_dds.py | 1 + 2 files changed, 142 insertions(+) diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 5bdaced..871c3b6 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -1,4 +1,5 @@ import json +import pathlib import time import tempfile @@ -415,6 +416,146 @@ def test_can_delivery_data_asynchronously(self): == DeliveryStatus.delivery_failed.name): raise Exception("Delivery failed") + @gen_test + def test_can_deliver_project_dir_stage_free(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + @gen_test + def test_cannot_deliver_project_twice(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload), + raise_error=False, + ) + self.assertEqual(response.code, 403) + + @gen_test + def test_can_deliver_project_twice_with_force(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + "force_delivery": True, + } + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + self.assertEqual(response.code, 202) + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + @gen_test + def test_getting_unknown_status_returns_not_found(self): + url = "/".join([self.API_BASE, "deliver", "status", "snpseq00000"]) + response = yield self.http_client.fetch( + self.get_url(url), + raise_error=False, + ) + self.assertEqual(response.code, 404) + class TestIntegrationDDSLongWait(BaseIntegration): def __init__(self, *args): diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index fe98cef..5b2d81a 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -40,6 +40,7 @@ def wait_as_coroutine(x): self.mock_dds_delivery_repo = MagicMock() self.mock_dds_put_repo = MagicMock() + self.mock_dds_put_repo.get_dds_put_by_status.return_value = None self.delivery_order = DeliveryOrder( id=1, From 22735313abbdd7de32e71fc6440b71f34fedc7df Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 9 Aug 2023 14:23:25 +0200 Subject: [PATCH 20/27] Add documentation --- delivery/handlers/delivery_handlers.py | 33 ++++++ delivery/models/db_models.py | 1 + delivery/models/project.py | 22 +++- delivery/repositories/dds_repository.py | 129 +++++++++++++++++++++++- 4 files changed, 179 insertions(+), 6 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index f10eff8..ecd35c7 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -26,6 +26,39 @@ def initialize(self, **kwargs): super().initialize(kwargs) async def post(self, project_name): + """ + Deliver a project (represented by a directory under the + `general_project_directory` path defined in the configuration). This + will create a new project in DDS, upload the data and release the + project. + + The payload can include the following fields: + auth_token: str (required) + token to authenticate in DDS, can be either the token string itself + or a path to the token file. + pi: str (required) + email address the the principal investigator of the project + description: str (required) + description of the project + owners: [str] + email addresses of the people who are to be set as owners of the + project. + researchers: [str] + email addresses of the people who are to be set as researchers in + the project. + project_alias: str + name of the directory containing the project in case it is + different from the project name + force_delivery: bool + enforce delivery, regardless if the data has been delivered before + or not. + deadline: int + number of days when the user will be able to download the data + (otherwise the value defined in the DDS aggreement will be used). + email: bool + whether or not an email should be sent to the user when the project + is *released* (default is true). + """ required_members = [ "auth_token", "pi", diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index a32ee6f..b16d365 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -164,6 +164,7 @@ def __repr__(self): " }" ) + class DDSPut(SQLAlchemyBase): __tablename__ = "dds_puts" diff --git a/delivery/models/project.py b/delivery/models/project.py index dc36b85..155c1a1 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -1,3 +1,4 @@ +import datetime import os import re import json @@ -262,6 +263,21 @@ def put( source_path, destination=None, ): + """ + Upload source to the DDS project. + + Parameters + ---------- + source: str + unique identifier to the folder being uploaded (e.g. project name + or runfolder name). This is used to avoid delivering the same item + twice. + source_path: str + path to the data to upload + destination: str + path where to upload the data (will be uploaded to the project's + root by default) + """ assert not self.has_ongoing_puts(), \ "Only one upload is permitted at a time" @@ -293,6 +309,7 @@ def put( yield self.dds_service.external_program_service \ .wait_for_execution(execution) dds_put.delivery_status = DeliveryStatus.delivery_successful + dds_put.date_completed = datetime.datetime.now() except RuntimeError: dds_put.delivery_status = DeliveryStatus.delivery_failed raise @@ -338,10 +355,7 @@ def complete(self): assert not self.has_ongoing_puts(), \ "Cannot complete project while uploads are ongoing" - dds_delivery = self.dds_service.dds_delivery_repo \ - .get_dds_delivery(self.project_id) - dds_delivery.delivery_status = DeliveryStatus.delivery_successful - self.dds_service.dds_delivery_repo.session.commit() + self.dds_service.dds_delivery_repo.set_to_completed(self.project_id) # The code below will be removed once the new delivery flow is in place @gen.coroutine diff --git a/delivery/repositories/dds_repository.py b/delivery/repositories/dds_repository.py index db4b6eb..bee378f 100644 --- a/delivery/repositories/dds_repository.py +++ b/delivery/repositories/dds_repository.py @@ -19,6 +19,17 @@ def _get_row(self, primary_key): raise NotImplementedError def set_to_completed(self, primary_key, date_completed=None): + """ + Set status to `delivery_successful` and set date of completion. + + Parameters + ---------- + primary_key: str + id to row in database + date_completed: datetime + date of completion, if None, the current time and date will be + used. + """ if not date_completed: date_completed = datetime.datetime.now() @@ -30,6 +41,16 @@ def set_to_completed(self, primary_key, date_completed=None): self.session.commit() def update_status(self, primary_key, new_status): + """ + Update status of the given delivery + + Parameters + ---------- + primary_key: str + id to row in database + new_status: DeliveryStatus + new delivery status + """ row = self._get_row(primary_key) row.delivery_status = new_status self.session.commit() @@ -49,6 +70,26 @@ def register_dds_delivery( date_started=None, delivery_status=None, ): + """ + Add a new delivery to the database. By default, starting date will be + set to the current date and delivery status will be set to + `delivery_in_progress`. + + Parameters + ---------- + dds_project_id: str (e.g. snpseq00000) + project id provided by dds, must be unique + ngi_project_name: str (e.g. AB-1234) + name of the project (typically project name in Clarity) + date_started: datetime + date when the delivery was started + delivery_status: DeliveryStatus + Initial delivery status + + Returns + ------- + DDSDelivery + """ if not date_started: date_started = datetime.datetime.now() @@ -68,6 +109,18 @@ def register_dds_delivery( return dds_delivery def get_dds_delivery(self, dds_project_id): + """ + Get a delivery by dds project id from the database + + Parameters + ---------- + dds_project_id: str (e.g. snpseq00000) + id of the project + + Returns + ------- + DDSDelivery + """ return self._get_row(dds_project_id) @@ -85,6 +138,35 @@ def register_dds_put( date_started=None, delivery_status=None, ): + """ + Register a new upload in the database. By default, starting date will + be set to the current date and delivery status will be set to + `delivery_in_progress`. + + Parameters + ---------- + dds_project_id: str (e.g. snpseq00000) + id of the project the data is uploaded to + dds_pid: int + id of the dds process uploading the data + delivery_source: str + unique identifier to the folder being uploaded (e.g. project name + or runfolder name). This is used to avoid delivering the same item + twice. + delivery_path: str + path to the data being uploaded + destination: str + path where to upload the data in the dds project + date_started: datetime + date when the upload was started (by default, will use the current + time) + delivery_status: DeliveryStatus + Initial status of the upload + + Returns + ------- + DDSPut + """ if not date_started: date_started = datetime.datetime.now() @@ -106,10 +188,39 @@ def register_dds_put( return dds_put - def get_dds_put(self, row_id): - return self._get_row(row_id) + def get_dds_put(self, primary_key): + """ + Return a dds put entry from the database + + Parameters + ---------- + primary_key: int + id of the entry + + Returns + ------- + DDSPut + """ + return self._get_row(primary_key) def was_delivered_before(self, ngi_project_name, source): + """ + Returns if a source belonging to a specific project has been delivered + before. + + Parameters + ---------- + ngi_project_name: str (e.g. AB-1234) + project name + source: str + unique identifier to the folder being uploaded (e.g. project name + or runfolder name). This is used to avoid delivering the same item + twice. + + Returns + ------- + bool + """ return self.session.query(DDSPut) \ .join(DDSDelivery) \ .filter(DDSDelivery.ngi_project_name == ngi_project_name) \ @@ -117,6 +228,20 @@ def was_delivered_before(self, ngi_project_name, source): .first() is not None def get_dds_put_by_status(self, dds_project_id, delivery_status): + """ + Returns all uploads to a specific project with the given status + + Parameters + ---------- + dds_project_id: str (e.g. snpseq00000) + dds project where the uploads were made + delivery_status: DeliveryStatus + status to filter by + + Returns + ------- + [DDSPut] + """ return self.session.query(DDSPut) \ .join(DDSDelivery) \ .filter(DDSDelivery.dds_project_id == dds_project_id) \ From 81032d7255b6ce4b138fb1b89b92e7c3fc73aa26 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 9 Aug 2023 16:40:06 +0200 Subject: [PATCH 21/27] Test with and without project alias --- delivery/handlers/delivery_handlers.py | 14 +- .../integration_tests/test_integration_dds.py | 377 ++++++++++++------ 2 files changed, 275 insertions(+), 116 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index ecd35c7..e0593da 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -2,6 +2,7 @@ import os import json import logging +import pathlib import re import tempfile @@ -82,9 +83,15 @@ async def post(self, project_name): force_delivery = request_data.get("force_delivery", False) + project_path = self.general_project_repo.get_project( + project_metadata.get("project_alias", project_name)).path + source = pathlib.Path(project_path).name + if ( self.dds_service.dds_put_repo - .was_delivered_before(project_name, project_name) + .was_delivered_before( + project_name, + source) and not force_delivery ): self.set_status( @@ -94,9 +101,6 @@ async def post(self, project_name): ) return - project_path = self.general_project_repo.get_project( - project_metadata.get("project_alias", project_name)).path - dds_project = await DDSProject.new( project_name, project_metadata, @@ -119,7 +123,7 @@ async def post(self, project_name): }) self.finish() - await dds_project.put(project_name, project_path) + await dds_project.put(source, project_path) log.info(f"Uploaded {project_path} to {dds_project.project_id}") if request_data.get("release"): diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 871c3b6..42d841b 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -275,6 +275,267 @@ def test_can_create_two_projects(self): self.assertNotEqual(dds_project_id1, dds_project_id2) + @gen_test + def test_can_deliver_project_dir_stage_free(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = "AB-1235" + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + "project_alias": pathlib.Path(tmp_dir).name, + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + @gen_test + def test_cannot_deliver_project_twice(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = "AB-1238" + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + "project_alias": pathlib.Path(tmp_dir).name, + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload), + raise_error=False, + ) + self.assertEqual(response.code, 403) + + @gen_test + def test_can_deliver_project_twice_with_force(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = "AB-1229" + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + "project_alias": pathlib.Path(tmp_dir).name, + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + payload["force_delivery"] = True + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + self.assertEqual(response.code, 202) + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + @gen_test + def test_can_deliver_project_dir_stage_free_no_alias(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + @gen_test + def test_cannot_deliver_project_twice_no_alias(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload), + raise_error=False, + ) + self.assertEqual(response.code, 403) + + @gen_test + def test_can_deliver_project_twice_with_force_no_alias(self): + with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: + project_name = pathlib.Path(tmp_dir).name + url = "/".join([self.API_BASE, "deliver", "project", project_name]) + payload = { + "description": "Dummy project", + "pi": "alex@doe.com", + "researchers": ["robin@doe.com", "kim@doe.com"], + "owners": ["alex@doe.com"], + "auth_token": '1234', + } + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + + self.assertEqual(response.code, 202) + response_body = json.loads(response.body) + status_link = response_body["status_link"] + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + payload["force_delivery"] = True + + response = yield self.http_client.fetch( + self.get_url(url), method='POST', + body=json.dumps(payload)) + self.assertEqual(response.code, 202) + + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + + @gen_test + def test_getting_unknown_status_returns_not_found(self): + url = "/".join([self.API_BASE, "deliver", "status", "snpseq00000"]) + response = yield self.http_client.fetch( + self.get_url(url), + raise_error=False, + ) + self.assertEqual(response.code, 404) class TestIntegrationDDSShortWait(BaseIntegration): def __init__(self, *args): @@ -292,7 +553,7 @@ def test_mock_duration_is_2(self): self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) - project_name = "AB-1234" + project_name = "AB-1230" url = "/".join([self.API_BASE, "dds_project", "create", project_name]) payload = { "description": "Dummy project", @@ -360,7 +621,7 @@ def test_can_delivery_data_asynchronously(self): self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) - project_name = "AB-1234" + project_name = "AB-1233" url = "/".join([self.API_BASE, "dds_project", "create", project_name]) payload = { "description": "Dummy project", @@ -417,82 +678,9 @@ def test_can_delivery_data_asynchronously(self): raise Exception("Delivery failed") @gen_test - def test_can_deliver_project_dir_stage_free(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = pathlib.Path(tmp_dir).name - url = "/".join([self.API_BASE, "deliver", "project", project_name]) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - } - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] - - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - @gen_test - def test_cannot_deliver_project_twice(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = pathlib.Path(tmp_dir).name - url = "/".join([self.API_BASE, "deliver", "project", project_name]) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - } - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] - - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload), - raise_error=False, - ) - self.assertEqual(response.code, 403) - - @gen_test - def test_can_deliver_project_twice_with_force(self): + def test_can_deliver_project_dir_stage_free_short_wait(self): with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = pathlib.Path(tmp_dir).name + project_name = "AB-1235" url = "/".join([self.API_BASE, "deliver", "project", project_name]) payload = { "description": "Dummy project", @@ -500,6 +688,7 @@ def test_can_deliver_project_twice_with_force(self): "researchers": ["robin@doe.com", "kim@doe.com"], "owners": ["alex@doe.com"], "auth_token": '1234', + "project_alias": pathlib.Path(tmp_dir).name, } response = yield self.http_client.fetch( @@ -522,40 +711,6 @@ def test_can_deliver_project_twice_with_force(self): raise Exception("Delivery failed") time.sleep(1) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - "force_delivery": True, - } - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - self.assertEqual(response.code, 202) - - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - @gen_test - def test_getting_unknown_status_returns_not_found(self): - url = "/".join([self.API_BASE, "deliver", "status", "snpseq00000"]) - response = yield self.http_client.fetch( - self.get_url(url), - raise_error=False, - ) - self.assertEqual(response.code, 404) - class TestIntegrationDDSLongWait(BaseIntegration): def __init__(self, *args): From cb1da138e39031646070fb2fb9530d6ad4286595 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 10 Aug 2023 14:00:55 +0200 Subject: [PATCH 22/27] Make sure project cannot be delivered twice unless forced --- delivery/handlers/delivery_handlers.py | 15 +- delivery/services/delivery_service.py | 4 +- .../integration_tests/test_integration_dds.py | 250 ++++++------------ .../services/test_delivery_service.py | 4 +- 4 files changed, 103 insertions(+), 170 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index e0593da..9f672f7 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -11,6 +11,7 @@ from delivery.handlers import * from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler from delivery.models.project import DDSProject +from delivery.models.db_models import DeliverySource log = logging.getLogger(__name__) @@ -24,6 +25,7 @@ class DeliverProjectHandler(ArteriaDeliveryBaseHandler): def initialize(self, **kwargs): self.dds_service = kwargs["dds_service"] self.general_project_repo = kwargs["general_project_repo"] + self.delivery_sources_repo = kwargs["delivery_service"].delivery_sources_repo super().initialize(kwargs) async def post(self, project_name): @@ -87,11 +89,16 @@ async def post(self, project_name): project_metadata.get("project_alias", project_name)).path source = pathlib.Path(project_path).name + was_delivered_new_route = self.dds_service.dds_put_repo \ + .was_delivered_before(project_name, source) + was_delivered_old_route = self.delivery_sources_repo \ + .source_exists(DeliverySource( + project_name=project_name, + source_name=source, + path=project_path) + ) if ( - self.dds_service.dds_put_repo - .was_delivered_before( - project_name, - source) + (was_delivered_new_route or was_delivered_old_route) and not force_delivery ): self.set_status( diff --git a/delivery/services/delivery_service.py b/delivery/services/delivery_service.py index d09c968..a00ed6d 100644 --- a/delivery/services/delivery_service.py +++ b/delivery/services/delivery_service.py @@ -31,10 +31,12 @@ def __init__(self, def _validate_source_and_add_to_repo(self, source, force_delivery, path): source_exists = self.delivery_sources_repo.source_exists(source) + source_exists_new_db = self.dds_service.dds_put_repo \ + .was_delivered_before(source.project_name, source.source_name) # If such a Delivery source exists, only proceed if # override is activated - if source_exists and not force_delivery: + if (source_exists or source_exists_new_db) and not force_delivery: raise ProjectAlreadyDeliveredException( "Project source {} has already been delivered.".format(source)) elif source_exists and force_delivery: diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 42d841b..a93c6df 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -4,6 +4,7 @@ import tempfile from tornado.testing import * +from tornado import gen from delivery.models.db_models import StagingStatus, DeliveryStatus @@ -297,99 +298,6 @@ def test_can_deliver_project_dir_stage_free(self): response_body = json.loads(response.body) status_link = response_body["status_link"] - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - @gen_test - def test_cannot_deliver_project_twice(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = "AB-1238" - url = "/".join([self.API_BASE, "deliver", "project", project_name]) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - "project_alias": pathlib.Path(tmp_dir).name, - } - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] - - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload), - raise_error=False, - ) - self.assertEqual(response.code, 403) - - @gen_test - def test_can_deliver_project_twice_with_force(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = "AB-1229" - url = "/".join([self.API_BASE, "deliver", "project", project_name]) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - "project_alias": pathlib.Path(tmp_dir).name, - } - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] - - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - payload["force_delivery"] = True - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - self.assertEqual(response.code, 202) - while True: status_response = yield self.http_client.fetch( status_link) @@ -435,98 +343,112 @@ def test_can_deliver_project_dir_stage_free_no_alias(self): raise Exception("Delivery failed") time.sleep(1) - @gen_test - def test_cannot_deliver_project_twice_no_alias(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = pathlib.Path(tmp_dir).name - url = "/".join([self.API_BASE, "deliver", "project", project_name]) - payload = { - "description": "Dummy project", - "pi": "alex@doe.com", - "researchers": ["robin@doe.com", "kim@doe.com"], - "owners": ["alex@doe.com"], - "auth_token": '1234', - } + @gen_test(timeout=20) + def test_double_project_delivery_protection(self): + @gen.coroutine + def deliver_project_old_route(project_name, project_path, force=False): + + dir_name = project_path.name + + payload = {"force_delivery": force} + if project_name: + url = "/".join( + [self.API_BASE, "stage", "project", project_name]) + payload["project_alias"] = project_path.name + else: + url = "/".join([self.API_BASE, "stage", "project", dir_name]) response = yield self.http_client.fetch( self.get_url(url), method='POST', - body=json.dumps(payload)) + body=json.dumps(payload), + raise_error=False) - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] + # Wait for staging to complete before returning + if response.code == 202: + response_json = json.loads(response.body) + staging_status_links = response_json.get("staging_order_links") - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) + for project, link in staging_status_links.items(): + status_response = yield self.http_client.fetch(link) + self.assertEqual( + json.loads(status_response.body)["status"], + StagingStatus.staging_successful.name + ) - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload), - raise_error=False, - ) - self.assertEqual(response.code, 403) + return response - @gen_test - def test_can_deliver_project_twice_with_force_no_alias(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - project_name = pathlib.Path(tmp_dir).name - url = "/".join([self.API_BASE, "deliver", "project", project_name]) + @gen.coroutine + def deliver_project_new_route(project_name, project_path, force=False): payload = { "description": "Dummy project", "pi": "alex@doe.com", "researchers": ["robin@doe.com", "kim@doe.com"], "owners": ["alex@doe.com"], "auth_token": '1234', + "force_delivery": force, } + if project_name: + url = "/".join( + [self.API_BASE, "deliver", "project", project_name]) + payload["project_alias"] = project_path.name + else: + url = "/".join( + [self.API_BASE, "deliver", "project", project_path.name]) + response = yield self.http_client.fetch( self.get_url(url), method='POST', - body=json.dumps(payload)) + body=json.dumps(payload), + raise_error=False, + ) - self.assertEqual(response.code, 202) - response_body = json.loads(response.body) - status_link = response_body["status_link"] + # Wait for staging to complete before returning + if response.code == 202: + response_body = json.loads(response.body) + status_link = response_body["status_link"] - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) - - payload["force_delivery"] = True - - response = yield self.http_client.fetch( - self.get_url(url), method='POST', - body=json.dumps(payload)) - self.assertEqual(response.code, 202) + while True: + status_response = yield self.http_client.fetch( + status_link) + self.assertEqual(status_response.code, 200) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + time.sleep(1) + + return response + + delivery_methods = [ + deliver_project_old_route, + deliver_project_new_route, + ] + + for (i, (first_delivery, second_delivery, project_name)) in enumerate([ + (first_delivery, second_delivery, project_name) + for first_delivery in delivery_methods + for second_delivery in delivery_methods + for project_name in ["AB-{:04d}", None] + ]): + with tempfile.TemporaryDirectory( + dir='./tests/resources/projects') as tmp_dir: + if project_name: + project_name = project_name.format(i) + project_path = pathlib.Path(tmp_dir) + + response = yield first_delivery( + project_name, project_path) + self.assertEqual(response.code, 202) - while True: - status_response = yield self.http_client.fetch( - status_link) - self.assertEqual(status_response.code, 200) - if (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_successful.name): - break - elif (json.loads(status_response.body)["status"] - == DeliveryStatus.delivery_failed.name): - raise Exception("Delivery failed") - time.sleep(1) + response = yield second_delivery( + project_name, project_path) + self.assertEqual(response.code, 403) + response = yield second_delivery( + project_name, project_path, force=True) + self.assertEqual(response.code, 202) @gen_test def test_getting_unknown_status_returns_not_found(self): diff --git a/tests/unit_tests/services/test_delivery_service.py b/tests/unit_tests/services/test_delivery_service.py index c30b3a4..c52ccc3 100644 --- a/tests/unit_tests/services/test_delivery_service.py +++ b/tests/unit_tests/services/test_delivery_service.py @@ -37,7 +37,7 @@ class TestDeliveryService(unittest.TestCase): def _compose_delivery_service( self, - dds_delivery_service=mock.create_autospec(DDSService), + dds_delivery_service=mock.MagicMock(), staging_service=mock.create_autospec(StagingService), delivery_sources_repo=mock.create_autospec( DatabaseBasedDeliverySourcesRepository), @@ -53,6 +53,8 @@ def _compose_delivery_service( runfolder_service = runfolder_service self.project_links_dir = project_links_dir + dds_delivery_service.dds_put_repo.was_delivered_before.return_value = False + self.delivery_service = DeliveryService( dds_service=dds_delivery_service, staging_service=self.staging_service, From 3028ed8fabfdde7549504812f716decbd32d6407 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 22 Aug 2023 08:15:19 +0200 Subject: [PATCH 23/27] Fix documentation typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Matilda Åslin --- delivery/handlers/delivery_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 9f672f7..c299e43 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -18,7 +18,7 @@ class DeliverProjectHandler(ArteriaDeliveryBaseHandler): """ - Handler for delivering a project a project (i.e. a directory placed in the + Handler for delivering a project (i.e. a directory placed in the directory defined by the arteria delivery service `general_project_directory` configuration). """ From c16605519f88fdc94d8b621d67f9ce9c3b46a410 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 22 Aug 2023 08:17:04 +0200 Subject: [PATCH 24/27] Fix db model representation --- delivery/models/db_models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index b16d365..d14792f 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -156,7 +156,7 @@ class DDSDelivery(SQLAlchemyBase): def __repr__(self): return ( "DDS Delivery: {" - f"dds_project: '{self.dds_project}', " + f"dds_project_id: '{self.dds_project}', " f"ngi_project_name: '{self.ngi_project_name}', " f"date_started: {self.date_started}, " f"date_completed: {self.date_completed}, " @@ -188,7 +188,7 @@ def __repr__(self): return ( "DDS Put: {" f"id: {self.id}," - f"dds_project: '{self.dds_project}', " + f"dds_project_id: '{self.dds_project}', " f"dds_pid: {self.dds_pid}, " f"delivery_source: '{self.delivery_source}', " f"delivery_path: '{self.delivery_path}', " From 3d193a5dfc0d0ac25e5408ea1e9e2d60356a682d Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 22 Aug 2023 08:54:05 +0200 Subject: [PATCH 25/27] Revert "Implement DDSPut.is_running" This reverts commit e7a735b34ed1433f510a1b1bb168c26f123af287. Turns out I didn't need this function in the end. --- delivery/models/db_models.py | 17 ----------------- requirements/prod | 1 - 2 files changed, 18 deletions(-) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index d14792f..69db1ae 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -1,8 +1,6 @@ import os -import datetime import enum as base_enum -import psutil from sqlalchemy import Column, ForeignKey from sqlalchemy import Integer, BigInteger, String, Enum, DateTime @@ -198,18 +196,3 @@ def __repr__(self): f"delivery_status: {self.delivery_status}, " " }" ) - - def is_running(self): - try: - process = psutil.Process(self.dds_pid) - date_created = datetime.datetime.fromtimestamp( - process.create_time()) - return ( - process.name() == 'dds' - and ( - abs(date_created - self.date_started) - < datetime.timedelta(minutes=1) - ) - ) - except psutil.NoSuchProcess: - return False diff --git a/requirements/prod b/requirements/prod index bbb00cd..fbec63a 100644 --- a/requirements/prod +++ b/requirements/prod @@ -5,5 +5,4 @@ sqlalchemy==1.4.35 alembic==1.7.7 enum34==1.1.10 arteria==1.1.4 -psutil==5.9.4 dds-cli From da03f20ce1e2507b88d9ac7e869be491f1ac1a10 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 22 Aug 2023 09:00:36 +0200 Subject: [PATCH 26/27] Add forgotten class documentation --- delivery/repositories/dds_repository.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/delivery/repositories/dds_repository.py b/delivery/repositories/dds_repository.py index bee378f..5fb4e5a 100644 --- a/delivery/repositories/dds_repository.py +++ b/delivery/repositories/dds_repository.py @@ -125,6 +125,9 @@ def get_dds_delivery(self, dds_project_id): class DatabaseBasedDDSPutRepository(DatabaseBasedDDSRepository): + """ + Class to manipulate the DDSPut database. + """ def _get_row(self, primary_key): return self.session.get(DDSPut, primary_key) From 1edc611aa49eb57b44c2702c96aa6793aa0a56ed Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Tue, 22 Aug 2023 09:52:10 +0200 Subject: [PATCH 27/27] Rename `delivery_status` into `status` --- ...e3ecb_add_ddsdelivery_and_ddsput_models.py | 4 +-- delivery/handlers/delivery_handlers.py | 2 +- delivery/models/db_models.py | 8 ++--- delivery/models/project.py | 4 +-- delivery/repositories/dds_repository.py | 30 +++++++++---------- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py b/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py index 2ac482c..fe5caa9 100644 --- a/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py +++ b/alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py @@ -22,7 +22,7 @@ def upgrade(): sa.Column('ngi_project_name', sa.String), sa.Column('date_started', sa.DateTime, nullable=False), sa.Column('date_completed', sa.DateTime), - sa.Column('delivery_status', sa.Enum( + sa.Column('status', sa.Enum( 'pending', 'delivery_in_progress', 'delivery_successful', @@ -46,7 +46,7 @@ def upgrade(): sa.Column('destination', sa.String), sa.Column('date_started', sa.DateTime, nullable=False), sa.Column('date_completed', sa.DateTime), - sa.Column('delivery_status', sa.Enum( + sa.Column('status', sa.Enum( 'pending', 'delivery_in_progress', 'delivery_successful', diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index c299e43..64cd06b 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -222,7 +222,7 @@ def get(self, delivery_order_id): try: body = { 'id': delivery_order_id, - 'status': delivery_project.get_db_entry().delivery_status.name, + 'status': delivery_project.get_db_entry().status.name, } except AttributeError: self.set_status(NOT_FOUND) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index 69db1ae..64880fe 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -149,7 +149,7 @@ class DDSDelivery(SQLAlchemyBase): date_started = Column(DateTime, nullable=False) date_completed = Column(DateTime) - delivery_status = Column(Enum(DeliveryStatus), nullable=False) + status = Column(Enum(DeliveryStatus), nullable=False) def __repr__(self): return ( @@ -158,7 +158,7 @@ def __repr__(self): f"ngi_project_name: '{self.ngi_project_name}', " f"date_started: {self.date_started}, " f"date_completed: {self.date_completed}, " - f"delivery_status: {self.delivery_status}, " + f"status: {self.status}, " " }" ) @@ -180,7 +180,7 @@ class DDSPut(SQLAlchemyBase): date_started = Column(DateTime, nullable=False) date_completed = Column(DateTime) - delivery_status = Column(Enum(DeliveryStatus), nullable=False) + status = Column(Enum(DeliveryStatus), nullable=False) def __repr__(self): return ( @@ -193,6 +193,6 @@ def __repr__(self): f"destination: '{self.destination}', " f"date_started: {self.date_started}, " f"date_completed: {self.date_completed}, " - f"delivery_status: {self.delivery_status}, " + f"status: {self.status}, " " }" ) diff --git a/delivery/models/project.py b/delivery/models/project.py index 155c1a1..f9ea957 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -308,10 +308,10 @@ def put( try: yield self.dds_service.external_program_service \ .wait_for_execution(execution) - dds_put.delivery_status = DeliveryStatus.delivery_successful + dds_put.status = DeliveryStatus.delivery_successful dds_put.date_completed = datetime.datetime.now() except RuntimeError: - dds_put.delivery_status = DeliveryStatus.delivery_failed + dds_put.status = DeliveryStatus.delivery_failed raise finally: self.dds_service.dds_put_repo.session.commit() diff --git a/delivery/repositories/dds_repository.py b/delivery/repositories/dds_repository.py index 5fb4e5a..99d9521 100644 --- a/delivery/repositories/dds_repository.py +++ b/delivery/repositories/dds_repository.py @@ -35,7 +35,7 @@ def set_to_completed(self, primary_key, date_completed=None): row = self._get_row(primary_key) - row.delivery_status = DeliveryStatus.delivery_successful + row.status = DeliveryStatus.delivery_successful row.date_completed = date_completed self.session.commit() @@ -52,7 +52,7 @@ def update_status(self, primary_key, new_status): new delivery status """ row = self._get_row(primary_key) - row.delivery_status = new_status + row.status = new_status self.session.commit() @@ -68,7 +68,7 @@ def register_dds_delivery( dds_project_id, ngi_project_name, date_started=None, - delivery_status=None, + status=None, ): """ Add a new delivery to the database. By default, starting date will be @@ -83,7 +83,7 @@ def register_dds_delivery( name of the project (typically project name in Clarity) date_started: datetime date when the delivery was started - delivery_status: DeliveryStatus + status: DeliveryStatus Initial delivery status Returns @@ -93,14 +93,14 @@ def register_dds_delivery( if not date_started: date_started = datetime.datetime.now() - if not delivery_status: - delivery_status = DeliveryStatus.delivery_in_progress + if not status: + status = DeliveryStatus.delivery_in_progress dds_delivery = DDSDelivery( dds_project_id=dds_project_id, ngi_project_name=ngi_project_name, date_started=date_started, - delivery_status=delivery_status, + status=status, ) self.session.add(dds_delivery) @@ -139,7 +139,7 @@ def register_dds_put( delivery_path, destination=None, date_started=None, - delivery_status=None, + status=None, ): """ Register a new upload in the database. By default, starting date will @@ -163,7 +163,7 @@ def register_dds_put( date_started: datetime date when the upload was started (by default, will use the current time) - delivery_status: DeliveryStatus + status: DeliveryStatus Initial status of the upload Returns @@ -173,8 +173,8 @@ def register_dds_put( if not date_started: date_started = datetime.datetime.now() - if not delivery_status: - delivery_status = DeliveryStatus.delivery_in_progress + if not status: + status = DeliveryStatus.delivery_in_progress dds_put = DDSPut( dds_project_id=dds_project_id, @@ -183,7 +183,7 @@ def register_dds_put( delivery_path=delivery_path, destination=destination, date_started=date_started, - delivery_status=delivery_status, + status=status, ) self.session.add(dds_put) @@ -230,7 +230,7 @@ def was_delivered_before(self, ngi_project_name, source): .filter(DDSPut.delivery_source == source) \ .first() is not None - def get_dds_put_by_status(self, dds_project_id, delivery_status): + def get_dds_put_by_status(self, dds_project_id, status): """ Returns all uploads to a specific project with the given status @@ -238,7 +238,7 @@ def get_dds_put_by_status(self, dds_project_id, delivery_status): ---------- dds_project_id: str (e.g. snpseq00000) dds project where the uploads were made - delivery_status: DeliveryStatus + status: DeliveryStatus status to filter by Returns @@ -248,5 +248,5 @@ def get_dds_put_by_status(self, dds_project_id, delivery_status): return self.session.query(DDSPut) \ .join(DDSDelivery) \ .filter(DDSDelivery.dds_project_id == dds_project_id) \ - .filter(DDSPut.delivery_status == delivery_status) \ + .filter(DDSPut.status == status) \ .all()