Skip to content

Commit

Permalink
Merge pull request #34 from nansencenter/processing_cleanup
Browse files Browse the repository at this point in the history
Updates to processing results cleanup
  • Loading branch information
aperrin66 authored Nov 20, 2024
2 parents 83f5b13 + 26ee684 commit 3f4b4d3
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
types: [prereleased, released]
env:
TESTING_IMAGE_NAME: geospaas_rest_api_tests
GEOSPAAS_PROCESSING_VERSION: 3.1.0
GEOSPAAS_PROCESSING_VERSION: 3.8.0
GEOSPAAS_HARVESTING_VERSION: 3.8.0
METANORM_VERSION: 4.1.2
jobs:
Expand Down
24 changes: 24 additions & 0 deletions geospaas_rest_api/migrations/0006_syntoolcomparejob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 3.2 on 2024-11-14 14:41

from django.db import migrations


class Migration(migrations.Migration):
    """Declare the `SyntoolCompareJob` proxy model, based on
    `geospaas_rest_api.job`
    """

    dependencies = [('geospaas_rest_api', '0005_alter_job_task_id')]

    operations = [
        migrations.CreateModel(
            name='SyntoolCompareJob',
            # the model is a proxy: it declares no field of its own
            fields=[],
            options={'proxy': True, 'indexes': [], 'constraints': []},
            bases=('geospaas_rest_api.job',),
        ),
    ]
24 changes: 24 additions & 0 deletions geospaas_rest_api/migrations/0007_workdircleanupjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 3.2 on 2024-11-18 08:27

from django.db import migrations


class Migration(migrations.Migration):
    """Declare the `WorkdirCleanupJob` proxy model, based on
    `geospaas_rest_api.job`
    """

    dependencies = [('geospaas_rest_api', '0006_syntoolcomparejob')]

    operations = [
        migrations.CreateModel(
            name='WorkdirCleanupJob',
            # the model is a proxy: it declares no field of its own
            fields=[],
            options={'proxy': True, 'indexes': [], 'constraints': []},
            bases=('geospaas_rest_api.job',),
        ),
    ]
3 changes: 2 additions & 1 deletion geospaas_rest_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
ConvertJob,
SyntoolCleanupJob,
SyntoolCompareJob,
HarvestJob)
HarvestJob,
WorkdirCleanupJob)
70 changes: 59 additions & 11 deletions geospaas_rest_api/processing_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,22 @@ def get_signature(cls, parameters):
tasks_core.archive.signature(),
tasks_core.publish.signature())
elif conversion_format == 'syntool':
syntool_chain = celery.chain(
syntool_tasks = [
tasks_core.download.signature(),
tasks_core.unarchive.signature(),
tasks_core.crop.signature(
kwargs={'bounding_box': parameters.get('bounding_box', None)}),
tasks_syntool.convert.signature(
kwargs={'converter_options': parameters.get('converter_options', None)}),
kwargs={
'converter_options': parameters.get('converter_options', None),
'ttl': parameters.get('ttl', None),
}),
tasks_syntool.db_insert.signature(),
tasks_core.remove_downloaded.signature())
]
if parameters.get('remove_downloaded', True):
syntool_tasks.append(tasks_core.remove_downloaded.signature())
syntool_chain = celery.chain(syntool_tasks)

