From 54a623b6edaa766dc7dd55247e4592c26f2cdff5 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 3 Oct 2023 16:02:46 -0400 Subject: [PATCH] Add `UPLOADED` state to Zarr models --- dandiapi/api/tests/test_asset.py | 19 +++-- dandiapi/api/tests/test_version.py | 2 +- ...ive-consistent-checksum-status_and_more.py | 76 +++++++++++++++++++ dandiapi/zarr/models.py | 9 ++- dandiapi/zarr/tasks/__init__.py | 4 +- .../zarr/tests/test_ingest_zarr_archive.py | 17 +++-- dandiapi/zarr/tests/test_zarr.py | 35 ++++++--- dandiapi/zarr/views/__init__.py | 9 ++- 8 files changed, 143 insertions(+), 28 deletions(-) create mode 100644 dandiapi/zarr/migrations/0004_remove_embargoedzarrarchive_zarr-embargoedzarrarchive-consistent-checksum-status_and_more.py diff --git a/dandiapi/api/tests/test_asset.py b/dandiapi/api/tests/test_asset.py index c0f250cd8..379dd6f4e 100644 --- a/dandiapi/api/tests/test_asset.py +++ b/dandiapi/api/tests/test_asset.py @@ -17,7 +17,7 @@ from dandiapi.api.services.asset.exceptions import AssetPathConflict from dandiapi.api.services.publish import publish_asset from dandiapi.api.tasks.scheduled import validate_pending_asset_metadata -from dandiapi.zarr.models import ZarrArchive +from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus from dandiapi.zarr.tasks import ingest_zarr_archive from .fuzzy import HTTP_URL_RE, TIMESTAMP_RE, URN_RE, UTC_ISO_TIMESTAMP_RE, UUID_RE @@ -1218,25 +1218,32 @@ def test_asset_rest_delete_zarr( @pytest.mark.django_db def test_asset_rest_delete_zarr_modified( - api_client, user, draft_version, zarr_archive, zarr_file_factory, storage, monkeypatch + api_client, + user, + draft_version, + zarr_archive_factory, + zarr_file_factory, + storage, + monkeypatch, ): """Ensure that a zarr can be associated to an asset, modified, then the asset deleted.""" # Pretend like our zarr was defined with the given storage monkeypatch.setattr(ZarrArchive, 'storage', storage) # Assign perms and authenticate user - assign_perm('owner', user, draft_version.dandiset) + dandiset = draft_version.dandiset + assign_perm('owner', user, dandiset) api_client.force_authenticate(user=user) # Ensure zarr is ingested + zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED, dandiset=dandiset) zarr_file_factory(zarr_archive=zarr_archive, size=100) ingest_zarr_archive(zarr_archive.zarr_id) zarr_archive.refresh_from_db() # Create first asset, pointing to zarr resp = api_client.post( - f'/api/dandisets/{draft_version.dandiset.identifier}' - f'/versions/{draft_version.version}/assets/', + f'/api/dandisets/{dandiset.identifier}' f'/versions/{draft_version.version}/assets/', { 'metadata': { 'path': 'sample.zarr', @@ -1262,7 +1269,7 @@ def test_asset_rest_delete_zarr_modified( # Delete the asset resp = api_client.delete( - f'/api/dandisets/{draft_version.dandiset.identifier}/' + f'/api/dandisets/{dandiset.identifier}/' f'versions/{draft_version.version}/assets/{asset.asset_id}/' ) assert resp.status_code == 204 diff --git a/dandiapi/api/tests/test_version.py b/dandiapi/api/tests/test_version.py index 619b2aca7..75541b4c7 100644 --- a/dandiapi/api/tests/test_version.py +++ b/dandiapi/api/tests/test_version.py @@ -648,7 +648,7 @@ def test_version_rest_publish_zarr( api_client.force_authenticate(user=user) # create and ingest zarr archive - zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset) + zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset, status='Uploaded') zarr_file_factory(zarr_archive=zarr_archive) ingest_zarr_archive(zarr_archive.zarr_id) zarr_archive.refresh_from_db() diff --git a/dandiapi/zarr/migrations/0004_remove_embargoedzarrarchive_zarr-embargoedzarrarchive-consistent-checksum-status_and_more.py b/dandiapi/zarr/migrations/0004_remove_embargoedzarrarchive_zarr-embargoedzarrarchive-consistent-checksum-status_and_more.py new file mode 100644 index 000000000..e7b98932c --- /dev/null +++ b/dandiapi/zarr/migrations/0004_remove_embargoedzarrarchive_zarr-embargoedzarrarchive-consistent-checksum-status_and_more.py @@ -0,0 +1,76 @@ +# Generated by Django 4.1.1 on 2023-10-04 18:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ('zarr', '0003_zarr_rename_tables_indexes_constraints'), + ] + + operations = [ + migrations.RemoveConstraint( + model_name='embargoedzarrarchive', + name='zarr-embargoedzarrarchive-consistent-checksum-status', + ), + migrations.RemoveConstraint( + model_name='zarrarchive', + name='zarr-zarrarchive-consistent-checksum-status', + ), + migrations.AlterField( + model_name='embargoedzarrarchive', + name='status', + field=models.CharField( + choices=[ + ('Pending', 'Pending'), + ('Uploaded', 'Uploaded'), + ('Ingesting', 'Ingesting'), + ('Complete', 'Complete'), + ], + default='Pending', + max_length=9, + ), + ), + migrations.AlterField( + model_name='zarrarchive', + name='status', + field=models.CharField( + choices=[ + ('Pending', 'Pending'), + ('Uploaded', 'Uploaded'), + ('Ingesting', 'Ingesting'), + ('Complete', 'Complete'), + ], + default='Pending', + max_length=9, + ), + ), + migrations.AddConstraint( + model_name='embargoedzarrarchive', + constraint=models.CheckConstraint( + check=models.Q( + models.Q( + ('checksum__isnull', True), + ('status__in', ['Pending', 'Uploaded', 'Ingesting']), + ), + models.Q(('checksum__isnull', False), ('status', 'Complete')), + _connector='OR', + ), + name='zarr-embargoedzarrarchive-consistent-checksum-status', + ), + ), + migrations.AddConstraint( + model_name='zarrarchive', + constraint=models.CheckConstraint( + check=models.Q( + models.Q( + ('checksum__isnull', True), + ('status__in', ['Pending', 'Uploaded', 'Ingesting']), + ), + models.Q(('checksum__isnull', False), ('status', 'Complete')), + _connector='OR', + ), + name='zarr-zarrarchive-consistent-checksum-status', + ), + ), + ] diff --git a/dandiapi/zarr/models.py b/dandiapi/zarr/models.py index ef48edd8d..e7173ff02 100644 --- a/dandiapi/zarr/models.py +++ b/dandiapi/zarr/models.py @@ -19,13 +19,14 @@ # The status of the zarr ingestion (checksums, size, file count) class ZarrArchiveStatus(models.TextChoices): PENDING = 'Pending' + UPLOADED = 'Uploaded' INGESTING = 'Ingesting' COMPLETE = 'Complete' class BaseZarrArchive(TimeStampedModel): UUID_REGEX = r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}' - INGEST_ERROR_MSG = 'Zarr archive already ingested' + INGEST_ERROR_MSG = 'Zarr archive is currently ingesting or has already ingested' class Meta: get_latest_by = 'modified' @@ -39,7 +40,11 @@ class Meta: name='%(app_label)s-%(class)s-consistent-checksum-status', check=models.Q( checksum__isnull=True, - status__in=[ZarrArchiveStatus.PENDING, ZarrArchiveStatus.INGESTING], + status__in=[ + ZarrArchiveStatus.PENDING, + ZarrArchiveStatus.UPLOADED, + ZarrArchiveStatus.INGESTING, + ], ) | models.Q(checksum__isnull=False, status=ZarrArchiveStatus.COMPLETE), ), diff --git a/dandiapi/zarr/tasks/__init__.py b/dandiapi/zarr/tasks/__init__.py index 92fb39409..81bbd921a 100644 --- a/dandiapi/zarr/tasks/__init__.py +++ b/dandiapi/zarr/tasks/__init__.py @@ -18,8 +18,8 @@ def ingest_zarr_archive(zarr_id: str, force: bool = False): # Ensure zarr is in pending state before proceeding with transaction.atomic(): zarr: ZarrArchive = ZarrArchive.objects.select_for_update().get(zarr_id=zarr_id) - if not force and zarr.status != ZarrArchiveStatus.PENDING: - logger.info(f'{ZarrArchive.INGEST_ERROR_MSG}. Exiting...') + if not force and zarr.status != ZarrArchiveStatus.UPLOADED: + logger.info('Zarrs must be in an UPLOADED state to begin ingestion. Exiting...') return # Set as ingesting diff --git a/dandiapi/zarr/tests/test_ingest_zarr_archive.py b/dandiapi/zarr/tests/test_ingest_zarr_archive.py index ffa1172cf..38f476ce6 100644 --- a/dandiapi/zarr/tests/test_ingest_zarr_archive.py +++ b/dandiapi/zarr/tests/test_ingest_zarr_archive.py @@ -11,7 +11,8 @@ @pytest.mark.django_db(transaction=True) def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory): - zarr: ZarrArchive = zarr_archive_factory() + # Create zarr with uploaded status so it can be ingested + zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) files = [ zarr_file_factory(zarr_archive=zarr, path='foo'), zarr_file_factory(zarr_archive=zarr, path='bar'), @@ -24,7 +25,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory): assert zarr.checksum is None assert zarr.size == 0 assert zarr.file_count == 0 - assert zarr.status == ZarrArchiveStatus.PENDING + assert zarr.status == ZarrArchiveStatus.UPLOADED # Compute checksum ingest_zarr_archive(str(zarr.zarr_id)) @@ -39,7 +40,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory): @pytest.mark.django_db(transaction=True) def test_ingest_zarr_archive_empty(zarr_archive_factory): - zarr: ZarrArchive = zarr_archive_factory() + zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) # Compute checksum ingest_zarr_archive(str(zarr.zarr_id)) @@ -81,7 +82,7 @@ def test_ingest_zarr_archive_force(zarr_archive_factory, zarr_file_factory): @pytest.mark.django_db(transaction=True) def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, draft_asset_factory): # Create zarr and asset - zarr: ZarrArchive = zarr_archive_factory() + zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) asset = draft_asset_factory(zarr=zarr, blob=None, embargoed_blob=None) # Assert asset size, metadata @@ -102,11 +103,14 @@ def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, dra @pytest.mark.django_db(transaction=True) -def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_file_factory): +def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive_factory, zarr_file_factory): """Ensure that if the zarr associated to an asset is modified and then ingested, it succeeds.""" assign_perm('owner', user, draft_version.dandiset) # Ensure zarr is ingested with non-zero size + zarr_archive = zarr_archive_factory( + dandiset=draft_version.dandiset, status=ZarrArchiveStatus.UPLOADED + ) zarr_file_factory(zarr_archive=zarr_archive, size=100) ingest_zarr_archive(zarr_archive.zarr_id) zarr_archive.refresh_from_db() @@ -128,6 +132,7 @@ def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_fi # Simulate more data uploaded to the zarr zarr_archive.mark_pending() + zarr_archive.status = ZarrArchiveStatus.UPLOADED zarr_archive.save() zarr_archive.refresh_from_db() @@ -142,7 +147,7 @@ def test_ingest_dandiset_zarrs(dandiset_factory, zarr_archive_factory, zarr_file for _ in range(10): zarr_file_factory( path='foo/a', - zarr_archive=zarr_archive_factory(dandiset=dandiset), + zarr_archive=zarr_archive_factory(dandiset=dandiset, status=ZarrArchiveStatus.UPLOADED), ) # Run ingest diff --git a/dandiapi/zarr/tests/test_zarr.py b/dandiapi/zarr/tests/test_zarr.py index 42e8b8b8b..0f2444f38 100644 --- a/dandiapi/zarr/tests/test_zarr.py +++ b/dandiapi/zarr/tests/test_zarr.py @@ -102,11 +102,11 @@ def test_zarr_rest_create_embargoed_dandiset( @pytest.mark.django_db -def test_zarr_rest_get( - authenticated_api_client, storage, zarr_archive: ZarrArchive, zarr_file_factory -): +def test_zarr_rest_get(authenticated_api_client, storage, zarr_archive_factory, zarr_file_factory): # Pretend like ZarrArchive was defined with the given storage ZarrArchive.storage = storage + + zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) zarr_file = zarr_file_factory(zarr_archive=zarr_archive) # Ingest @@ -211,16 +211,18 @@ def test_zarr_rest_delete_file( authenticated_api_client, user, storage, - zarr_archive: ZarrArchive, + zarr_archive_factory, zarr_file_factory, ): - assign_perm('owner', user, zarr_archive.dandiset) # Pretend like ZarrArchive was defined with the given storage ZarrArchive.storage = storage - zarr_file = zarr_file_factory(zarr_archive=zarr_archive) + # Create zarr and assign user perms + zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) + assign_perm('owner', user, zarr_archive.dandiset) - # Ingest + # Upload file and ingest + zarr_file = zarr_file_factory(zarr_archive=zarr_archive) ingest_zarr_archive(zarr_archive.zarr_id) # Assert file count and size @@ -243,6 +245,8 @@ def test_zarr_rest_delete_file( assert zarr_archive.size == 0 # Re-ingest + zarr_archive.status = ZarrArchiveStatus.UPLOADED + zarr_archive.save() ingest_zarr_archive(zarr_archive.zarr_id) zarr_archive.refresh_from_db() @@ -256,14 +260,16 @@ def test_zarr_rest_delete_file_asset_metadata( authenticated_api_client, user, storage, - zarr_archive: ZarrArchive, + zarr_archive_factory, zarr_file_factory, asset_factory, ): - assign_perm('owner', user, zarr_archive.dandiset) # Pretend like ZarrArchive was defined with the given storage ZarrArchive.storage = storage + zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED) + assign_perm('owner', user, zarr_archive.dandiset) + asset = asset_factory(zarr=zarr_archive, blob=None) zarr_file = zarr_file_factory(zarr_archive=zarr_archive) @@ -279,7 +285,13 @@ def test_zarr_rest_delete_file_asset_metadata( ) assert resp.status_code == 204 + # Re-ingest + zarr_archive.refresh_from_db() + zarr_archive.status = ZarrArchiveStatus.UPLOADED + zarr_archive.save() ingest_zarr_archive(zarr_archive.zarr_id) + + # Assert now empty asset.refresh_from_db() assert asset.full_metadata['digest']['dandi:dandi-zarr-checksum'] == EMPTY_CHECKSUM assert asset.full_metadata['contentSize'] == 0 @@ -357,7 +369,12 @@ def test_zarr_rest_delete_missing_file( ] assert zarr_archive.storage.exists(zarr_archive.s3_path(zarr_file.path)) + # Ingest + zarr_archive.status = ZarrArchiveStatus.UPLOADED + zarr_archive.save() ingest_zarr_archive(zarr_archive.zarr_id) + + # Check zarr_archive.refresh_from_db() assert zarr_archive.file_count == 1 assert zarr_archive.size == zarr_file.size diff --git a/dandiapi/zarr/views/__init__.py b/dandiapi/zarr/views/__init__.py index 71767a7ab..7705a6aa2 100644 --- a/dandiapi/zarr/views/__init__.py +++ b/dandiapi/zarr/views/__init__.py @@ -170,6 +170,11 @@ def finalize(self, request, zarr_id): if zarr_archive.status != ZarrArchiveStatus.PENDING: return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST) + # Indicate that the user is done uploading to this zarr + # TODO: If locking above is removed, use an update query + zarr_archive.status = ZarrArchiveStatus.UPLOADED + zarr_archive.save() + # Dispatch task ingest_zarr_archive.delay(zarr_id=zarr_archive.zarr_id) return Response(None, status=status.HTTP_204_NO_CONTENT) @@ -265,7 +270,7 @@ def create_files(self, request, zarr_id): queryset = self.get_queryset().select_for_update(of=['self']) with transaction.atomic(): zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id) - if zarr_archive.status == ZarrArchiveStatus.INGESTING: + if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]: return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST) # Deny if the user doesn't have ownership permission @@ -301,7 +306,7 @@ def delete_files(self, request, zarr_id): queryset = self.get_queryset().select_for_update() with transaction.atomic(): zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id) - if zarr_archive.status == ZarrArchiveStatus.INGESTING: + if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]: return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST) if not self.request.user.has_perm('owner', zarr_archive.dandiset):