From cb4636797ba0176e01d6069ec67735770c62b376 Mon Sep 17 00:00:00 2001 From: Thomas Steen Rasmussen Date: Sun, 14 Apr 2024 17:34:16 +0200 Subject: [PATCH] use groups to manage permissions (creators, moderators, curators), add unit tests for FileAdmin --- pyproject.toml | 2 + src/albums/tests.py | 33 +-- src/bma/environment_settings.py.ci | 4 + src/bma/environment_settings.py.dist | 4 + src/bornhack_allauth_provider/adapters.py | 6 + src/files/admin.py | 9 +- src/files/api.py | 8 +- .../migrations/0006_alter_basefile_options.py | 17 ++ src/files/models.py | 18 ++ src/files/tests.py | 262 +++++++++++++----- ...003_remove_user_is_bma_creator_and_more.py | 25 ++ src/users/models.py | 12 - src/utils/admin.py | 6 +- src/utils/tests.py | 44 ++- 14 files changed, 338 insertions(+), 112 deletions(-) create mode 100644 src/files/migrations/0006_alter_basefile_options.py create mode 100644 src/users/migrations/0003_remove_user_is_bma_creator_and_more.py diff --git a/pyproject.toml b/pyproject.toml index 3bc3f7f..a0cd88a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,6 +94,8 @@ convention = "google" "tests.py" = [ "S101", # https://docs.astral.sh/ruff/rules/assert/ "PLR2004", # https://docs.astral.sh/ruff/rules/magic-value-comparison/ + "PT009", # https://docs.astral.sh/ruff/rules/pytest-unittest-assertion/ + "S106", # https://docs.astral.sh/ruff/rules/hardcoded-password-func-arg/ ] "*/migrations/*" = [ "D" # https://docs.astral.sh/ruff/rules/#pydocstyle-d diff --git a/src/albums/tests.py b/src/albums/tests.py index 9643231..8ed6076 100644 --- a/src/albums/tests.py +++ b/src/albums/tests.py @@ -27,7 +27,7 @@ def test_album_create( "description": description, "files": files if files else [], }, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, content_type="application/json", ) assert response.status_code == 201 @@ -55,7 +55,7 @@ def test_album_update(self) -> None: "description": "description here", "files": self.files[0:2], }, - headers={"authorization": self.user2.auth}, + headers={"authorization": self.user0.auth}, content_type="application/json", ) assert response.status_code == 403 @@ -68,7 +68,7 @@ def test_album_update(self) -> None: "description": "description here", "files": self.files[0:2], }, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, content_type="application/json", ) assert response.status_code == 202 @@ -81,7 +81,7 @@ def test_album_update(self) -> None: "description": "description here", "files": self.files[0:2], }, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, content_type="application/json", ) assert response.status_code == 200 @@ -93,7 +93,7 @@ def test_album_update(self) -> None: response = self.client.patch( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}), {"files": self.files}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, content_type="application/json", ) assert response.status_code == 200 @@ -103,7 +103,7 @@ def test_album_update(self) -> None: response = self.client.patch( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}), {"files": []}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, content_type="application/json", ) assert response.status_code == 200 @@ -122,21 +122,21 @@ def test_album_delete(self) -> None: # test with wrong auth response = self.client.delete( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}), - headers={"authorization": self.user2.auth}, + headers={"authorization": self.user0.auth}, ) assert response.status_code == 403 # delete the album, check mode response = self.client.delete( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}) + "?check=true", - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 202 # delete the album response = self.client.delete( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 204 @@ -145,7 +145,7 @@ def test_album_get(self) -> None: self.test_album_create_with_files() response = self.client.get( reverse("api-v1-json:album_get", kwargs={"album_uuid": self.album_uuid}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 200 @@ -153,7 +153,7 @@ def test_album_list(self) -> None: """Get album list from the API.""" for i in range(10): self.test_album_create_with_files(title=f"album{i}") - response = self.client.get(reverse("api-v1-json:album_list"), headers={"authorization": self.user1.auth}) + response = self.client.get(reverse("api-v1-json:album_list"), headers={"authorization": self.curator6.auth}) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 10 @@ -161,22 +161,23 @@ def test_album_list(self) -> None: response = self.client.get( reverse("api-v1-json:album_list"), data={"files": [self.files[0], response.json()["bma_response"][1]["files"][0]]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 0 + # test with files in the same album response = self.client.get( reverse("api-v1-json:album_list"), data={"files": [self.files[0], self.files[1]]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 1 # test search response = self.client.get( - reverse("api-v1-json:album_list"), data={"search": "album4"}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:album_list"), data={"search": "album4"}, headers={"authorization": self.curator6.auth} ) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 1 @@ -185,7 +186,7 @@ def test_album_list(self) -> None: response = self.client.get( reverse("api-v1-json:album_list"), data={"sorting": "created_desc"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 10 @@ -195,7 +196,7 @@ def test_album_list(self) -> None: response = self.client.get( reverse("api-v1-json:album_list"), data={"sorting": "title_asc", "offset": 5}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.curator6.auth}, ) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 5 diff --git a/src/bma/environment_settings.py.ci b/src/bma/environment_settings.py.ci index 7f07a12..368921d 100644 --- a/src/bma/environment_settings.py.ci +++ b/src/bma/environment_settings.py.ci @@ -99,3 +99,7 @@ DEFAULT_THUMBNAIL_URLS = { CORS_ALLOWED_ORIGINS = [ "http://127.0.0.1:8000", ] + +BMA_CREATOR_GROUP_NAME = "creators" +BMA_MODERATOR_GROUP_NAME = "moderators" +BMA_CURATOR_GROUP_NAME = "curators" diff --git a/src/bma/environment_settings.py.dist b/src/bma/environment_settings.py.dist index 544dec7..7d76f2a 100644 --- a/src/bma/environment_settings.py.dist +++ b/src/bma/environment_settings.py.dist @@ -110,3 +110,7 @@ DEFAULT_THUMBNAIL_URLS = { } CORS_ALLOWED_ORIGINS = env.list("DJANGO_CORS_ALLOWED_ORIGINS", default="{{ django_cors_allowed_origins|join(',')|default('http://127.0.0.1:8000,http://localhost:8000') }}") + +BMA_CREATOR_GROUP_NAME = env.str("BMA_CREATOR_GROUP_NAME", default="{{ django_bma_creator_group_name|default('creators') }}") +BMA_MODERATOR_GROUP_NAME = env.str("BMA_MODERATOR_GROUP_NAME", default="{{ django_bma_moderator_group_name|default('moderators') }}") +BMA_CURATOR_GROUP_NAME = env.str("BMA_CURATOR_GROUP_NAME", default="{{ django_bma_curator_group_name|default('curators') }}") diff --git a/src/bornhack_allauth_provider/adapters.py b/src/bornhack_allauth_provider/adapters.py index 8b0b1a7..7ebadf0 100644 --- a/src/bornhack_allauth_provider/adapters.py +++ b/src/bornhack_allauth_provider/adapters.py @@ -3,6 +3,8 @@ from allauth.account.utils import user_username from allauth.socialaccount.adapter import DefaultSocialAccountAdapter from allauth.socialaccount.models import SocialLogin +from django.conf import settings +from django.contrib.auth.models import Group from django.http import HttpRequest @@ -24,4 +26,8 @@ def populate_user(self, request: HttpRequest, sociallogin: SocialLogin, data: di # set description on the user object user_field(sociallogin.user, "description", data.get("description")) + # add to curators group + curators, created = Group.objects.get_or_create(name=settings.BMA_CURATOR_GROUP_NAME) + curators.user_set.add(sociallogin.user) + return sociallogin.user diff --git a/src/files/admin.py b/src/files/admin.py index 169403e..bfe7539 100644 --- a/src/files/admin.py +++ b/src/files/admin.py @@ -16,7 +16,7 @@ @admin.register(BaseFile) class BaseFileAdmin(admin.ModelAdmin[BaseFile]): - """The ModelAdmin class to manage files.""" + """The ModelAdmin class to manage files. Used by the regular admin and FileAdmin.""" readonly_fields = ("status", "original_filename", "file_size", "license", "owner") @@ -43,6 +43,8 @@ def get_actions(self, request: HttpRequest) -> dict[str, tuple[Callable[..., str valid_actions = actions.copy() for action in actions: if not get_objects_for_user(request.user, f"{action}_basefile", klass=BaseFile).exists(): + # user does not have permission to perform this action on any objects, + # remove it from the actions list del valid_actions[action] return valid_actions @@ -125,8 +127,8 @@ def send_message(self, request: HttpRequest, selected: int, valid: int, updated: self.message_user( request, f"{selected} files selected to be {action}, " - "out of those {valid} files had needed permission and expected status, " - "and out of those {updated} files were successfully {action}", + f"out of those {valid} files had needed permission and expected status, " + f"and out of those {updated} files were successfully {action}", status, ) @@ -210,5 +212,6 @@ def thumbnail(self, obj: BaseFile) -> str: except AttributeError: return "" + # register the BaseFile model in the file_admin file_admin.register(BaseFile, BaseFileAdmin) diff --git a/src/files/api.py b/src/files/api.py index 01a606c..5222f40 100644 --- a/src/files/api.py +++ b/src/files/api.py @@ -12,7 +12,6 @@ from django.shortcuts import get_object_or_404 from django.utils import timezone from documents.models import Document -from guardian.shortcuts import assign_perm from guardian.shortcuts import get_objects_for_user from ninja import Query from ninja import Router @@ -92,7 +91,7 @@ def upload(request: HttpRequest, f: UploadedFile, metadata: UploadRequestSchema) # save everything uploaded_file.save() - # if the filetype is picture then use the picture itself as thumbnail, + # if the filetype is picture then use the pictures large_thumbnail as thumbnail, # this has to be done after .save() to ensure the uuid filename and # full path is passed to the imagekit namer if ( @@ -104,10 +103,9 @@ def upload(request: HttpRequest, f: UploadedFile, metadata: UploadRequestSchema) uploaded_file.save(update_fields=["thumbnail_url", "updated"]) # assign permissions (publish_basefile and unpublish_basefile are assigned after moderation) - assign_perm("view_basefile", request.user, uploaded_file) - assign_perm("change_basefile", request.user, uploaded_file) - assign_perm("delete_basefile", request.user, uploaded_file) + uploaded_file.add_initial_permissions() + # all good return 201, {"bma_response": uploaded_file, "message": f"File {uploaded_file.uuid} uploaded OK!"} diff --git a/src/files/migrations/0006_alter_basefile_options.py b/src/files/migrations/0006_alter_basefile_options.py new file mode 100644 index 0000000..3b545a6 --- /dev/null +++ b/src/files/migrations/0006_alter_basefile_options.py @@ -0,0 +1,17 @@ +# Generated by Django 5.0.3 on 2024-04-14 13:03 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('files', '0005_alter_basefile_options_alter_basefile_status'), + ] + + operations = [ + migrations.AlterModelOptions( + name='basefile', + options={'ordering': ('created',), 'permissions': (('unapprove_basefile', 'Unapprove file'), ('approve_basefile', 'Approve file'), ('unpublish_basefile', 'Unpublish file'), ('publish_basefile', 'Publish file')), 'verbose_name': 'file', 'verbose_name_plural': 'files'}, + ), + ] diff --git a/src/files/models.py b/src/files/models.py index 7937e94..0cb2805 100644 --- a/src/files/models.py +++ b/src/files/models.py @@ -4,6 +4,8 @@ import uuid from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group from django.core.exceptions import ValidationError from django.db import models from django.http import HttpRequest @@ -20,6 +22,8 @@ logger = logging.getLogger("bma") +User = get_user_model() + class StatusChoices(models.TextChoices): """The possible status choices for a file.""" @@ -285,3 +289,17 @@ def publish(self) -> int: def unpublish(self) -> int: """Unpublish this file.""" return self.update_status(new_status="UNPUBLISHED") + + def add_initial_permissions(self) -> None: + """Add initial permissions for newly uploaded files.""" + # add owner permissions + assign_perm("view_basefile", self.owner, self) + assign_perm("change_basefile", self.owner, self) + assign_perm("delete_basefile", self.owner, self) + moderators, created = Group.objects.get_or_create(name=settings.BMA_MODERATOR_GROUP_NAME) + if created: + logger.debug("Created new group 'moderators'") + # add moderator permissions + assign_perm("view_basefile", moderators, self) + assign_perm("approve_basefile", moderators, self) + assign_perm("unapprove_basefile", moderators, self) diff --git a/src/files/tests.py b/src/files/tests.py index 522c351..692f29c 100644 --- a/src/files/tests.py +++ b/src/files/tests.py @@ -20,7 +20,7 @@ class TestFilesApi(ApiTestBase): def test_api_auth_bearer_token(self) -> None: """Test getting a token.""" - response = self.client.get("/o/authorized_tokens/", headers={"authorization": self.user1.auth}) + response = self.client.get("/o/authorized_tokens/", headers={"authorization": self.creator2.auth}) assert response.status_code == 200 assert "revoke" in response.content.decode("utf-8") @@ -30,8 +30,8 @@ def test_api_auth_get_refresh_token(self) -> None: "/o/token/", { "grant_type": "refresh_token", - "client_id": f"client_id_{self.user1.username}", - "refresh_token": self.user1.tokeninfo["refresh_token"], + "client_id": f"client_id_{self.creator2.username}", + "refresh_token": self.creator2.tokeninfo["refresh_token"], }, ) assert response.status_code == 200 @@ -39,7 +39,7 @@ def test_api_auth_get_refresh_token(self) -> None: def test_api_auth_django_session(self) -> None: """Test getting authorised tokens.""" - self.client.force_login(self.user1) + self.client.force_login(self.creator2) response = self.client.get("/o/authorized_tokens/") assert response.status_code == 200 assert "revoke" in response.content.decode("utf-8") @@ -54,7 +54,7 @@ def test_file_upload(self) -> None: def test_file_list(self) -> None: # noqa: PLR0915 """Test the file_list endpoint.""" files = [self.file_upload(title=f"title{i}") for i in range(15)] - response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}) + response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.creator2.auth}) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 15 @@ -62,7 +62,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"limit": 5, "sorting": "title_asc"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 5 assert response.json()["bma_response"][0]["title"] == "title0" @@ -72,7 +72,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"limit": 1, "sorting": "created_desc"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.json()["bma_response"][0]["title"] == "title14" @@ -80,7 +80,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"offset": 5, "sorting": "created_asc"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.json()["bma_response"][0]["title"] == "title5" assert response.json()["bma_response"][4]["title"] == "title9" @@ -88,20 +88,20 @@ def test_file_list(self) -> None: # noqa: PLR0915 # test owner filter response = self.client.get( reverse("api-v1-json:file_list"), - data={"owners": [self.user1.uuid, self.user2.uuid]}, - headers={"authorization": self.user1.auth}, + data={"owners": [self.creator2.uuid, self.user0.uuid]}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), - data={"owners": [self.user2.uuid]}, - headers={"authorization": self.user1.auth}, + data={"owners": [self.user0.uuid]}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 0 # test search response = self.client.get( - reverse("api-v1-json:file_list"), data={"search": "title7"}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"search": "title7"}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 1 assert response.json()["bma_response"][0]["title"] == "title7" @@ -113,7 +113,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 "title": "album title here", "files": files[3:6], }, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 201 @@ -123,7 +123,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"albums": [self.album_uuid]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 3 @@ -133,7 +133,7 @@ def test_file_list(self) -> None: # noqa: PLR0915 { "title": "another album title here", }, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 201 @@ -143,33 +143,33 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"albums": [self.album_uuid, uuid]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 3 # test file size filter response = self.client.get( - reverse("api-v1-json:file_list"), data={"size": 9478}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"size": 9478}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 15 # test file size_lt filter response = self.client.get( - reverse("api-v1-json:file_list"), data={"size_lt": 10000}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"size_lt": 10000}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 15 response = self.client.get( - reverse("api-v1-json:file_list"), data={"size_lt": 1000}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"size_lt": 1000}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 0 # test file size_gt filter response = self.client.get( - reverse("api-v1-json:file_list"), data={"size_gt": 10000}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"size_gt": 10000}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 0 response = self.client.get( - reverse("api-v1-json:file_list"), data={"size_gt": 1000}, headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_list"), data={"size_gt": 1000}, headers={"authorization": self.creator2.auth} ) assert len(response.json()["bma_response"]) == 15 @@ -177,13 +177,13 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"filetypes": ["picture"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"filetypes": ["audio", "video", "document"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 0 @@ -191,13 +191,13 @@ def test_file_list(self) -> None: # noqa: PLR0915 response = self.client.get( reverse("api-v1-json:file_list"), data={"licenses": ["CC_ZERO_1_0"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 15 response = self.client.get( reverse("api-v1-json:file_list"), data={"licenses": ["CC_BY_4_0", "CC_BY_SA_4_0"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 0 @@ -206,7 +206,7 @@ def test_file_list_permissions(self) -> None: files = [self.file_upload(title=f"title{i}") for i in range(15)] # no files should be visible - response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) + response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user0.auth}) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 0 @@ -218,14 +218,14 @@ def test_file_list_permissions(self) -> None: # attempt to publish a file before approval response = self.client.patch( reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 403 # approve the file without permission response = self.client.patch( reverse("api-v1-json:approve_file", kwargs={"file_uuid": files[0]}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 403 @@ -254,26 +254,26 @@ def test_file_list_permissions(self) -> None: response = self.client.get( reverse("api-v1-json:file_list"), data={"statuses": ["UNPUBLISHED"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 1 # publish a file, check mode response = self.client.patch( reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}) + "?check=true", - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 202 # publish the file response = self.client.patch( reverse("api-v1-json:publish_file", kwargs={"file_uuid": files[0]}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 200 # make sure someone else can see it - response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) + response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user0.auth}) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 1 @@ -287,26 +287,26 @@ def test_file_list_permissions(self) -> None: # unpublish the file without permission response = self.client.patch( reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}), - headers={"authorization": self.user2.auth}, + headers={"authorization": self.user0.auth}, ) assert response.status_code == 403 # unpublish the file, check mode response = self.client.patch( reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}) + "?check=true", - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 202 # unpublish the file response = self.client.patch( reverse("api-v1-json:unpublish_file", kwargs={"file_uuid": files[0]}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 200 # make sure it is not visible anymore - response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user2.auth}) + response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user0.auth}) assert response.status_code == 200 assert len(response.json()["bma_response"]) == 0 @@ -322,7 +322,7 @@ def test_metadata_get(self) -> None: self.file_upload() response = self.client.get( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 200 assert "uuid" in response.json()["bma_response"] @@ -331,15 +331,15 @@ def test_metadata_get(self) -> None: def test_file_download(self) -> None: """Test downloading a file after uploading it.""" self.file_upload() - metadata = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}).json()[ - "bma_response" - ][0] + metadata = self.client.get( + reverse("api-v1-json:file_list"), headers={"authorization": self.creator2.auth} + ).json()["bma_response"][0] url = metadata["links"]["downloads"]["original"] # try download of unpublished file without auth response = self.client.get(url) assert response.status_code == 403 # try again with auth - self.client.force_login(self.user1) + self.client.force_login(self.creator2) response = self.client.get(url) assert response.status_code == 200 assert response["content-type"] == "image/png" @@ -351,7 +351,7 @@ def test_file_metadata_update(self) -> None: self.file_upload() response = self.client.get( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 200 original_metadata = response.json()["bma_response"] @@ -375,7 +375,7 @@ def test_file_metadata_update(self) -> None: response = self.client.put( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), updates, - headers={"authorization": f"Bearer {self.user2.tokeninfo['access_token']}"}, + headers={"authorization": f"Bearer {self.user0.tokeninfo['access_token']}"}, content_type="application/json", ) assert response.status_code == 403 @@ -384,7 +384,7 @@ def test_file_metadata_update(self) -> None: response = self.client.put( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}) + "?check=true", updates, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 202 @@ -393,7 +393,7 @@ def test_file_metadata_update(self) -> None: response = self.client.put( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), updates, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 200 @@ -414,7 +414,7 @@ def test_file_metadata_update(self) -> None: response = self.client.put( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), {"thumbnail_url": "/wrong/value.tiff"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 422 @@ -423,7 +423,7 @@ def test_file_metadata_update(self) -> None: response = self.client.patch( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), {"source": "outer space"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 422 @@ -431,7 +431,7 @@ def test_file_metadata_update(self) -> None: response = self.client.patch( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), {"source": "https://example.com/foo.png"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 200 @@ -444,7 +444,7 @@ def test_file_metadata_update(self) -> None: response = self.client.patch( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), {"thumbnail_url": "/foo/evil.ext"}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 422 @@ -473,21 +473,21 @@ def test_file_delete(self) -> None: # test with wrong auth response = self.client.delete( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), - headers={"authorization": f"Bearer {self.user2.tokeninfo['access_token']}"}, + headers={"authorization": f"Bearer {self.user0.tokeninfo['access_token']}"}, ) assert response.status_code == 403 # delete file, check mode response = self.client.delete( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}) + "?check=true", - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 202 # delete file response = self.client.delete( reverse("api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 204 @@ -498,14 +498,15 @@ def test_metadata_get_404(self) -> None: "api-v1-json:file_get", kwargs={"file_uuid": "a35ce7c9-f814-46ca-8c4e-87b992e15819"}, ), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 404 def test_metadata_get_validationerror(self) -> None: """Get file metadata get with something that is not a uuid.""" response = self.client.get( - reverse("api-v1-json:file_get", kwargs={"file_uuid": "notuuid"}), headers={"authorization": self.user1.auth} + reverse("api-v1-json:file_get", kwargs={"file_uuid": "notuuid"}), + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 422 @@ -517,7 +518,7 @@ def test_metadata_get_403(self) -> None: "api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}, ), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert response.status_code == 200 response = self.client.get( @@ -525,7 +526,7 @@ def test_metadata_get_403(self) -> None: "api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}, ), - headers={"authorization": f"Bearer {self.user2.tokeninfo['access_token']}"}, + headers={"authorization": f"Bearer {self.user0.tokeninfo['access_token']}"}, ) assert response.status_code == 403 response = self.client.get( @@ -540,13 +541,13 @@ def test_approve_files(self) -> None: """Approve multiple files.""" for _ in range(10): self.file_upload() - response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.user1.auth}) + response = self.client.get(reverse("api-v1-json:file_list"), headers={"authorization": self.creator2.auth}) files = [f["uuid"] for f in response.json()["bma_response"]] # first try with no permissions response = self.client.patch( reverse("api-v1-json:approve_files"), {"files": files[0:5]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, content_type="application/json", ) assert response.status_code == 403 @@ -582,7 +583,7 @@ def test_approve_files(self) -> None: response = self.client.get( reverse("api-v1-json:file_list"), data={"statuses": ["UNPUBLISHED"]}, - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, ) assert len(response.json()["bma_response"]) == 5 @@ -596,7 +597,142 @@ def test_file_missing_on_disk(self) -> None: "api-v1-json:file_get", kwargs={"file_uuid": self.file_uuid}, ), - headers={"authorization": self.user1.auth}, + headers={"authorization": self.creator2.auth}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["bma_response"]["size_bytes"], 0) + + +class FileAdminTests(ApiTestBase): + """Tests for the FileAdmin.""" + + def test_file_list_status_code(self) -> None: + """Test the access controls for the list page in the FileAdmin.""" + url = reverse("file_admin:files_basefile_changelist") + # try accessing the file_admin without a login + response = self.client.get(url) + self.assertEqual(response.status_code, 302) + # try accessing the file_admin with a user without permissions for it + self.client.login(username="user0", password="secret") + response = self.client.get(url) + self.assertEqual(response.status_code, 302) + # try accessing the file_admin with a user with is_creator=True + self.client.login(username="creator2", password="secret") + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # try accessing the file_admin with a user with is_moderator=True + self.client.login(username="moderator4", password="secret") + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # try accessing the file_admin with a user with is_curator=True + self.client.login(username="curator6", password="secret") + response = self.client.get(url) + self.assertEqual(response.status_code, 302) + + def test_file_list_html(self) -> None: + """Test the file list page in the FileAdmin.""" + # upload some files + self.files = [self.file_upload() for _ in range(10)] + for _ in range(10): + self.files.append(self.file_upload(owner="creator3")) + + # the superuser can see all files + url = reverse("file_admin:files_basefile_changelist") + self.client.login(username="superuser", password="secret") + response = self.client.get(url) + self.assertInHTML( + '

20 files

', response.content.decode(), msg_prefix="superuser can not see 20 files" + ) + + # each creator can see 10 files + for c in ["creator2", "creator3"]: + self.client.login(username=c, password="secret") + response = self.client.get(url) + self.assertInHTML( + '

10 files

', + response.content.decode(), + msg_prefix=f"creator {c} can not see 10 files", + ) + + # each moderator can see all 20 files + for m in ["moderator4", "moderator5"]: + self.client.login(username=m, password="secret") + response = self.client.get(url) + self.assertInHTML( + '

20 files

', + response.content.decode(), + msg_prefix=f"moderator {m} can not see 20 files", + ) + + # make moderator4 approve 5 of the files owned by creator2 + data = {"action": "approve", "_selected_action": self.files[:5]} + self.client.login(username="moderator4", password="secret") + response = self.client.post(url, data, follow=True) + self.assertEqual(response.status_code, 200) + self.assertInHTML( + '

20 files

', + response.content.decode(), + msg_prefix=f"moderator {m} can not see 20 files", + ) + + # test filtering by status to show the approved files + response = self.client.get(url + "?status__exact=UNPUBLISHED") + self.assertInHTML( + '

5 files

', response.content.decode(), msg_prefix="can not see 5 unpublished files" + ) + + # each creator can still see 10 files + for c in ["creator2", "creator3"]: + self.client.login(username=c, password="secret") + response = self.client.get(url) + self.assertInHTML( + '

10 files

', + response.content.decode(), + msg_prefix=f"creator {c} can not see 10 files", + ) + + # make creator2 publish the 5 approved files + data = {"action": "publish", "_selected_action": self.files[:5]} + self.client.login(username="creator2", password="secret") + response = self.client.post(url, data, follow=True) + self.assertEqual(response.status_code, 200) + self.assertInHTML( + '

10 files

', response.content.decode(), msg_prefix="creator2 can not see 10 files" + ) + response = self.client.get(url + "?status__exact=PUBLISHED") + self.assertInHTML( + '

5 files

', response.content.decode(), msg_prefix="can not see 5 published files" + ) + + # make creator2 unpublish the 5 approved files + data = {"action": "unpublish", "_selected_action": self.files[:5]} + response = self.client.post(url, data, follow=True) + self.assertEqual(response.status_code, 200) + self.assertInHTML( + "5 files selected to be unpublished, " + "out of those 5 files had needed permission and expected status, " + "and out of those 5 files were successfully unpublished", + response.content.decode(), + msg_prefix="unpublished message not found", + ) + self.assertInHTML( + '

10 files

', response.content.decode(), msg_prefix="creator2 can not see 10 files" + ) + response = self.client.get(url + "?status__exact=UNPUBLISHED") + self.assertInHTML( + '

5 files

', + response.content.decode(), + msg_prefix="creator2 can not see 5 unpublished files after unpublishing", + ) + + # make moderator4 unapprove 5 of the files owned by creator2 + data = {"action": "unapprove", "_selected_action": self.files[:5]} + self.client.login(username="moderator4", password="secret") + response = self.client.post(url, data, follow=True) + self.assertEqual(response.status_code, 200) + response = self.client.get(url + "?status__exact=PENDING_MODERATION") + self.assertInHTML( + '

20 files

', + response.content.decode(), + msg_prefix=f"moderator {m} can not see 20 files pending moderation", ) - assert response.status_code == 200 - assert response.json()["bma_response"]["size_bytes"] == 0 diff --git a/src/users/migrations/0003_remove_user_is_bma_creator_and_more.py b/src/users/migrations/0003_remove_user_is_bma_creator_and_more.py new file mode 100644 index 0000000..7edb42f --- /dev/null +++ b/src/users/migrations/0003_remove_user_is_bma_creator_and_more.py @@ -0,0 +1,25 @@ +# Generated by Django 5.0.3 on 2024-04-14 13:03 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('users', '0002_user_is_bma_creator_user_is_bma_curator_and_more'), + ] + + operations = [ + migrations.RemoveField( + model_name='user', + name='is_bma_creator', + ), + migrations.RemoveField( + model_name='user', + name='is_bma_curator', + ), + migrations.RemoveField( + model_name='user', + name='is_bma_moderator', + ), + ] diff --git a/src/users/models.py b/src/users/models.py index 133d879..f6f2998 100644 --- a/src/users/models.py +++ b/src/users/models.py @@ -17,15 +17,3 @@ class User(AbstractUser): # type: ignore[django-manager-missing] description = models.TextField( help_text="The description field of the user profile on the BornHack website.", ) - is_bma_moderator = models.BooleanField( - help_text="User can approve/unapprove files in BMA", - default=False, - ) - is_bma_creator = models.BooleanField( - help_text="User can upload files to BMA", - default=False, - ) - is_bma_curator = models.BooleanField( - help_text="User can create albums and tag files in BMA", - default=False, - ) diff --git a/src/utils/admin.py b/src/utils/admin.py index f9710bd..f56f043 100644 --- a/src/utils/admin.py +++ b/src/utils/admin.py @@ -1,5 +1,7 @@ """The FileAdminSite used for creators and moderators.""" +from django.conf import settings from django.contrib import admin +from django.contrib.auth.models import Group from django.http import HttpRequest @@ -12,8 +14,10 @@ class FileAdminSite(admin.AdminSite): def has_permission(self, request: HttpRequest) -> bool: """The FileAdminSite requires is_creator, is_moderator, or is_staff to be True on the User object.""" + creators, created = Group.objects.get_or_create(name=settings.BMA_CREATOR_GROUP_NAME) + moderators, created = Group.objects.get_or_create(name=settings.BMA_MODERATOR_GROUP_NAME) return request.user.is_authenticated and any( - [request.user.is_bma_creator, request.user.is_bma_moderator, request.user.is_staff] + [creators in request.user.groups.all(), moderators in request.user.groups.all(), request.user.is_staff] ) diff --git a/src/utils/tests.py b/src/utils/tests.py index 7e5446a..b0dd659 100644 --- a/src/utils/tests.py +++ b/src/utils/tests.py @@ -10,6 +10,7 @@ from urllib.parse import urlsplit from django.conf import settings +from django.contrib.auth.models import Group from django.test import Client from django.test import TestCase from django.urls import reverse @@ -33,15 +34,25 @@ def setUpTestData(cls) -> None: logging.disable(logging.CRITICAL) cls.client = Client(enforce_csrf_checks=True) - # create 4 regular users and 1 superuser - for i in range(6): - username = "superuser" if i == 5 else f"user{i}" - user = UserFactory.create(username=username) + # create 2 regular users, 2 creators, 2 moderators, 2 curators, and 1 superuser + for i in range(9): + kwargs = {} + if i in [0, 1]: + kwargs["username"] = f"user{i}" + elif i in [2, 3]: + kwargs["username"] = f"creator{i}" + elif i in [4, 5]: + kwargs["username"] = f"moderator{i}" + elif i in [6, 7]: + kwargs["username"] = f"curator{i}" + elif i == 8: + kwargs["username"] = "superuser" + kwargs["is_superuser"] = True + kwargs["is_staff"] = True + user = UserFactory.create(**kwargs) user.set_password("secret") - if i == 5: - user.is_superuser = True user.save() - setattr(cls, username, user) + setattr(cls, user.username, user) # create oauth application cls.application = Application.objects.create( name="Test Application", @@ -49,13 +60,21 @@ def setUpTestData(cls) -> None: user=user, client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, - client_id=f"client_id_{username}", - client_secret="client_secret", # noqa: S106 + client_id=f"client_id_{user.username}", + client_secret="client_secret", skip_authorization=True, ) user.auth = cls.get_access_token(user) user.save() - cls.client.logout() + cls.client.logout() + + # create groups and add users + creators = Group.objects.create(name=settings.BMA_CREATOR_GROUP_NAME) + creators.user_set.add(cls.creator2, cls.creator3) + moderators = Group.objects.create(name=settings.BMA_MODERATOR_GROUP_NAME) + moderators.user_set.add(cls.moderator4, cls.moderator5) + curators = Group.objects.create(name=settings.BMA_CURATOR_GROUP_NAME) + curators.user_set.add(cls.curator6, cls.curator7) @classmethod def get_access_token(cls, user) -> str: # noqa: ANN001 @@ -90,7 +109,7 @@ def get_access_token(cls, user) -> str: # noqa: ANN001 # the rest doesn't require login cls.client.logout() - # get the code to get the access token + # get the access token response = cls.client.post( "/o/token/", { @@ -108,6 +127,7 @@ def get_access_token(cls, user) -> str: # noqa: ANN001 def file_upload( # noqa: PLR0913 self, *, + owner: str = "creator2", filepath: str = settings.BASE_DIR / "static_src/images/logo_wide_black_500_RGB.png", title: str = "some title", file_license: str = "CC_ZERO_1_0", @@ -133,7 +153,7 @@ def file_upload( # noqa: PLR0913 "f": f, "metadata": json.dumps(metadata), }, - headers={"authorization": self.user1.auth}, + headers={"authorization": getattr(self, owner).auth}, ) assert response.status_code == expect_status_code if expect_status_code == 422: