Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UPLOADED state to Zarr models #1698

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions dandiapi/api/tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dandiapi.api.services.asset.exceptions import AssetPathConflict
from dandiapi.api.services.publish import publish_asset
from dandiapi.api.tasks.scheduled import validate_pending_asset_metadata
from dandiapi.zarr.models import ZarrArchive
from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus
from dandiapi.zarr.tasks import ingest_zarr_archive

from .fuzzy import HTTP_URL_RE, TIMESTAMP_RE, URN_RE, UTC_ISO_TIMESTAMP_RE, UUID_RE
Expand Down Expand Up @@ -1244,25 +1244,32 @@ def test_asset_rest_delete_zarr(

@pytest.mark.django_db
def test_asset_rest_delete_zarr_modified(
api_client, user, draft_version, zarr_archive, zarr_file_factory, storage, monkeypatch
api_client,
user,
draft_version,
zarr_archive_factory,
zarr_file_factory,
storage,
monkeypatch,
):
"""Ensure that a zarr can be associated to an asset, modified, then the asset deleted."""
# Pretend like our zarr was defined with the given storage
monkeypatch.setattr(ZarrArchive, 'storage', storage)

# Assign perms and authenticate user
assign_perm('owner', user, draft_version.dandiset)
dandiset = draft_version.dandiset
assign_perm('owner', user, dandiset)
api_client.force_authenticate(user=user)

# Ensure zarr is ingested
zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED, dandiset=dandiset)
zarr_file_factory(zarr_archive=zarr_archive, size=100)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()

# Create first asset, pointing to zarr
resp = api_client.post(
f'/api/dandisets/{draft_version.dandiset.identifier}'
f'/versions/{draft_version.version}/assets/',
f'/api/dandisets/{dandiset.identifier}' f'/versions/{draft_version.version}/assets/',
{
'metadata': {
'path': 'sample.zarr',
Expand All @@ -1288,7 +1295,7 @@ def test_asset_rest_delete_zarr_modified(

# Delete the asset
resp = api_client.delete(
f'/api/dandisets/{draft_version.dandiset.identifier}/'
f'/api/dandisets/{dandiset.identifier}/'
f'versions/{draft_version.version}/assets/{asset.asset_id}/'
)
assert resp.status_code == 204
Expand Down
2 changes: 1 addition & 1 deletion dandiapi/api/tests/test_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def test_version_rest_publish_zarr(
api_client.force_authenticate(user=user)

# create and ingest zarr archive
zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset)
zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset, status='Uploaded')
zarr_file_factory(zarr_archive=zarr_archive)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Generated by Django 4.1.1 on 2023-10-04 18:03

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
('zarr', '0003_zarr_rename_tables_indexes_constraints'),
]

operations = [
migrations.RemoveConstraint(
model_name='embargoedzarrarchive',
name='zarr-embargoedzarrarchive-consistent-checksum-status',
),
migrations.RemoveConstraint(
model_name='zarrarchive',
name='zarr-zarrarchive-consistent-checksum-status',
),
migrations.AlterField(
model_name='embargoedzarrarchive',
name='status',
field=models.CharField(
choices=[
('Pending', 'Pending'),
('Uploaded', 'Uploaded'),
('Ingesting', 'Ingesting'),
('Complete', 'Complete'),
],
default='Pending',
max_length=9,
),
),
migrations.AlterField(
model_name='zarrarchive',
name='status',
field=models.CharField(
choices=[
('Pending', 'Pending'),
('Uploaded', 'Uploaded'),
('Ingesting', 'Ingesting'),
('Complete', 'Complete'),
],
default='Pending',
max_length=9,
),
),
migrations.AddConstraint(
model_name='embargoedzarrarchive',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(
('checksum__isnull', True),
('status__in', ['Pending', 'Uploaded', 'Ingesting']),
),
models.Q(('checksum__isnull', False), ('status', 'Complete')),
_connector='OR',
),
name='zarr-embargoedzarrarchive-consistent-checksum-status',
),
),
migrations.AddConstraint(
model_name='zarrarchive',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(
('checksum__isnull', True),
('status__in', ['Pending', 'Uploaded', 'Ingesting']),
),
models.Q(('checksum__isnull', False), ('status', 'Complete')),
_connector='OR',
),
name='zarr-zarrarchive-consistent-checksum-status',
),
),
]
9 changes: 7 additions & 2 deletions dandiapi/zarr/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
# The status of the zarr ingestion (checksums, size, file count)
class ZarrArchiveStatus(models.TextChoices):
PENDING = 'Pending'
UPLOADED = 'Uploaded'
INGESTING = 'Ingesting'
COMPLETE = 'Complete'


