Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stage-free delivery for project deliveries. #48

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fec56fc
Make "pi" and "description" mandatory
Aratz Mar 6, 2023
f553cf6
Rename `put` into `deliver`
Aratz Mar 6, 2023
9f626a5
Implement database models for DDSPut and DDSDelivery
Aratz Mar 8, 2023
fb3e684
Connect dds repos to dds_service
Aratz Mar 9, 2023
16288e7
Register dds project in db upon creation
Aratz Mar 9, 2023
ebda4a0
Update `get_ngi_project_name` to call db instead
Aratz Mar 9, 2023
641667c
Use `external_program_service` instead of private method _run
Aratz Mar 10, 2023
d5f210e
Implement DDSProject.put
Aratz Mar 10, 2023
e7a735b
Implement DDSPut.is_running
Aratz Mar 10, 2023
324a7b1
Remove `run_and_wait` method
Aratz Mar 20, 2023
6c554a9
Fix integration tests
Aratz Mar 21, 2023
564011d
Add alembic db migration script
Aratz Mar 21, 2023
156530a
Implement new project delivery handler
Aratz Mar 22, 2023
2905775
Reorder methods in DDSProject
Aratz Mar 22, 2023
50a62c0
Add route to new delivery handler
Aratz Aug 8, 2023
18df8ab
Update status to support new delivery method
Aratz Aug 8, 2023
1db11d1
Check only one dds put is running at a time
Aratz Aug 8, 2023
668588c
Add complete method
Aratz Aug 8, 2023
b964d2c
Add integration tests
Aratz Aug 8, 2023
2273531
Add documentation
Aratz Aug 9, 2023
81032d7
Test with and without project alias
Aratz Aug 9, 2023
cb1da13
Make sure project cannot be delivered twice unless forced
Aratz Aug 10, 2023
3028ed8
Fix documentation typo
Aratz Aug 22, 2023
c166055
Fix db model representation
Aratz Aug 22, 2023
3d193a5
Revert "Implement DDSPut.is_running"
Aratz Aug 22, 2023
da03f20
Add forgotten class documentation
Aratz Aug 22, 2023
1edc611
Rename `delivery_status` into `status`
Aratz Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions alembic/versions/69ebc95e3ecb_add_ddsdelivery_and_ddsput_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Add DDSDelivery and DDSPut Models

Revision ID: 69ebc95e3ecb
Revises: 74b309c44134
Create Date: 2023-03-10 16:10:37.844346

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '69ebc95e3ecb'
down_revision = '74b309c44134'
branch_labels = None
depends_on = None

def upgrade():
    """
    Create the `dds_deliveries` and `dds_puts` tables.

    `dds_deliveries` is created first because `dds_puts.dds_project_id` is a
    foreign key pointing at `dds_deliveries.dds_project_id`.
    """
    status_values = (
        'pending',
        'delivery_in_progress',
        'delivery_successful',
        'delivery_failed',
        'delivery_skipped',
    )

    def delivery_status_column():
        # A fresh Enum instance is built for every table, exactly as if the
        # definition had been written out in full for each of them.
        return sa.Column(
            'delivery_status',
            sa.Enum(*status_values, name='deliverystatus'),
        )

    dds_deliveries_columns = [
        sa.Column('dds_project_id', sa.String, primary_key=True),
        sa.Column('ngi_project_name', sa.String),
        sa.Column('date_started', sa.DateTime, nullable=False),
        sa.Column('date_completed', sa.DateTime),
        delivery_status_column(),
    ]
    op.create_table('dds_deliveries', *dds_deliveries_columns)

    dds_puts_columns = [
        sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
        sa.Column(
            'dds_project_id',
            sa.String,
            sa.ForeignKey("dds_deliveries.dds_project_id"),
            nullable=False,
        ),
        sa.Column('dds_pid', sa.Integer, nullable=False),
        sa.Column('delivery_source', sa.String, nullable=False),
        sa.Column('delivery_path', sa.String, nullable=False),
        sa.Column('destination', sa.String),
        sa.Column('date_started', sa.DateTime, nullable=False),
        sa.Column('date_completed', sa.DateTime),
        delivery_status_column(),
    ]
    op.create_table('dds_puts', *dds_puts_columns)

def downgrade():
    """
    Drop the tables created by `upgrade`.

    `dds_puts` goes first since it holds the foreign key referencing
    `dds_deliveries`.
    """
    for table_name in ('dds_puts', 'dds_deliveries'):
        op.drop_table(table_name)