if parameters.pop('skip_check', False):
return syntool_chain
else:
Expand All @@ -162,7 +169,14 @@ def check_parameters(parameters):
- bounding_box: 4-elements list
- format: value in ['idf']
"""
accepted_keys = ('dataset_id', 'format', 'bounding_box', 'skip_check', 'converter_options')
accepted_keys = (
'dataset_id',
'format',
'bounding_box',
'skip_check',
'converter_options',
'remove_downloaded',
'ttl')
if not set(parameters).issubset(set(accepted_keys)):
raise ValidationError(
f"The convert action accepts only these parameters: {', '.join(accepted_keys)}")
Expand All @@ -185,6 +199,10 @@ def check_parameters(parameters):
not isinstance(parameters['converter_options'], dict)):
raise ValidationError("'converter_options' should be a dictionary")

if ('ttl' in parameters and not (
parameters['ttl'] is None or isinstance(parameters['ttl'], dict))):
raise ValidationError("'ttl' should be a dictionary or None")

return parameters

@staticmethod
Expand Down Expand Up @@ -231,18 +249,25 @@ class Meta:

@classmethod
def get_signature(cls, parameters):
return celery.chain(
tasks_syntool.compare_profiles.signature(),
tasks = [
tasks_syntool.compare_profiles.signature(kwargs={'ttl': parameters.get('ttl', None)}),
tasks_syntool.db_insert.signature(),
tasks_core.remove_downloaded.signature(),
)
]
if parameters.get('remove_downloaded', True):
tasks.append(tasks_core.remove_downloaded.signature())
return celery.chain(tasks)

@staticmethod
def check_parameters(parameters):
accepted_keys = ('model', 'profiles')
if not set(parameters) == set(accepted_keys):
required_keys = ('model', 'profiles')
accepted_keys = (*required_keys, 'ttl')
if not set(parameters).issubset(accepted_keys):
raise ValidationError(
f"The convert action accepts only these parameters: {', '.join(accepted_keys)}")
f"The compare action accepts only these parameters: {', '.join(accepted_keys)}")

if not set(required_keys).issubset(parameters):
raise ValidationError(
f"The following parameters are required for the compare action: {required_keys}")

if ((not isinstance(parameters['model'], Sequence)) or
len(parameters['model']) != 2 or
Expand All @@ -265,6 +290,11 @@ def check_parameters(parameters):
break
if not valid_profiles:
raise ValidationError("'profiles' must be a list of tuples (profile_id, profile_path)")

# 'ttl' is optional; when given it must be a dictionary or None.
# The error message names the parameter which is actually checked here
# (it used to mention 'converter_options').
if ('ttl' in parameters and not (
        parameters['ttl'] is None or isinstance(parameters['ttl'], dict))):
    raise ValidationError("'ttl' should be a dictionary or None")

return parameters

@staticmethod
Expand Down Expand Up @@ -292,3 +322,21 @@ def check_parameters(parameters):
@staticmethod
def make_task_parameters(parameters):
return ((parameters['search_config_dict'],), {})


class WorkdirCleanupJob(Job):
    """Job which removes everything in the working directory.

    It is made of the `cleanup_workdir` core task only and does not
    use any parameter.
    """

    class Meta:
        proxy = True

    @classmethod
    def get_signature(cls, parameters):
        """Return the signature of the `cleanup_workdir` task.
        `parameters` is not used.
        """
        cleanup_signature = tasks_core.cleanup_workdir.signature()
        return cleanup_signature

    @staticmethod
    def check_parameters(parameters):
        """Nothing is validated: the parameters are returned as they
        were given
        """
        return parameters

    @staticmethod
    def make_task_parameters(parameters):
        """Return empty positional and keyword arguments"""
        no_args, no_kwargs = (), {}
        return (no_args, no_kwargs)
2 changes: 2 additions & 0 deletions geospaas_rest_api/processing_api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class JobSerializer(rest_framework.serializers.Serializer):
'harvest': models.HarvestJob,
'syntool_cleanup': models.SyntoolCleanupJob,
'compare_profiles': models.SyntoolCompareJob,
'workdir_cleanup': models.WorkdirCleanupJob,
}

# Actual Job fields
Expand All @@ -29,6 +30,7 @@ class JobSerializer(rest_framework.serializers.Serializer):
'convert',
'harvest',
'syntool_cleanup',
'workdir_cleanup',
'compare_profiles',
],
required=True, write_only=True,
Expand Down
115 changes: 105 additions & 10 deletions geospaas_rest_api/tests/test_processing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ def test_check_parameters_wrong_key(self):
self.assertListEqual(
raised.exception.detail,
[ErrorDetail(string="The convert action accepts only these parameters: "
"dataset_id, format, bounding_box, skip_check, converter_options",
"dataset_id, format, bounding_box, skip_check, converter_options, "
"remove_downloaded, ttl",
code='invalid')])

def test_check_parameters_wrong_format(self):
Expand All @@ -314,7 +315,8 @@ def test_check_parameters_extra_param(self):
self.assertListEqual(
raised.exception.detail,
[ErrorDetail(string="The convert action accepts only these parameters: "
"dataset_id, format, bounding_box, skip_check, converter_options",
"dataset_id, format, bounding_box, skip_check, converter_options, "
"remove_downloaded, ttl",
code='invalid')])

def test_check_parameters_wrong_type_for_dataset_id(self):
Expand Down Expand Up @@ -348,15 +350,34 @@ def test_check_parameters_wrong_converter_options_type(self):
models.ConvertJob.check_parameters(
{'dataset_id': 1, 'format': 'idf', 'converter_options': '2'})

def test_check_parameters_ttl(self):
    """A 'ttl' which is either a dictionary or None must be
    accepted by `check_parameters()` and returned untouched
    """
    cases = (
        ({'dataset_id': 1, 'format': 'syntool', 'ttl': {'days': 2}},
         {'dataset_id': 1, 'format': 'syntool', 'ttl': {'days': 2}}),
        ({'dataset_id': 1, 'format': 'syntool', 'ttl': None},
         {'dataset_id': 1, 'format': 'syntool', 'ttl': None}),
    )
    for given, expected in cases:
        checked = models.ConvertJob.check_parameters(given)
        self.assertEqual(checked, expected)

def test_check_parameters_wrong_ttl_type(self):
    """A 'ttl' which is neither a dictionary nor None must be
    rejected by `check_parameters()`
    """
    invalid_parameters = {'dataset_id': 1, 'format': 'syntool', 'ttl': 2}
    with self.assertRaises(ValidationError):
        models.ConvertJob.check_parameters(invalid_parameters)

def test_get_signature_syntool(self):
"""Test the right signature is returned"""
self.maxDiff = None
base_chain = celery.chain(
tasks_core.download.signature(),
tasks_core.unarchive.signature(),
tasks_core.crop.signature(
kwargs={'bounding_box': [0, 20, 20, 0]}),
tasks_syntool.convert.signature(kwargs={'converter_options': None}),
tasks_syntool.convert.signature(kwargs={'converter_options': None, 'ttl': None}),
tasks_syntool.db_insert.signature(),
tasks_core.remove_downloaded.signature())

Expand Down Expand Up @@ -459,11 +480,26 @@ def test_get_signature(self):
'geospaas_rest_api.processing_api.models.tasks_core') as mock_core_tasks, \
mock.patch('celery.chain') as mock_chain:
_ = models.SyntoolCompareJob.get_signature({})
mock_chain.assert_called_once_with(
mock_chain.assert_called_once_with([
mock_syntool_tasks.compare_profiles.signature.return_value,
mock_syntool_tasks.db_insert.signature.return_value,
mock_core_tasks.remove_downloaded.signature.return_value,
)
])

def test_get_signature_no_remove(self):
    """When 'remove_downloaded' is False, the chain must only
    contain the comparison and database insertion tasks
    """
    with mock.patch(
            'geospaas_rest_api.processing_api.models.tasks_syntool') as mock_syntool_tasks:
        with mock.patch('geospaas_rest_api.processing_api.models.tasks_core'):
            with mock.patch('celery.chain') as mock_chain:
                models.SyntoolCompareJob.get_signature({'remove_downloaded': False})
                # no remove_downloaded signature at the end of the list
                expected_tasks = [
                    mock_syntool_tasks.compare_profiles.signature.return_value,
                    mock_syntool_tasks.db_insert.signature.return_value,
                ]
                mock_chain.assert_called_once_with(expected_tasks)

def test_check_parameters_ok(self):
"""Test that check_parameters() returns the parameters when
Expand All @@ -472,10 +508,36 @@ def test_check_parameters_ok(self):
self.assertDictEqual(
models.SyntoolCompareJob.check_parameters({
'model': (123, '/foo'),
'profiles': ((456, '/bar'), (789, '/baz'))
'profiles': ((456, '/bar'), (789, '/baz')),
}),
{'model': (123, '/foo'), 'profiles': ((456, '/bar'), (789, '/baz'))})