class BaseZarrArchive(TimeStampedModel):
UUID_REGEX = r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
INGEST_ERROR_MSG = 'Zarr archive already ingested'
INGEST_ERROR_MSG = 'Zarr archive is currently ingesting or has already ingested'

class Meta:
get_latest_by = 'modified'
Expand All @@ -39,7 +40,11 @@ class Meta:
name='%(app_label)s-%(class)s-consistent-checksum-status',
check=models.Q(
checksum__isnull=True,
status__in=[ZarrArchiveStatus.PENDING, ZarrArchiveStatus.INGESTING],
status__in=[
ZarrArchiveStatus.PENDING,
ZarrArchiveStatus.UPLOADED,
ZarrArchiveStatus.INGESTING,
],
)
| models.Q(checksum__isnull=False, status=ZarrArchiveStatus.COMPLETE),
),
Expand Down
4 changes: 2 additions & 2 deletions dandiapi/zarr/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def ingest_zarr_archive(zarr_id: str, force: bool = False):
# Ensure zarr is in pending state before proceeding
with transaction.atomic():
zarr: ZarrArchive = ZarrArchive.objects.select_for_update().get(zarr_id=zarr_id)
if not force and zarr.status != ZarrArchiveStatus.PENDING:
logger.info(f'{ZarrArchive.INGEST_ERROR_MSG}. Exiting...')
if not force and zarr.status != ZarrArchiveStatus.UPLOADED:
logger.info('Zarrs must be in an UPLOADED state to begin ingestion. Exiting...')
return

# Set as ingesting
Expand Down
17 changes: 11 additions & 6 deletions dandiapi/zarr/tests/test_ingest_zarr_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):
zarr: ZarrArchive = zarr_archive_factory()
# Create zarr with uploaded status so it can be ingested
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
files = [
zarr_file_factory(zarr_archive=zarr, path='foo'),
zarr_file_factory(zarr_archive=zarr, path='bar'),
Expand All @@ -24,7 +25,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):
assert zarr.checksum is None
assert zarr.size == 0
assert zarr.file_count == 0
assert zarr.status == ZarrArchiveStatus.PENDING
assert zarr.status == ZarrArchiveStatus.UPLOADED

# Compute checksum
ingest_zarr_archive(str(zarr.zarr_id))
Expand All @@ -39,7 +40,7 @@ def test_ingest_zarr_archive(zarr_archive_factory, zarr_file_factory):

@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_empty(zarr_archive_factory):
zarr: ZarrArchive = zarr_archive_factory()
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)

# Compute checksum
ingest_zarr_archive(str(zarr.zarr_id))
Expand Down Expand Up @@ -81,7 +82,7 @@ def test_ingest_zarr_archive_force(zarr_archive_factory, zarr_file_factory):
@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, draft_asset_factory):
# Create zarr and asset
zarr: ZarrArchive = zarr_archive_factory()
zarr: ZarrArchive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
asset = draft_asset_factory(zarr=zarr, blob=None, embargoed_blob=None)

# Assert asset size, metadata
Expand All @@ -102,11 +103,14 @@ def test_ingest_zarr_archive_assets(zarr_archive_factory, zarr_file_factory, dra


@pytest.mark.django_db(transaction=True)
def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_file_factory):
def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive_factory, zarr_file_factory):
"""Ensure that if the zarr associated to an asset is modified and then ingested, it succeeds."""
assign_perm('owner', user, draft_version.dandiset)

