Skip to content

Commit

Permalink
Merge pull request #1698 from dandi/zarr-uploaded-status
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnesbitt authored Nov 7, 2023
2 parents ec0c1f1 + d7e7169 commit c875d22
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 30 deletions.
19 changes: 13 additions & 6 deletions dandiapi/api/tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dandiapi.api.services.asset.exceptions import AssetPathConflict
from dandiapi.api.services.publish import publish_asset
from dandiapi.api.tasks.scheduled import validate_pending_asset_metadata
from dandiapi.zarr.models import ZarrArchive
from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus
from dandiapi.zarr.tasks import ingest_zarr_archive

from .fuzzy import HTTP_URL_RE, TIMESTAMP_RE, URN_RE, UTC_ISO_TIMESTAMP_RE, UUID_RE
Expand Down Expand Up @@ -1244,25 +1244,32 @@ def test_asset_rest_delete_zarr(

@pytest.mark.django_db
def test_asset_rest_delete_zarr_modified(
api_client, user, draft_version, zarr_archive, zarr_file_factory, storage, monkeypatch
api_client,
user,
draft_version,
zarr_archive_factory,
zarr_file_factory,
storage,
monkeypatch,
):
"""Ensure that a zarr can be associated to an asset, modified, then the asset deleted."""
# Pretend like our zarr was defined with the given storage
monkeypatch.setattr(ZarrArchive, 'storage', storage)

# Assign perms and authenticate user
assign_perm('owner', user, draft_version.dandiset)
dandiset = draft_version.dandiset
assign_perm('owner', user, dandiset)
api_client.force_authenticate(user=user)

# Ensure zarr is ingested
zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED, dandiset=dandiset)
zarr_file_factory(zarr_archive=zarr_archive, size=100)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()

# Create first asset, pointing to zarr
resp = api_client.post(
f'/api/dandisets/{draft_version.dandiset.identifier}'
f'/versions/{draft_version.version}/assets/',
f'/api/dandisets/{dandiset.identifier}' f'/versions/{draft_version.version}/assets/',
{
'metadata': {
'path': 'sample.zarr',
Expand All @@ -1288,7 +1295,7 @@ def test_asset_rest_delete_zarr_modified(

# Delete the asset
resp = api_client.delete(
f'/api/dandisets/{draft_version.dandiset.identifier}/'
f'/api/dandisets/{dandiset.identifier}/'
f'versions/{draft_version.version}/assets/{asset.asset_id}/'
)
assert resp.status_code == 204
Expand Down
2 changes: 1 addition & 1 deletion dandiapi/api/tests/test_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def test_version_rest_publish_zarr(
api_client.force_authenticate(user=user)

# create and ingest zarr archive
zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset)
zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset, status='Uploaded')
zarr_file_factory(zarr_archive=zarr_archive)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Generated by Django 4.1.1 on 2023-10-04 18:03

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
('zarr', '0003_zarr_rename_tables_indexes_constraints'),
]

operations = [
migrations.RemoveConstraint(
model_name='embargoedzarrarchive',
name='zarr-embargoedzarrarchive-consistent-checksum-status',
),
migrations.RemoveConstraint(
model_name='zarrarchive',
name='zarr-zarrarchive-consistent-checksum-status',
),
migrations.AlterField(
model_name='embargoedzarrarchive',
name='status',
field=models.CharField(
choices=[
('Pending', 'Pending'),
('Uploaded', 'Uploaded'),
('Ingesting', 'Ingesting'),
('Complete', 'Complete'),
],
default='Pending',
max_length=9,
),
),
migrations.AlterField(
model_name='zarrarchive',
name='status',
field=models.CharField(
choices=[
('Pending', 'Pending'),
('Uploaded', 'Uploaded'),
('Ingesting', 'Ingesting'),
('Complete', 'Complete'),
],
default='Pending',
max_length=9,
),
),
migrations.AddConstraint(
model_name='embargoedzarrarchive',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(
('checksum__isnull', True),
('status__in', ['Pending', 'Uploaded', 'Ingesting']),
),
models.Q(('checksum__isnull', False), ('status', 'Complete')),
_connector='OR',
),
name='zarr-embargoedzarrarchive-consistent-checksum-status',
),
),
migrations.AddConstraint(
model_name='zarrarchive',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(
('checksum__isnull', True),
('status__in', ['Pending', 'Uploaded', 'Ingesting']),
),
models.Q(('checksum__isnull', False), ('status', 'Complete')),
_connector='OR',
),
name='zarr-zarrarchive-consistent-checksum-status',
),
),
]
9 changes: 7 additions & 2 deletions dandiapi/zarr/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
# The status of the zarr ingestion (checksums, size, file count)
class ZarrArchiveStatus(models.TextChoices):
PENDING = 'Pending'
UPLOADED = 'Uploaded'
INGESTING = 'Ingesting'
COMPLETE = 'Complete'