13 changes: 12 additions & 1 deletion delivery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from delivery.handlers.project_handlers import ProjectHandler, ProjectsForRunfolderHandler, \
BestPracticeProjectSampleHandler
from delivery.handlers.dds_handlers import DDSCreateProjectHandler
from delivery.handlers.delivery_handlers import DeliverByStageIdHandler, DeliveryStatusHandler
from delivery.handlers.delivery_handlers import DeliverByStageIdHandler, \
DeliveryStatusHandler, DeliverProjectHandler
from delivery.handlers.staging_handlers import StagingRunfolderHandler, StagingHandler,\
StageGeneralDirectoryHandler, StagingProjectRunfoldersHandler
from delivery.handlers.organise_handlers import OrganiseRunfolderHandler
Expand All @@ -25,6 +26,8 @@
FileSystemBasedUnorganisedRunfolderRepository
from delivery.repositories.staging_repository import DatabaseBasedStagingRepository
from delivery.repositories.deliveries_repository import DatabaseBasedDeliveriesRepository
from delivery.repositories.dds_repository import DatabaseBasedDDSDeliveryRepository, \
DatabaseBasedDDSPutRepository
from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository
from delivery.repositories.delivery_sources_repository import DatabaseBasedDeliverySourcesRepository
from delivery.repositories.sample_repository import RunfolderProjectBasedSampleRepository
Expand Down Expand Up @@ -74,6 +77,8 @@ def routes(**kwargs):

url(r"/api/1.0/deliver/status/(.+)", DeliveryStatusHandler,
name="delivery_status", kwargs=kwargs),
url(r"/api/1.0/deliver/project/(.+)", DeliverProjectHandler,
name="deliver_project", kwargs=kwargs),

url(r"/api/1.0/dds_project/create/(.+)", DDSCreateProjectHandler,
name="create_dds_project", kwargs=kwargs),
Expand Down Expand Up @@ -159,13 +164,19 @@ def _assert_is_dir(directory):

delivery_repo = DatabaseBasedDeliveriesRepository(
session_factory=session_factory)
dds_delivery_repo = DatabaseBasedDDSDeliveryRepository(
session_factory=session_factory)
dds_put_repo = DatabaseBasedDDSPutRepository(
session_factory=session_factory)

dds_conf = config['dds_conf']
dds_service = DDSService(
external_program_service=external_program_service,
staging_service=staging_service,
staging_dir=staging_dir,
delivery_repo=delivery_repo,
dds_delivery_repo=dds_delivery_repo,
dds_put_repo=dds_put_repo,
session_factory=session_factory,
dds_conf=dds_conf)

