Skip to content

Commit

Permalink
Merge pull request #3869 from kobotoolbox/3864-permission-sorting-issue
Browse files Browse the repository at this point in the history
Permission assignments bug fix
  • Loading branch information
bufke authored Jun 14, 2022
2 parents c8c3413 + 1bbd225 commit edbe6e3
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 41 deletions.
68 changes: 27 additions & 41 deletions kpi/serializers/v2/asset_permission_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from typing import Optional

from django.db import transaction
from django.contrib.auth.models import Permission, User
from django.urls import Resolver404
from django.utils.translation import gettext as t
Expand Down Expand Up @@ -326,6 +327,7 @@ class PermissionAssignment:
permission_codename: str
partial_permissions_json: Optional[str] = None

@transaction.atomic
def create(self, validated_data):
asset = self.context['asset']
user_pk_to_obj_cache = dict()
Expand Down Expand Up @@ -468,10 +470,10 @@ def validate(self, attrs):
Validate users and permissions, and convert them from API URLs into
model instances, using a minimal number of database queries
"""
# A dictionary for looking up usernames by API user URLs
url_to_username = dict()
# …for looking up codenames by API permission URLs
url_to_codename = dict()
# A dictionary for looking up API user URLs by username
username_to_url = dict()
# …for looking up by API permission URLs by codename
codename_to_url = dict()

assignable_permissions = self.context[
'asset'
Expand All @@ -497,9 +499,9 @@ def validate(self, attrs):
codename = self._get_arg_from_url('codename', perm_url)
if codename not in assignable_permissions:
raise serializers.ValidationError(INVALID_PERMISSION_ERROR)
url_to_codename[perm_url] = codename
codename_to_url[codename] = perm_url
username = self._get_arg_from_url('username', user_url)
url_to_username[user_url] = username
username_to_url[username] = user_url
for partial_assignment in assignment.get('partial_permissions', []):
partial_codename = self._get_arg_from_url(
'codename', partial_assignment['url']
Expand All @@ -511,42 +513,28 @@ def validate(self, attrs):
raise serializers.ValidationError(
INVALID_PARTIAL_PERMISSION_ERROR
)
url_to_codename[partial_assignment['url']] = partial_codename
codename_to_url[partial_codename] = partial_assignment['url']

# Create a dictionary of API user URLs to `User` objects
urls_sorted_by_username = [
url
for url, username in sorted(
url_to_username.items(), key=lambda item: item[1]
)
]
url_to_user = dict(
zip(
urls_sorted_by_username,
User.objects.only('pk', 'username')
.filter(username__in=url_to_username.values())
.order_by('username'),
)
)
if len(url_to_user) != len(url_to_username):
url_to_user = dict()
for user in User.objects.only('pk', 'username').filter(
username__in=username_to_url.keys()
):
url = username_to_url[user.username]
url_to_user[url] = user
if len(url_to_user) != len(username_to_url):
raise serializers.ValidationError(INVALID_USER_ERROR)

# Create a dictionary of API permission URLs to `Permission` objects
urls_sorted_by_codename = [
url
for url, codename in sorted(
url_to_codename.items(), key=lambda item: item[1]
)
]
url_to_permission = dict(
zip(
urls_sorted_by_codename,
Permission.objects.filter(codename__in=assignable_permissions)
.filter(codename__in=url_to_codename.values())
.order_by('codename'),
)
)
if len(url_to_permission) != len(url_to_codename):
url_to_permission = dict()
for permission in (
Permission.objects.filter(codename__in=assignable_permissions)
.filter(codename__in=codename_to_url.keys())
.order_by('codename')
):
url = codename_to_url[permission.codename]
url_to_permission[url] = permission
if len(url_to_permission) != len(codename_to_url):
# This should never happen since all codenames were found within
# `assignable_permissions`
raise RuntimeError(
Expand All @@ -569,11 +557,9 @@ def validate(self, attrs):
list
)
for partial_assignment in assignment['partial_permissions']:
# Convert partial permission URLs to codenames only; it's
# unnecessary to instantiate objects for them
partial_codename = url_to_codename[
partial_codename = url_to_permission[
partial_assignment['url']
]
].codename
assignment_with_objects['partial_permissions'][
partial_codename
] = partial_assignment['filters']
Expand Down
70 changes: 70 additions & 0 deletions kpi/tests/api/v2/test_api_asset_permission_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,3 +589,73 @@ def test_partial_permission_grants_implied_view_asset(self):
# `someuser` should have received the implied `view_asset`
# permission
assert self.someuser.has_perm(PERM_VIEW_ASSET, self.asset)

def test_no_assignments_saved_on_error(self):

# Call `get_anonymous_user()` to create AnonymousUser if it does not exist
get_anonymous_user()

# Ensure someuser and anotheruser do not have 'view_submissions' on `self.asset`
self.assertFalse(self.asset.has_perm(self.someuser, PERM_VIEW_SUBMISSIONS))
self.assertFalse(self.asset.has_perm(self.anotheruser, PERM_VIEW_SUBMISSIONS))

# Allow someuser and anotheruser to view submissions
good_assignments = [
{
'user': 'someuser',
'permission': PERM_VIEW_SUBMISSIONS,
},
{
'user': 'anotheruser',
'permission': PERM_VIEW_SUBMISSIONS,
}
]

assignments = self.translate_usernames_and_codenames_to_urls(
good_assignments
)
bulk_endpoint = reverse(
self._get_endpoint('asset-permission-assignment-bulk-assignments'),
kwargs={'parent_lookup_asset': self.asset.uid}
)
response = self.client.post(bulk_endpoint, assignments, format='json')

# Everything worked as expected, someuser and anotheruser got 'view_submissions'
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(self.asset.has_perm(self.someuser, PERM_VIEW_SUBMISSIONS))
self.assertTrue(self.asset.has_perm(self.anotheruser, PERM_VIEW_SUBMISSIONS))

# but do not have respectively 'delete_submissions' and 'change_submissions'
self.assertFalse(self.asset.has_perm(self.someuser, PERM_DELETE_SUBMISSIONS))
self.assertFalse(self.asset.has_perm(self.anotheruser, PERM_CHANGE_SUBMISSIONS))

bad_assignments = [
{
'user': 'AnonymousUser',
'permission': PERM_ADD_SUBMISSIONS, # should return a 400
},
{
'user': 'someuser',
'permission': PERM_DELETE_SUBMISSIONS,
},
{
'user': 'anotheruser',
'permission': PERM_CHANGE_SUBMISSIONS,
}
]
assignments = self.translate_usernames_and_codenames_to_urls(
bad_assignments
)

bulk_endpoint = reverse(
self._get_endpoint('asset-permission-assignment-bulk-assignments'),
kwargs={'parent_lookup_asset': self.asset.uid}
)
response = self.client.post(bulk_endpoint, assignments, format='json')
# Could not assign 'add_submissions' to anonymous user.
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

# Ensure that someuser and anotheruser did not get any other permissions
# than the one they already had, i.e.: 'view_submissions'.
self.assertFalse(self.asset.has_perm(self.someuser, PERM_DELETE_SUBMISSIONS))
self.assertFalse(self.asset.has_perm(self.anotheruser, PERM_CHANGE_SUBMISSIONS))
48 changes: 48 additions & 0 deletions kpi/tests/test_sorting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# coding: utf-8
from django.test import TestCase
from django.contrib.auth.models import User

from kpi.utils.object_permission import get_anonymous_user


class SortingTestCase(TestCase):

def test_different_sort_between_python_and_db(self):

# Ensure that `AnonymousUser` is created to include it in the list below
get_anonymous_user()

User.objects.bulk_create([
User(first_name='A', last_name='User', username='a_user'),
User(first_name='Alexander', last_name='Mtembenuzeni', username='alex_Mtemb'),
User(first_name='Another', last_name='User', username='anotheruser'),
])

users = list(
User.objects.filter(username__istartswith='a')
.values_list('username', flat=True)
.order_by('username')
)

# The database (PostgreSQL, as of Jun, 14, 2022) seems to be case
# insensitive and treats `_` after any letters.
# Python is case sensitive and treats `_` before any letters.
expected_database = [
'alex_Mtemb',
'AnonymousUser',
'anotheruser',
'a_user',
]

expected_python = [
'AnonymousUser',
'a_user',
'alex_Mtemb',
'anotheruser',
]

self.assertEqual(users, expected_database)
self.assertEqual(sorted(users), expected_python)
# Obviously if the first two assertions are True, the one below should
# be false. No matter what, let's be paranoid and test it anyway.
self.assertNotEqual(users, sorted(users))

0 comments on commit edbe6e3

Please sign in to comment.