def test_check_parameters_ttl(self):
    """check_parameters() must accept a 'ttl' which is a
    dictionary or None and return the parameters unchanged
    """
    cases = (
        (
            {
                'model': (123, '/foo'),
                'profiles': ((456, '/bar'), (789, '/baz')),
                'ttl': {'days': 2},
            },
            {
                'model': (123, '/foo'),
                'profiles': ((456, '/bar'), (789, '/baz')),
                'ttl': {'days': 2},
            },
        ),
        (
            {
                'model': (123, '/foo'),
                'profiles': ((456, '/bar'), (789, '/baz')),
                'ttl': None,
            },
            {
                'model': (123, '/foo'),
                'profiles': ((456, '/bar'), (789, '/baz')),
                'ttl': None,
            },
        ),
    )
    for given, expected in cases:
        checked = models.SyntoolCompareJob.check_parameters(given)
        self.assertDictEqual(checked, expected)


def test_check_parameters_unknown(self):
"""An error should be raised when an unknown parameter is given
"""
Expand Down Expand Up @@ -529,6 +591,13 @@ def test_check_parameters_wrong_type(self):
'model': (123, '/foo'),
'profiles': ((456, '/bar'), (789, False))
})
# wrong ttl type
with self.assertRaises(ValidationError):
models.SyntoolCompareJob.check_parameters({
'model': (123, '/foo'),
'profiles': ((456, '/bar'), (789, '/baz')),
'ttl': 2,
})