Expand Down
6 changes: 5 additions & 1 deletion delivery/handlers/dds_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ async def post(self, project_name):
response = requests.request("POST", url, json=payload)
"""

required_members = ["auth_token"]
required_members = [
"auth_token",
"pi",
"description",
]
project_metadata = self.body_as_object(
required_members=required_members)

Expand Down
166 changes: 159 additions & 7 deletions delivery/handlers/delivery_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,147 @@
import os
import json
import logging
import pathlib
import re
import tempfile

from tornado.gen import coroutine

from delivery.handlers import *
from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler
from delivery.models.project import DDSProject
from delivery.models.db_models import DeliverySource

log = logging.getLogger(__name__)


class DeliverProjectHandler(ArteriaDeliveryBaseHandler):
    """
    Handler for delivering a project (i.e. a directory placed in the
    directory defined by the arteria delivery service
    `general_project_directory` configuration).
    """

    def initialize(self, **kwargs):
        """
        Pick the collaborators used by this handler out of the route kwargs:
        the DDS service, the general project repository and the delivery
        sources repository (taken from the delivery service).
        """
        self.dds_service = kwargs["dds_service"]
        self.general_project_repo = kwargs["general_project_repo"]
        delivery_service = kwargs["delivery_service"]
        self.delivery_sources_repo = delivery_service.delivery_sources_repo
        super().initialize(kwargs)

    async def post(self, project_name):
        """
        Deliver a project (represented by a directory under the
        `general_project_directory` path defined in the configuration). This
        will create a new project in DDS, upload the data and, if `release`
        is set in the payload, release the project.

        The request is answered with ACCEPTED (together with the DDS project
        id and a link to the status endpoint) as soon as the DDS project has
        been created; the upload and the optional release happen after the
        response has been finished. If the project has already been delivered
        and `force_delivery` is not set, FORBIDDEN is returned instead and
        nothing is created.

        The payload can include the following fields:
        auth_token: str (required)
            token to authenticate in DDS, can be either the token string itself
            or a path to the token file.
        pi: str (required)
            email address of the principal investigator of the project
        description: str (required)
            description of the project
        owners: [str]
            email addresses of the people who are to be set as owners of the
            project.
        researchers: [str]
            email addresses of the people who are to be set as researchers in
            the project.
        project_alias: str
            name of the directory containing the project in case it is
            different from the project name
        force_delivery: bool
            enforce delivery, regardless if the data has been delivered before
            or not.
        release: bool
            release the project once the upload is done (the project is not
            released when this field is missing or false).
        deadline: int
            number of days when the user will be able to download the data
            (otherwise the value defined in the DDS agreement will be used).
        email: bool
            whether or not an email should be sent to the user when the project
            is *released* (default is true).
        """
        request_data = self.body_as_object(
            required_members=["auth_token", "pi", "description"])

        # Only these fields are forwarded as project metadata, and only when
        # the caller actually supplied them.
        metadata_keys = (
            "pi",
            "description",
            "owners",
            "researchers",
            "project_alias",
        )
        project_metadata = {}
        for key in metadata_keys:
            if key in request_data:
                project_metadata[key] = request_data[key]

        force_delivery = request_data.get("force_delivery", False)

        # The directory is looked up by its alias when one is given, and by
        # the project name otherwise.
        directory_name = project_metadata.get("project_alias", project_name)
        project = self.general_project_repo.get_project(directory_name)
        project_path = project.path
        source = pathlib.Path(project_path).name

        # Both delivery records are consulted: the DDS put repository and the
        # delivery sources repository.
        dds_put_repo = self.dds_service.dds_put_repo
        was_delivered_new_route = dds_put_repo.was_delivered_before(
            project_name, source)
        delivery_source = DeliverySource(
            project_name=project_name,
            source_name=source,
            path=project_path)
        was_delivered_old_route = self.delivery_sources_repo.source_exists(
            delivery_source)

        already_delivered = was_delivered_new_route or was_delivered_old_route
        if already_delivered and not force_delivery:
            self.set_status(
                FORBIDDEN,
                f"The project {project_name} has already been delivered. "
                "Use the force to bypass and deliver anyway."
            )
            return

        dds_project = await DDSProject.new(
            project_name,
            project_metadata,
            request_data["auth_token"],
            self.dds_service)

        log.info(
            f"New dds project created for project {project_name} "
            f"with id {dds_project.project_id}"
        )

        self.set_status(ACCEPTED)
        status_link = "{0}://{1}{2}".format(
            self.request.protocol,
            self.request.host,
            self.reverse_url("delivery_status", dds_project.project_id)
        )
        self.write_json({
            'dds_project_id': dds_project.project_id,
            'status_link': status_link,
        })
        # The response is closed here; everything below runs after the client
        # has received its answer.
        self.finish()

        await dds_project.put(source, project_path)
        log.info(f"Uploaded {project_path} to {dds_project.project_id}")

        if request_data.get("release"):
            deadline = request_data.get("deadline")
            send_email = request_data.get("email", True)
            await dds_project.release(deadline=deadline, email=send_email)
            log.info(f"Released project {dds_project.project_id}")

        dds_project.complete()


class DeliverByStageIdHandler(ArteriaDeliveryBaseHandler):
"""
Handler for starting deliveries based on a previously staged directory/file
Expand Down Expand Up @@ -50,7 +181,7 @@ def post(self, staging_id):
auth_token,
delivery_project_id)

delivery_id = yield dds_project.put(
delivery_id = yield dds_project.deliver(
staging_id,
skip_delivery=skip_delivery,
deadline=deadline,
Expand All @@ -76,16 +207,37 @@ def initialize(self, **kwargs):

@coroutine
def get(self, delivery_order_id):
delivery_order = self.delivery_service\
.get_delivery_order_by_id(delivery_order_id)
"""
Returns project status.
"""

pattern_dds_project = re.compile(r"snpseq\d+")

delivery_order = yield self.delivery_service.update_delivery_status(
if pattern_dds_project.fullmatch(delivery_order_id):
delivery_project = DDSProject(
self.delivery_service,
"",
delivery_order_id)

body = {
'id': delivery_order.id,
'status': delivery_order.delivery_status.name,
try:
body = {
'id': delivery_order_id,
'status': delivery_project.get_db_entry().delivery_status.name,
}
except AttributeError:
self.set_status(NOT_FOUND)
return
else:
delivery_order = self.delivery_service\
.get_delivery_order_by_id(delivery_order_id)

delivery_order = yield self.delivery_service.update_delivery_status(
delivery_order_id)

body = {
'id': delivery_order.id,
'status': delivery_order.delivery_status.name,
}

self.write_json(body)
self.set_status(OK)
Loading