diff --git a/dandiapi/api/tests/test_asset.py b/dandiapi/api/tests/test_asset.py index d75f02d8e..4d9dd19e7 100644 --- a/dandiapi/api/tests/test_asset.py +++ b/dandiapi/api/tests/test_asset.py @@ -420,6 +420,66 @@ def test_asset_rest_retrieve_no_sha256(api_client, version, asset): ) +@pytest.mark.django_db() +def test_asset_rest_retrieve_embargoed_admin( + api_client, + draft_version_factory, + draft_asset_factory, + admin_user, + storage, + monkeypatch, +): + monkeypatch.setattr(AssetBlob.blob.field, 'storage', storage) + + api_client.force_authenticate(user=admin_user) + version = draft_version_factory(dandiset__embargo_status=Dandiset.EmbargoStatus.EMBARGOED) + ds = version.dandiset + + # Create an extra asset so that there are multiple assets to filter down + asset = draft_asset_factory(blob__embargoed=True) + version.assets.add(asset) + + # Asset View + r = api_client.get(f'/api/assets/{asset.asset_id}/') + assert r.status_code == 200 + + # Nested Asset View + r = api_client.get( + f'/api/dandisets/{ds.identifier}/versions/{version.version}/assets/{asset.asset_id}/' + ) + assert r.status_code == 200 + + +@pytest.mark.django_db() +def test_asset_rest_download_embargoed_admin( + api_client, + draft_version_factory, + draft_asset_factory, + admin_user, + storage, + monkeypatch, +): + monkeypatch.setattr(AssetBlob.blob.field, 'storage', storage) + + api_client.force_authenticate(user=admin_user) + version = draft_version_factory(dandiset__embargo_status=Dandiset.EmbargoStatus.EMBARGOED) + ds = version.dandiset + + # Create an extra asset so that there are multiple assets to filter down + asset = draft_asset_factory(blob__embargoed=True) + version.assets.add(asset) + + # Asset View + r = api_client.get(f'/api/assets/{asset.asset_id}/download/') + assert r.status_code == 302 + + # Nested Asset View + r = api_client.get( + f'/api/dandisets/{ds.identifier}/versions/{version.version}/assets/{asset.asset_id}/download/' + ) + assert r.status_code == 302 + + @pytest.mark.django_db() def test_asset_rest_info(api_client, version, asset): version.assets.add(asset) diff --git a/dandiapi/api/views/asset.py b/dandiapi/api/views/asset.py index 5a3502e53..814c98dea 100644 --- a/dandiapi/api/views/asset.py +++ b/dandiapi/api/views/asset.py @@ -84,23 +84,32 @@ def raise_if_unauthorized(self): # We need to check the dandiset to see if it's embargoed, and if so whether or not the # user has ownership asset_id = self.kwargs.get('asset_id') - if asset_id is not None: - asset = get_object_or_404(Asset.objects.select_related('blob'), asset_id=asset_id) - # TODO: When EmbargoedZarrArchive is implemented, check that as well - if asset.blob and asset.blob.embargoed: - if not self.request.user.is_authenticated: - # Clients must be authenticated to access it - raise NotAuthenticated - - # User must be an owner on any of the dandisets this asset belongs to - asset_dandisets = Dandiset.objects.filter(versions__in=asset.versions.all()) - asset_dandisets_owned_by_user = DandisetUserObjectPermission.objects.filter( - content_object__in=asset_dandisets, - user=self.request.user, - permission__codename='owner', - ) - if not asset_dandisets_owned_by_user.exists(): - raise PermissionDenied + if asset_id is None: + return + + asset = get_object_or_404(Asset.objects.select_related('blob'), asset_id=asset_id) + + # TODO: When EmbargoedZarrArchive is implemented, check that as well + if not (asset.blob and asset.blob.embargoed): + return + + # Clients must be authenticated to access it + if not self.request.user.is_authenticated: + raise NotAuthenticated + + # Admins are allowed to access any embargoed asset blob + if self.request.user.is_superuser: + return + + # User must be an owner on any of the dandisets this asset belongs to + asset_dandisets = Dandiset.objects.filter(versions__in=asset.versions.all()) + asset_dandisets_owned_by_user = DandisetUserObjectPermission.objects.filter( + content_object__in=asset_dandisets, + user=self.request.user, + permission__codename='owner', + ) + if not asset_dandisets_owned_by_user.exists(): + raise PermissionDenied def get_queryset(self): self.raise_if_unauthorized()