Skip to content

Commit

Permalink
Issue #33: Implemented the AC & wrote the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Francois-Werbrouck committed Jan 27, 2025
1 parent ff9954b commit 4c39cd1
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 65 deletions.
153 changes: 121 additions & 32 deletions datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
and the user container in the blob storage.
"""

from enum import Enum
import datastore.db.queries.user as user
import datastore.db.queries.group as group
import datastore.db.queries.picture as picture
Expand Down Expand Up @@ -62,6 +63,11 @@ class Container(BaseModel):
folders: Optional[Dict[UUID, Folder]] = {}
path: Optional[str] = None

class Permission(Enum):
READ = 1
WRITE = 2
OWNER = 3


class ContainerController:
def __init__(self, container_model: Container):
Expand All @@ -75,6 +81,49 @@ def __init__(self, container_model: Container):
# self.model: Optional[Container] = None
# self.container_client: Optional[ContainerClient] = None

def __verify_user_can_manage(self, cursor: Cursor, user_id: UUID) ->bool:
"""
This function verifies if the user can manage the container.
Only the Owner of the container and the users with the Dev/Admin role can manage the container.
"""
if user.is_a_user_admin(cursor, user_id):
return True
elif container_db.has_user_access_to_container(cursor, user_id, self.id):
return container_db.get_user_permission_to_container(cursor, user_id, self.id) == Permission.OWNER

def __verify_user_can_write(self, cursor: Cursor, user_id: UUID) ->bool:
"""
This function verifies if the user can write in the container.
"""

if user.is_a_user_admin(cursor, user_id):
return True
elif container_db.has_user_access_to_container(cursor, user_id, self.id):
perm = container_db.get_user_permission_to_container(cursor, user_id, self.id)
if perm >= Permission.WRITE:
return True
if container_db.has_user_group_access_to_container(cursor,user_id,self.id):
perm = container_db.get_group_permission_to_container(cursor, user_id, self.id)
if perm >= Permission.WRITE:
return True
return False

def __verify_user_can_read(self, cursor: Cursor, user_id: UUID) ->bool:
"""
This function verifies if the user can read in the container.
"""
if user.is_a_user_admin(cursor, user_id):
return True
elif container_db.has_user_access_to_container(cursor, user_id, self.id):
perm = container_db.get_user_permission_to_container(cursor, user_id, self.id)
if perm >= Permission.READ:
return True
if container_db.has_user_group_access_to_container(cursor,user_id,self.id):
perm = container_db.get_group_permission_to_container(cursor, user_id, self.id)
if perm >= Permission.READ:
return True
return False

# This could be achieved by fetching the storage and performing Azure API calls
def fetch_all_folders_metadata(self, cursor: Cursor):
"""
Expand Down Expand Up @@ -166,44 +215,57 @@ async def get_container_client(self, connection_str: str, credentials: str):
def get_id(self):
return self.id

def add_user(self, cursor: Cursor, user_id: UUID, performed_by: UUID):
def add_user(self, cursor: Cursor, user_id: UUID, performed_by: UUID, permission: Permission = Permission.READ):
"""
This function adds a user to the container rights.
Parameters:
- cursor: The cursor object to interact with the database.
- user_id (str): The UUID of the user.
"""
if not container_db.has_user_access_to_container(cursor, user_id, self.id):
if not container_db.has_user_group_access_to_container(
cursor, user_id, self.id
):
# Link the container to the user in the database
container_db.add_user_to_container(
cursor, user_id, self.id, performed_by
)
# Make sure the user has the permission to add a user to the container
if not self.__verify_user_can_manage(cursor, performed_by):
raise ValueError("The user performing the action is not the owner of the container, they can't add other users to the container")
# Link the container to the user in the database
container_db.add_user_to_container(
cursor, user_id, self.id, performed_by, permission.value
)
self.model.user_ids.append(user_id)

def remove_user(self, cursor: Cursor, user_id: UUID):
def remove_user(self, cursor: Cursor, user_id: UUID,performed_by: UUID):
"""
This function removes a user from the container rights.
Parameters:
- cursor: The cursor object to interact with the database.
- user_id (str): The UUID of the user.
"""
# Check if the user to remove has access to the container
if container_db.has_user_access_to_container(cursor, user_id, self.id):
# Make sure the user has the permission to add a user to the container
if not self.__verify_user_can_manage(cursor, performed_by):
raise ValueError("The user performing this action is not the owner of the container, they can't remove other users to the container")
# Unlink the container to the user in the database
container_db.delete_user_from_container(cursor, user_id, self.id)
if user_id in self.model.user_ids:
self.model.user_ids.remove(user_id)
if user_id in self.model.user_ids:
self.model.user_ids.remove(user_id)