# Ensure zarr is ingested with non-zero size
zarr_archive = zarr_archive_factory(
dandiset=draft_version.dandiset, status=ZarrArchiveStatus.UPLOADED
)
zarr_file_factory(zarr_archive=zarr_archive, size=100)
ingest_zarr_archive(zarr_archive.zarr_id)
zarr_archive.refresh_from_db()
Expand All @@ -128,6 +132,7 @@ def test_ingest_zarr_archive_modified(user, draft_version, zarr_archive, zarr_fi

# Simulate more data uploaded to the zarr
zarr_archive.mark_pending()
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
zarr_archive.refresh_from_db()

Expand All @@ -142,7 +147,7 @@ def test_ingest_dandiset_zarrs(dandiset_factory, zarr_archive_factory, zarr_file
for _ in range(10):
zarr_file_factory(
path='foo/a',
zarr_archive=zarr_archive_factory(dandiset=dandiset),
zarr_archive=zarr_archive_factory(dandiset=dandiset, status=ZarrArchiveStatus.UPLOADED),
)

# Run ingest
Expand Down
35 changes: 26 additions & 9 deletions dandiapi/zarr/tests/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def test_zarr_rest_create_embargoed_dandiset(


@pytest.mark.django_db
def test_zarr_rest_get(
authenticated_api_client, storage, zarr_archive: ZarrArchive, zarr_file_factory
):
def test_zarr_rest_get(authenticated_api_client, storage, zarr_archive_factory, zarr_file_factory):
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
zarr_file = zarr_file_factory(zarr_archive=zarr_archive)

# Ingest
Expand Down Expand Up @@ -211,16 +211,18 @@ def test_zarr_rest_delete_file(
authenticated_api_client,
user,
storage,
zarr_archive: ZarrArchive,
zarr_archive_factory,
zarr_file_factory,
):
assign_perm('owner', user, zarr_archive.dandiset)
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
# Create zarr and assign user perms
zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
assign_perm('owner', user, zarr_archive.dandiset)

# Ingest
# Upload file and ingest
zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
ingest_zarr_archive(zarr_archive.zarr_id)

# Assert file count and size
Expand All @@ -243,6 +245,8 @@ def test_zarr_rest_delete_file(
assert zarr_archive.size == 0

# Re-ingest
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

zarr_archive.refresh_from_db()
Expand All @@ -256,14 +260,16 @@ def test_zarr_rest_delete_file_asset_metadata(
authenticated_api_client,
user,
storage,
zarr_archive: ZarrArchive,
zarr_archive_factory,
zarr_file_factory,
asset_factory,
):
assign_perm('owner', user, zarr_archive.dandiset)
# Pretend like ZarrArchive was defined with the given storage
ZarrArchive.storage = storage

zarr_archive = zarr_archive_factory(status=ZarrArchiveStatus.UPLOADED)
assign_perm('owner', user, zarr_archive.dandiset)

asset = asset_factory(zarr=zarr_archive, blob=None)

zarr_file = zarr_file_factory(zarr_archive=zarr_archive)
Expand All @@ -279,7 +285,13 @@ def test_zarr_rest_delete_file_asset_metadata(
)
assert resp.status_code == 204

# Re-ingest
zarr_archive.refresh_from_db()
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

# Assert now empty
asset.refresh_from_db()
assert asset.full_metadata['digest']['dandi:dandi-zarr-checksum'] == EMPTY_CHECKSUM
assert asset.full_metadata['contentSize'] == 0
Expand Down Expand Up @@ -357,7 +369,12 @@ def test_zarr_rest_delete_missing_file(
]
assert zarr_archive.storage.exists(zarr_archive.s3_path(zarr_file.path))

# Ingest
zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()
ingest_zarr_archive(zarr_archive.zarr_id)

# Check
zarr_archive.refresh_from_db()
assert zarr_archive.file_count == 1
assert zarr_archive.size == zarr_file.size
Expand Down
7 changes: 5 additions & 2 deletions dandiapi/zarr/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def finalize(self, request, zarr_id):
if zarr_archive.status != ZarrArchiveStatus.PENDING:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

zarr_archive.status = ZarrArchiveStatus.UPLOADED
zarr_archive.save()

# Dispatch task
ingest_zarr_archive.delay(zarr_id=zarr_archive.zarr_id)
return Response(None, status=status.HTTP_204_NO_CONTENT)
Expand Down Expand Up @@ -265,7 +268,7 @@ def create_files(self, request, zarr_id):
queryset = self.get_queryset().select_for_update(of=['self'])
with transaction.atomic():
zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id)
if zarr_archive.status == ZarrArchiveStatus.INGESTING:
if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

# Deny if the user doesn't have ownership permission
Expand Down Expand Up @@ -301,7 +304,7 @@ def delete_files(self, request, zarr_id):
queryset = self.get_queryset().select_for_update()
with transaction.atomic():
zarr_archive: ZarrArchive = get_object_or_404(queryset, zarr_id=zarr_id)
if zarr_archive.status == ZarrArchiveStatus.INGESTING:
if zarr_archive.status in [ZarrArchiveStatus.UPLOADED, ZarrArchiveStatus.INGESTING]:
return Response(ZarrArchive.INGEST_ERROR_MSG, status=status.HTTP_400_BAD_REQUEST)

if not self.request.user.has_perm('owner', zarr_archive.dandiset):
Expand Down
4 changes: 2 additions & 2 deletions doc/design/zarr-performance-redesign.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ sequenceDiagram
end

Client->>+Server: Finalize Zarr Archive ⚡⚡⚡
Server-->>-Client: PENDING Zarr Archive
Server-->>-Client: UPLOADED Zarr Archive

rect rgb(179, 209, 95)
Server->>+Worker: Compute tree checksum for Zarr Archive
end

loop
Client->>+Server: Check for COMPLETE Zarr Archive
Server-->>-Client: PENDING/INGESTING/COMPLETE Zarr Archive
Server-->>-Client: UPLOADED/INGESTING/COMPLETE Zarr Archive
end

Client->>+Client: Verify zarr checksum with local
Expand Down