class BaseZarrArchive(TimeStampedModel):
UUID_REGEX = r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
INGEST_ERROR_MSG = 'Zarr archive already ingested'
INGEST_ERROR_MSG = 'Zarr archive is currently ingesting or has already ingested'

class Meta:
get_latest_by = 'modified'
Expand All @@ -39,7 +40,11 @@ class Meta:
name='%(app_label)s-%(class)s-consistent-checksum-status',
check=models.Q(
checksum__isnull=True,
status__in=[ZarrArchiveStatus.PENDING, ZarrArchiveStatus.INGESTING],
status__in=[
ZarrArchiveStatus.PENDING,
ZarrArchiveStatus.UPLOADED,
ZarrArchiveStatus.INGESTING,
],
)
| models.Q(checksum__isnull=False, status=ZarrArchiveStatus.COMPLETE),
),
Expand Down
4 changes: 2 additions & 2 deletions dandiapi/zarr/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def ingest_zarr_archive(zarr_id: str, force: bool = False):
# Ensure zarr is in pending state before proceeding
with transaction.atomic():
zarr: ZarrArchive = ZarrArchive.objects.select_for_update().get(zarr_id=zarr_id)
if not force and zarr.status != ZarrArchiveStatus.PENDING:
logger.info(f'{ZarrArchive.INGEST_ERROR_MSG}. Exiting...')
if not force and zarr.status != ZarrArchiveStatus.UPLOADED:
logger.info('Zarrs must be in an UPLOADED state to begin ingestion. Exiting...')
return

# Set as ingesting
Expand Down
17 changes: 11 additions & 6 deletions dandiapi/zarr/tests/test_ingest_zarr_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):
zarr: ZarrArchive = zarr_archive_factory()
# Create zarr with uploaded status so it can be ingested
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
files = [
zarr_file_factory(zarr_archive=zarr, path='foo'),
zarr_file_factory(zarr_archive=zarr, path='bar'),
Expand All @@ -24,7 +25,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):
assert zarr.checksum is None
assert zarr.size == 0
assert zarr.file_count == 0
assert zarr.status == ZarrArchiveStatus.PENDING
assert zarr.status == ZarrArchiveStatus.UPLOADED

# Compute checksum
ingest_zarr_archive(str(zarr.zarr_id))
Expand All @@ -39,7 +40,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):

@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_empty(zarr_archive_factory):
zarr: ZarrArchive = zarr_archive_factory()
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)

# Compute checksum
ingest_zarr_archive(str(zarr.zarr_id))
Expand Down Expand Up @@ -81,7 +82,7 @@ def test_ingest_zarr_archive_force(zarr_archive_factory, zarr_file_factory):
@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, draft_asset_factory):
# Create zarr and asset
zarr: ZarrArchive = zarr_archive_factory()
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
asset = draft_asset_factory(zarr=zarr, blob=None, embargoed_blob=None)

# Assert asset size, metadata
Expand All @@ -102,11 +103,14 @@ def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, dra


@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_file_factory):
def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive_factory, zarr_file_factory):
"""Ensure that if the zarr associated to an asset is modified and then ingested, it succeeds."""
assign_perm('owner', user, draft_version.dandiset)

# Ensure zarr is ingested with non-zero size
zarr_archive = zarr_archive_factory(
dandiset=draft_version.dandiset, status=ZarrArchiveStatus.UPLOADED
)
zarr_file_factory(zarr_archive=zarr_archive, size=100)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()
Expand All @@ -128,6 +132,7 @@ def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_fi

# Simulate more data uploaded to the zarr
zarr_archive.mark_pending()
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
zarr_archive.refresh_from_db()

