Commit bf5222a

ref: partially migrate to dcor_schemas.RQJob

paulmueller committed Nov 26, 2024
1 parent 904498f commit bf5222a

Showing 4 changed files with 31 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
@@ -1,3 +1,6 @@
+0.16.0
+ - ref: partially migrate to dcor_schemas.RQJob
+ - ref: deprecate resource symlinking
 0.15.4
  - fix: make public resources public on S3 only after dataset is "active"
 0.15.3
1 change: 1 addition & 0 deletions ckanext/dcor_depot/cli.py
@@ -219,6 +219,7 @@ def list_all_resources():
         click.echo(resource.id)


+# TODO: Remove this method (it should not be used in current workflows)
 @click.command()
 @click.option('--modified-days', default=-1,
               help='Only run for datasets modified within this number of days '
23 changes: 20 additions & 3 deletions ckanext/dcor_depot/jobs.py
@@ -1,12 +1,18 @@
+import logging
 import warnings

 from ckan import logic
-from dcor_shared import get_resource_path, s3cc, sha256sum, wait_for_resource
+from dcor_shared import (
+    get_resource_path, rqjob_register, s3, s3cc, sha256sum, wait_for_resource)
+from dcor_shared import RQJob  # noqa: F401

 from .orgs import MANUAL_DEPOT_ORGS
 from .paths import USER_DEPOT


+log = logging.getLogger(__name__)
+
+
 class NoSHA256Available(UserWarning):
     """Used for missing SHA256 sums"""
     pass
@@ -24,8 +30,16 @@ def patch_resource_noauth(package_id, resource_id, data_dict):
     package_revise(context=admin_context(), data_dict=revise_dict)


-def migrate_resource_to_s3_job(resource):
+@rqjob_register(ckanext="dcor_depot",
+                queue="dcor-normal",
+                timeout=3600,
+                )
+def job_migrate_resource_to_s3(resource):
     """Migrate a resource to the S3 object store"""
+    if not s3.is_available():
+        log.info("S3 not available, not migrating resource")
+        return False
+
     performed_upload = False
     rid = resource["id"]
     # Make sure the resource is available for processing
@@ -67,8 +81,11 @@ def migrate_resource_to_s3_job(resource):
     return performed_upload


-def symlink_user_dataset_job(pkg, usr, resource):
+# TODO: Remove this method and make sure nothing depends on it.
+def job_symlink_user_dataset(pkg, usr, resource):
     """Symlink resource data to human-readable depot"""
+    warnings.warn("job_symlink_user_dataset should not be used",
+                  DeprecationWarning)
     path = get_resource_path(resource["id"])
     if not path.exists():
         # nothing to do (skip, because resource is on S3 only)
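Note: the `rqjob_register` decorator and the `RQJob` class imported above from dcor_shared (the commit title refers to dcor_schemas.RQJob) are not defined in this diff. A minimal sketch of the registration pattern the decorator implies could look like the following; the `registry` list, the constructor signature, and the internals of `enqueue_all_jobs` are assumptions for illustration, not the actual dcor_shared implementation:

    import ckan.plugins.toolkit as toolkit


    class RQJob:
        # Class-level list collecting every decorated job function
        # (assumed storage; the real dcor_shared class may differ).
        registry = []

        def __init__(self, func, ckanext, queue, timeout):
            self.func = func
            self.ckanext = ckanext
            self.queue = queue
            self.timeout = timeout

        @classmethod
        def enqueue_all_jobs(cls, resource, ckanext):
            """Enqueue every job registered for the given CKAN extension"""
            for job in cls.registry:
                if job.ckanext == ckanext:
                    toolkit.enqueue_job(job.func,
                                        [resource],
                                        queue=job.queue,
                                        rq_kwargs={"timeout": job.timeout})


    def rqjob_register(**kwargs):
        """Register a background job function at import time"""
        def wrapper(func):
            RQJob.registry.append(RQJob(func, **kwargs))
            return func
        return wrapper

Under this assumption, plugin.py below can enqueue everything with a single `jobs.RQJob.enqueue_all_jobs(resource, ckanext="dcor_depot")` call instead of wiring each job by hand.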
29 changes: 7 additions & 22 deletions ckanext/dcor_depot/plugin.py
@@ -6,7 +6,7 @@
 from dcor_shared import s3, s3cc

 from .cli import get_commands
-from .jobs import symlink_user_dataset_job, migrate_resource_to_s3_job
+from . import jobs


 class DCORDepotPlugin(plugins.SingletonPlugin):
@@ -39,6 +39,11 @@ def after_dataset_update(self, context, data_dict):

     # IResourceController
     def after_resource_create(self, context, resource):
+        if not context.get("is_background_job") and s3.is_available():
+            # All jobs are defined via decorators in jobs.py
+            jobs.RQJob.enqueue_all_jobs(resource, ckanext="dcor_depot")
+
+        # TODO: Remove this and make sure everything still works.
         # Symlinking new dataset
         # check organization
         pkg_id = resource["package_id"]
@@ -50,29 +55,9 @@
         pkg_job_id = f"{resource['package_id']}_{resource['position']}_"
         jid_symlink = pkg_job_id + "symlink"
         if not Job.exists(jid_symlink, connection=ckan_jobs_connect()):
-            toolkit.enqueue_job(symlink_user_dataset_job,
+            toolkit.enqueue_job(jobs.job_symlink_user_dataset,
                                 [pkg, usr, resource],
                                 title="Move and symlink user dataset",
                                 queue="dcor-short",
                                 rq_kwargs={"timeout": 60,
                                            "job_id": jid_symlink})
-
-        # Migrating data to S3
-        # This job should only be run if the S3 access is available
-        if s3.is_available():
-            jid_migrate_s3 = pkg_job_id + "migrates3"
-            if not Job.exists(jid_migrate_s3, connection=ckan_jobs_connect()):
-                toolkit.enqueue_job(
-                    migrate_resource_to_s3_job,
-                    [resource],
-                    title="Migrate resource to S3 object store",
-                    queue="dcor-normal",
-                    rq_kwargs={"timeout": 3600,
-                               "job_id": jid_migrate_s3,
-                               "depends_on": [
-                                   # general requirement
-                                   jid_symlink,
-                                   # requires SHA256 check
-                                   pkg_job_id + "sha256",
-                               ]}
-                )
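With the registration pattern, adding another background job to the extension no longer requires editing `after_resource_create`: any function in jobs.py decorated with `rqjob_register(ckanext="dcor_depot", ...)` is picked up by the `jobs.RQJob.enqueue_all_jobs` call above. A hypothetical example of registering an additional job (the function name, queue, timeout, and body are illustrative only):

    from dcor_shared import rqjob_register


    @rqjob_register(ckanext="dcor_depot",
                    queue="dcor-short",
                    timeout=60,
                    )
    def job_compute_something(resource):
        """Hypothetical job; enqueued automatically after resource creation"""
        rid = resource["id"]
        # ... perform the actual work for resource `rid` here ...
        return True

Note that the removed code expressed inter-job dependencies explicitly (`depends_on` with the symlink and SHA256 job IDs); how such dependencies are handled after the migration is not visible in this commit.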