def test_make_task_parameters(self):
"""Test that the right arguments are built from the request
Expand Down Expand Up @@ -582,6 +651,29 @@ def test_make_task_parameters(self):
(({'foo': 'bar'},), {}))


class WorkdirCleanupJobTests(unittest.TestCase):
    """Check the behavior of the WorkdirCleanupJob model"""

    def test_get_signature(self):
        """get_signature() must return the signature of the
        cleanup_workdir task
        """
        job_signature = models.WorkdirCleanupJob.get_signature({})
        expected_signature = tasks_core.cleanup_workdir.signature()
        self.assertEqual(job_signature, expected_signature)

    def test_check_parameters(self):
        """check_parameters() must return whatever it is given"""
        arbitrary_parameters = mock.Mock()
        checked = models.WorkdirCleanupJob.check_parameters(arbitrary_parameters)
        self.assertEqual(checked, arbitrary_parameters)

    def test_make_task_parameters(self):
        """make_task_parameters() must return empty arguments"""
        task_parameters = models.WorkdirCleanupJob.make_task_parameters({})
        self.assertTupleEqual(task_parameters, ((), {}))

class JobViewSetTests(django.test.TestCase):
"""Test jobs/ endpoints"""

Expand Down Expand Up @@ -874,13 +966,15 @@ def test_list_tasks(self):
"dataset": 1,
"path": "ingested/product_name/granule_name_1/",
"type": "syntool",
"created": "2023-10-25T15:38:47Z"
"created": "2023-10-25T15:38:47Z",
"ttl": None,
}, {
"id": 2,
"dataset": 2,
"path": "ingested/product_name/granule_name_2/",
"type": "syntool",
"created": "2023-10-26T09:10:19Z"
"created": "2023-10-26T09:10:19Z",
"ttl": None,
}
]
}
Expand All @@ -894,5 +988,6 @@ def test_retrieve_task(self):
"dataset": 1,
"path": "ingested/product_name/granule_name_1/",
"type": "syntool",
"created": "2023-10-25T15:38:47Z"
"created": "2023-10-25T15:38:47Z",
"ttl": None,
})

0 comments on commit 3f4b4d3

Please sign in to comment.