Expand All @@ -142,7 +147,7 @@ def test_ingest_dandiset_zarrs(dandiset_factory, zarr_archive_factory, zarr_file
for _ in range(10):
zarr_file_factory(
path='foo/a',
zarr_archive=zarr_archive_factory(dandiset=dandiset),
zarr_archive=zarr_archive_factory(dandiset=dandiset, status=ZarrArchiveStatus.UPLOADED),
)

# Run ingest
Expand Down
35 changes: 26 additions & 9 deletions dandiapi/zarr/tests/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def test_zarr_rest_create_embargoed_dandiset(


@pytest.mark.django_db
def test_zarr_rest_get(
authenticated_api_client, storage, zarr_archive: ZarrArchive, zarr_file_factory
):
def test_zarr_rest_get(authenticated_api_client, storage, zarr_archive_factory, zarr_file_factory):
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
zarr_file = zarr_file_factory(zarr_archive=zarr_archive)

# Ingest
Expand Down Expand Up @@ -211,16 +211,18 @@ def test_zarr_rest_delete_file(
authenticated_api_client,
user,
storage,
zarr_archive: ZarrArchive,
zarr_archive_factory,
zarr_file_factory,
):
assign_perm('owner', user, zarr_archive.dandiset)
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
# Create zarr and assign user perms
zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
assign_perm('owner', user, zarr_archive.dandiset)

# Ingest
# Upload file and ingest
zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
ingest_zarr_archive(zarr_archive.zarr_id)

# Assert file count and size
Expand All @@ -243,6 +245,8 @@ def test_zarr_rest_delete_file(
assert zarr_archive.size == 0

# Re-ingest
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

zarr_archive.refresh_from_db()
Expand All @@ -256,14 +260,16 @@ def test_zarr_rest_delete_file_asset_metadata(
authenticated_api_client,
user,
storage,
zarr_archive: ZarrArchive,
zarr_archive_factory,
zarr_file_factory,
asset_factory,
):
assign_perm('owner', user, zarr_archive.dandiset)
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
assign_perm('owner', user, zarr_archive.dandiset)

asset = asset_factory(zarr=zarr_archive, blob=None)

zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
Expand All @@ -279,7 +285,13 @@ def test_zarr_rest_delete_file_asset_metadata(
)
assert resp.status_code == 204

# Re-ingest
zarr_archive.refresh_from_db()
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

# Assert now empty
asset.refresh_from_db()
assert asset.full_metadata['digest']['dandi:dandi-zarr-checksum'] == EMPTY_CHECKSUM
assert asset.full_metadata['contentSize'] == 0
Expand Down Expand Up @@ -357,7 +369,12 @@ def test_zarr_rest_delete_missing_file(
]
assert zarr_archive.storage.exists(zarr_archive.s3_path(zarr_file.path))

# Ingest
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

# Check
zarr_archive.refresh_from_db()
assert zarr_archive.file_count == 1
assert zarr_archive.size == zarr_file.size
Expand Down
7 changes: 5 additions & 2 deletions dandiapi/zarr/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def finalize(self, request, zarr_id):
if zarr_archive.status != ZarrArchiveStatus.PENDING:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()

# Dispatch task
ingest_zarr_archive.delay(zarr_id=zarr_archive.zarr_id)
return Response(None, status=status.HTTP_204_NO_CONTENT)
Expand Down Expand Up @@ -265,7 +268,7 @@ def create_files(self, request, zarr_id):
queryset = self.get_queryset().select_for_update(of=['self'])
with transaction.atomic():
zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id)
if zarr_archive.status == ZarrArchiveStatus.INGESTING:
if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

# Deny if the user doesn't have ownership permission
Expand Down Expand Up @@ -301,7 +304,7 @@ def delete_files(self, request, zarr_id):
queryset = self.get_queryset().select_for_update()
with transaction.atomic():
zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id)
if zarr_archive.status == ZarrArchiveStatus.INGESTING:
if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

if not self.request.user.has_perm('owner', zarr_archive.dandiset):
Expand Down
4 changes: 2 additions & 2 deletions doc/design/zarr-performance-redesign.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ sequenceDiagram
end
Client->>+Server: Finalize Zarr Archive ⚡⚡⚡
Server-->>-Client: PENDING Zarr Archive
Server-->>-Client: UPLOADED Zarr Archive
rect rgb(179, 209, 95)
Server->>+Worker: Compute tree checksum for Zarr Archive
end
loop
Client->>+Server: Check for COMPLETE Zarr Archive
Server-->>-Client: PENDING/INGESTING/COMPLETE Zarr Archive
Server-->>-Client: UPLOADED/INGESTING/COMPLETE Zarr Archive
end
Client->>+Client: Verify zarr checksum with local
Expand Down

0 comments on commit c875d22

Please sign in to comment.