def add_group(self, cursor: Cursor, group_id: UUID, performed_by: UUID):
def add_group(self, cursor: Cursor, group_id: UUID, performed_by: UUID, permission: Permission = Permission.READ):
"""
Function to add a group to the container rights.
"""
# Make sure the user has the permission to add a user to the container
if not self.__verify_user_can_manage(cursor, performed_by):
raise ValueError("The user performing this action is not the owner of the container, they can't add groups to the container")
# Link the container to the user in the database
container_db.add_group_to_container(cursor, group_id, self.id, performed_by)
container_db.add_group_to_container(cursor, group_id, self.id, performed_by, permission.value)
self.model.group_ids.append(group_id)

def remove_group(self, cursor: Cursor, group_id: UUID):
def remove_group(self, cursor: Cursor, group_id: UUID, performed_by: UUID):
# Make sure the user has the permission to add a user to the container
if not self.__verify_user_can_manage(cursor, performed_by):
raise ValueError("The user is not the owner of the container, they can't add other users to the container")

# Unlink the container to the user in the database
container_db.delete_group_from_container(cursor, group_id, self.id)
if group_id in self.model.group_ids:
Expand Down Expand Up @@ -263,6 +325,9 @@ async def create_folder(
raise user.UserNotFoundError(
f"User not found based on the given id: {performed_by}"
)
# TODO : Check the user container permission if he has at least write permission
if not self.__verify_user_can_write(cursor, performed_by):
raise ValueError("The user does not have the permission to create a folder in the container: " + str(self.id))
# Check if folder exists in the database in the container & parent folder
if folder_name is not None:
# This is also a trigger in the db to create the folder
Expand Down Expand Up @@ -340,6 +405,9 @@ async def upload_pictures(
raise ContainerCreationError(
"Error: container client does not exist or not set, please set the container client first"
)
# TODO : Check the user container permission if he has at least write permission
if not self.__verify_user_can_write(cursor, user_id):
raise ValueError("The user does not have the permission to upload pictures in the container: " + str(self.id))
# Create a folder if not provided
if folder_id is None:
folder_id = await self.create_folder(
Expand Down Expand Up @@ -429,7 +497,7 @@ async def get_folder_pictures(self, cursor: Cursor, folder_id: UUID, user_id: UU
f"Folder does not exist in the container: {folder_id}"
)
# Check if user has access to the container
if user_id not in self.model.user_ids:
if user_id not in self.model.user_ids and not user.is_a_user_admin(cursor, user_id) and not self.model.is_public:
raise UserNotOwnerError(
f"User can't access this Container, user uuid :{user_id}, Container id : {self.id}"
)
Expand Down Expand Up @@ -478,8 +546,9 @@ async def get_picture_blob(self, cursor: Cursor, picture_id: UUID, user_id: UUID
raise user.UserNotFoundError(
f"User not found based on the given id: {user_id}"
)
# Check if user has access to the container
# TODO
#TODO: Check if user has access to the container
if not self.__verify_user_can_read(cursor, user_id) and not self.model.is_public:
raise ValueError("The user does not have the permission to download pictures in the container: " + str(self.id))
# Check if picture exists
if not picture.is_a_picture_id(cursor, picture_id):
raise picture.PictureNotFoundError(
Expand Down Expand Up @@ -519,6 +588,8 @@ async def delete_folder_permanently(
f"Picture set not found based on the given id: {folder_id}"
)
# TODO : Check user is owner of the picture set
if not self.__verify_user_can_write(cursor, user_id):
raise ValueError("The user does not have the permission to delete the folder in the container: " + str(self.id))
# if picture.get_picture_set_owner_id(cursor, folder_id) != user_id:

# Delete the folder in the blob storage
Expand All @@ -540,6 +611,12 @@ async def delete_picture_permanently(self, cursor: Cursor, user_id: UUID, pictur
"""
This function deletes a picture from the blob storage and the database.
"""
if not user.is_a_user_id(cursor=cursor, user_id=user_id):
raise user.UserNotFoundError(
f"User not found based on the given id: {user_id}"
)
if not self.__verify_user_can_write(cursor, user_id):
raise ValueError("The user does not have the permission to delete the picture in the container: " + str(self.id))
# Get the picture set id
picture_set_id = picture.get_picture_picture_set_id(cursor, picture_id)

Expand Down Expand Up @@ -606,8 +683,14 @@ async def create_container(
def fetch_all_containers():
pass

class Role(Enum):
DEV = 1
ADMIN = 2
TEAM_LEADER = 3
INSPECTOR = 4

class User(ClientController):
def __init__(self, email: str, id: UUID = None, tier: str = "user"):
def __init__(self, email: str, id: UUID = None, tier: str = "user",role: Role = Role.INSPECTOR):
if id is None:
raise ValueError("The user id must be provided")
client_model = Client(
Expand All @@ -617,6 +700,7 @@ def __init__(self, email: str, id: UUID = None, tier: str = "user"):
storage_prefix=tier,
containers={},
)
self.role = role
super().__init__(client_model)

async def fetch_all_containers(
Expand Down Expand Up @@ -713,28 +797,33 @@ def fetch_all_containers(self, cursor: Cursor):
container_obj.fetch_all_data(cursor)
self.model.containers[container_obj] = container_obj.model

def add_user(self, cursor: Cursor, user_id: UUID, performed_by: UUID):
def add_user(self, cursor: Cursor, user_id: UUID, performed_by: UUID,permission: Permission=Permission.READ):
"""
This function adds a user to the group rights.
Parameters:
- cursor: The cursor object to interact with the database.
- user_id (str): The UUID of the user.
"""
# TODO: Check permission
#if not group.has_user_access_to_group(cursor, user_id, self.id):
# Link the group to the user in the database
group.add_user_to_group(cursor, user_id, self.model.id, performed_by)

def remove_user(self, cursor: Cursor, user_id: UUID):
if not group.is_user_group_creator(cursor, performed_by, self.model.id) or not user.is_a_user_admin(cursor, performed_by):
raise ValueError("The user is not the creator of the group therefore does not have the right to manage its users")
if permission == Permission.OWNER and user_id == performed_by:
# This should only be the case when it's the creator of the group which should be the owner
pass
elif permission == Permission.OWNER:
raise ValueError("The user does not have the right to assign the owner permission")
group.add_user_to_group(cursor, user_id, self.id, performed_by, permission)

def remove_user(self, cursor: Cursor, user_id: UUID,performed_by: UUID):
"""
This function removes a user from the group rights.
Parameters:
- cursor: The cursor object to interact with the database.
- user_id (str): The UUID of the user.
"""
# TODO : Check permission
if not group.is_user_group_creator(cursor, performed_by, self.model.id) or not user.is_a_user_admin(cursor, performed_by):
raise ValueError("The user is not the creator of the group therefore does not have the right to manage its users")
group.remove_user_from_group(cursor, user_id, self.id)


Expand All @@ -752,7 +841,7 @@ async def get_user(cursor: Cursor, email: str) -> User:
return user_obj


async def new_user(cursor: Cursor, email: str, connection_string, tier="user") -> User:
async def new_user(cursor: Cursor, email: str, connection_string, tier="user",role:Role = Role.INSPECTOR) -> User:
"""
Create a new user in the database and creates its personal container in the blob storage.
Expand All @@ -765,9 +854,9 @@ async def new_user(cursor: Cursor, email: str, connection_string, tier="user") -
# Register the user in the database
if user.is_user_registered(cursor, email):
raise UserAlreadyExistsError("User already exists")
user_uuid = user.register_user(cursor, email)
user_uuid = user.register_user(cursor, email, role.value)
# Create the user object
user_obj = User(email, user_uuid, tier)
user_obj = User(email = email, id=user_uuid, tier=tier,role=role)
# Create the user container in the blob storage
await user_obj.create_container(
cursor=cursor,
Expand Down Expand Up @@ -861,7 +950,7 @@ async def create_group(
user_id=user_id,
is_public=False,
)
group_obj.add_user(cursor, user_id, user_id)
group_obj.add_user(cursor, user_id, user_id,Permission.OWNER)
return group_obj
except Exception as e:
raise Exception("Datastore Unhandled Error " + str(e))
Expand Down Expand Up @@ -921,7 +1010,7 @@ async def create_container(

await container_obj.create_storage(connection_str=connection_str, credentials=None)
if add_user_to_storage:
container_obj.add_user(cursor, user_id, user_id)
container_obj.add_user(cursor, user_id, user_id,Permission.OWNER)
await container_obj.create_folder(
cursor=cursor, performed_by=user_id, folder_name="General", nb_pictures=0
)
Expand Down
Loading

0 comments on commit 4c39cd1

Please sign in to comment.