From 51fb223e643f0856aeefacf79bada602b976ea02 Mon Sep 17 00:00:00 2001 From: Rob Savoye Date: Sun, 24 Mar 2024 08:09:18 -0600 Subject: [PATCH] fix: Start adding some API tests --- tm_admin/access.py | 66 ++++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/tm_admin/access.py b/tm_admin/access.py index a17f1ac..cc9fbfb 100755 --- a/tm_admin/access.py +++ b/tm_admin/access.py @@ -29,8 +29,8 @@ from codetiming import Timer from tm_admin.yamlfile import YamlFile import asyncio -from rbac import RBAC -from enum import IntEnum +from rbac import RBAC, RBACConfigurationError, RBACAuthorizationError +from enum import IntEnum, StrEnum # This is only used for debugging output during development import json @@ -43,9 +43,17 @@ class Operation(IntEnum): READ = 1 CREATE = 2 - MODIFY = 3 + UPDATE = 3 DELETE = 4 +class Roles(IntEnum): + READ_ONLY = 1 + ORGANIZATION_ADMIN = 4 + PROJECT_MANAGER = 5 + ASSOCIATE_MANAGER = 6 + VALIDATOR = 7 + MAPPER = 8 + class Access(object): def __init__(self, yamlfile: str = None, @@ -57,6 +65,7 @@ def __init__(self, self.permissions = dict() self.domains = dict() self.roles = dict() + self.subjects = dict() # self.subjects = dict() # Convert to a dict to make it easy to query @@ -74,12 +83,14 @@ def __init__(self, if category == "domains": for entry in values: self.domains[entry] = self.rbac.create_domain(entry) + log.debug(f"Creating domain for {entry}") continue if category == "permissions": for roles in values: [[k, v]] = roles.items() self.permissions[k] = dict() self.roles[k] = self.rbac.create_role(k) + log.debug(f"Creating role for {k}") # print(f"\t{k}, {v}") if type(v) == list: for i in v: @@ -92,6 +103,7 @@ def __init__(self, elif k1 == 'tables': # print(f"TABLES: {v1}") self.permissions[k]['tables'] = v1 + log.debug(f"Setting permissions for {k} = {v1}") else: # This is an access permission # print(f"STR: {k} = {i}") @@ -101,39 +113,46 @@ def __init__(self, # print(json.dumps(self.permissions, indent=4)) # print("------------------") for k, v in self.permissions.items(): + print(k) if 'children' in v: # print(f"CHILD: {k} = {v['children']}") self.rbac.create_role(k, children=v['children']) elif 'tables' in v: - # print(f"TABLE: {k1} = {v['tables']}") + print(f"TABLE: {k} = {v['tables']}") for table in v['tables']: + log.debug(f"Adding permission '{v['access']}' for '{k.upper()}' on '{table}'") self.roles[k].add_permission(self.perms[v['access']], self.domains[table]) - for role, data in self.roles.items(): - # [[ k, v]] = - subject = self.rbac.create_subject(role) - subject.authorize(self.roles[role]) + # for role, data in self.roles.items(): + for role in self.roles: + self.subjects[role] = self.rbac.create_subject(role) + self.subjects[role].authorize(self.roles[role]) self.rbac.lock() async def check(self, domain: str, - access: str, - userrole: Userrole = None, - teamrole: Teamrole = None, + role: Roles = Roles.MAPPER, + op: Operation = Operation.READ, ): """ Check a role in a domain for access permissions. Args: domain (str): The domain to check for the role - access (str): The access permissions - userrole (Userrole): A user role - teamrole (Teamrole): A team role +] role (Roles): The role + op (Operation): The access permissions """ - return self.rbac.go(self.roles[role.lower()], + xxx = None + try: + xxx = self.rbac.go(self.subjects[role.name.lower()], self.domains[domain.lower()], - self.perms[access.lower()]) + self.perms[op.name.lower()]) + except RBACAuthorizationError as e : + log.error(f"{e}") + print(f"\tARGS: {domain}, {role.name}, {op.name}") + + return xxx async def main(): """This main function lets this class be run standalone by a bash script.""" @@ -157,12 +176,15 @@ async def main(): ) acl = Access(args.config) - role = Userrole(Userrole.PROJECT_MANAGER) - op = Operation(Operation.READ) - # await acl.check('projects', userrole=op) - - # rights = await roles.getRights(Userrole.PROJECT_MANAGER) - # print(rights) + await acl.check('projects', Roles.MAPPER, Operation.READ) + await acl.check('users', Roles.MAPPER, Operation.READ) + await acl.check('tasks', Roles.MAPPER, Operation.READ) + await acl.check('messages', Roles.MAPPER, Operation.READ) + await acl.check('campaigns', Roles.MAPPER, Operation.READ) + await acl.check('campaigns', Roles.MAPPER, Operation.CREATE) + + await acl.check('projects', Roles.VALIDATOR, Operation.UPDATE) + # await acl.check('tasks', Roles.VALIDATOR, Operation.UPDATE) if __name__ == "__main__": """This is just a hook so this file can be run standalone during development."""