Skip to content

Commit

Permalink
RBAC full update
Browse files Browse the repository at this point in the history
	modified:   .gitignore
	new file:   .vscode/settings.json
	modified:   setup.py
	modified:   trustauthx/authlite.py
  • Loading branch information
moonlightnexus committed Mar 26, 2024
1 parent c3c19ca commit 06f9d73
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
*.ipynb

# C extensions
*.so
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.analysis.autoImportCompletions": true
}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name='trustauthx',
version='0.6.0',
version='0.6.2',
description='Official connector SDK for TrustAuthx',
long_description=long_description,
long_description_content_type='text/markdown', # This is important!
Expand Down
301 changes: 288 additions & 13 deletions trustauthx/authlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,275 @@
from jose import JWTError, jwt
from jose.constants import ALGORITHMS
import json

# authx/authlite.py

class AuthLiteClient:
import sqlite3
from dataclasses import dataclass, asdict
from typing import List, Dict

@dataclass
class Permission:
name: str
value: str

@dataclass
class Role:
org_id: str
rol_id: str
name: str
permissions: List[Permission]

class _EdgeDBRoleQuery:
def __init__(self, roles, in_memory=True):
self.in_memory = in_memory
if self.in_memory:
self.roles = {role_id: permissions for role in roles for role_id, permissions in role.items()}
else:
self.conn = sqlite3.connect(':memory:') # replace ':memory:' with your database path
self.cursor = self.conn.cursor()
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS roles (
role_id TEXT PRIMARY KEY,
permissions TEXT
)
""")
for role in roles:
for role_id, permissions in role.items():
self.cursor.execute("INSERT INTO roles VALUES (?, ?)", (role_id, permissions))
self.conn.commit()

def query(self, role_id=None, permission_key=None):
if self.in_memory:
if role_id and permission_key:
return self.roles.get(role_id, {}).get(permission_key, None)
elif role_id:
return self.roles.get(role_id, None)
elif permission_key:
return {role_id: permissions[permission_key] for role_id, permissions in self.roles.items() if permission_key in permissions}
else:
return self.roles
else:
if role_id and permission_key:
self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,))
permissions = self.cursor.fetchone()
if permissions:
return permissions[0].get(permission_key, None)
elif role_id:
self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,))
return self.cursor.fetchone()
elif permission_key:
self.cursor.execute("SELECT * FROM roles")
return {role_id: permissions[permission_key] for role_id, permissions in self.cursor.fetchall() if permission_key in permissions}
else:
self.cursor.execute("SELECT * FROM roles")
return self.cursor.fetchall()

def validate(self, role_id, permission_key, permission_val):
if self.in_memory:
return self.roles.get(role_id, {}).get(permission_key, None) == permission_val
else:
self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,))
permissions = self.cursor.fetchone()
if permissions:
return permissions[0].get(permission_key, None) == permission_val

def count_roles(self):
if self.in_memory:
return len(self.roles)
else:
self.cursor.execute("SELECT COUNT(*) FROM roles")
return self.cursor.fetchone()[0]

class _Roles(_EdgeDBRoleQuery):
def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, InMemory=True):
self.org_id = org_id
self.api_key = api_key
self._secret_key = secret_key
self.signed_key = signed_key
self.API_BASE_URL = API_BASE_URL
super().__init__(roles, in_memory=InMemory)
print(self.roles)

def get_all_roles(self):
"""[
{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_gCD_ebc6f7715bb14554",
"name": "string",
"permissions": [
{
"user": "administration"
}
]
},
{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_Ahy_f51d73ff656545e5",
"name": "string",
"permissions": []
},
{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_rce_474ae9e59b3d49ce",
"name": "string",
"permissions": [
{
"user": "administration"
},
{
"viewer": "administration"
},
{
"maintainer": "administration"
}
]
}
]"""
url = f'{self.API_BASE_URL}/rbac/role'
headers = {'accept': 'application/json'}
params = {
'org_id': f'{self.org_id}',
'api_key': f'{self.api_key}',
'signed_key': f'{self._secret_key}'
}
response = requests.get(url, headers=headers, params=params)
roles = [Role(**role_data) for role_data in response.json()]
return roles

def add_role(self, name, **Permission):
"""{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_rce_474ae9e59b3d49ce",
"name": "string",
"permissions": [
{
"user": "administration"
},
{
"viewer": "administration"
},
{
"maintainer": "administration"
}
]
}"""
url = f'{self.API_BASE_URL}/rbac/role'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
params = {
'org_id': f'{self.org_id}',
'api_key': f'{self.api_key}',
'signed_key': f'{self._secret_key}'
}
permissions = [{k: v} for k, v in Permission.items()]
data = {
"org_id": f'{self.org_id}',
"name": name,
"permissions": permissions
}
response = requests.post(url, headers=headers, params=params, data=json.dumps(data))
return response.json()

def delete_role(self, rol_id):

"""{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_YHV_78ae9006bcaa4c77",
"name": "string",
"permissions": [
{
"user": "administration"
},
{
"viewer": "administration"
},
{
"maintainer": "administration"
}
]
}"""
url = f'{self.API_BASE_URL}/rbac/role'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
params = {
'org_id': f'{self.org_id}',
'api_key': f'{self.api_key}',
'signed_key': f'{self._secret_key}'
}
data = {
"org_id": f'{self.org_id}',
"rol_id": rol_id
}
response = requests.delete(url, headers=headers, params=params, data=json.dumps(data))
return response.json()

def add_permission(self, rol_id, **Permission):
"""{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_rce_474ae9e59b3d49ce",
"permissions": [
{
"any": "view" ##only return added content
}
]
}"""
url = f'{self.API_BASE_URL}/rbac/permission'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
params = {
'org_id': f'{self.org_id}',
'api_key': f'{self.api_key}',
'signed_key': f'{self._secret_key}'
}
permissions = [{k: v} for k, v in Permission.items()]
data = {
"org_id": f'{self.org_id}',
"rol_id": rol_id,
"permissions": permissions
}
response = requests.post(url, headers=headers, params=params, data=json.dumps(data))
return response.json()

def delete_permission(self, rol_id, **Permission):
"""{
"org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88",
"rol_id": "rol_rce_474ae9e59b3d49ce",
"permissions": [
{
"user": "administration"
},
{
"viewer": "administration"
},
{
"maintainer": "administration"
}
]
}""" #return full
url = f'{self.API_BASE_URL}/rbac/permission'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
params = {
'org_id': f'{self.org_id}',
'api_key': f'{self.api_key}',
'signed_key': f'{self._secret_key}'
}
permissions = [{k: v} for k, v in Permission.items()]
data = {
"org_id": f'{self.org_id}',
"rol_id": rol_id,
"permissions": permissions
}
response = requests.delete(url, headers=headers, params=params, data=json.dumps(data))
return response.json()

class AuthLiteClient():

"""
AuthLiteClient is a Python client for the TrustAuthX authentication service.
Expand Down Expand Up @@ -63,7 +328,7 @@ class TokenCheck:
refresh:str
state:bool

def __init__(self, api_key, secret_key, org_id=None):
def __init__(self, api_key, secret_key, org_id=None, API_BASE_URL="https://api.trustauthx.com", in_memory=True):
"""
Initializes an AuthLiteClient instance.
Expand All @@ -81,6 +346,11 @@ def __init__(self, api_key, secret_key, org_id=None):
self.api_key = api_key
self.org_id = org_id
self.signed_key = self.jwt_encode(key=self.secret_key, data={"api_key":self.api_key})
self.API_BASE_URL = API_BASE_URL
self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id,
api_key=self.api_key, signed_key=self.signed_key,
secret_key=self.secret_key, API_BASE_URL=self.API_BASE_URL,
InMemory=in_memory)

def generate_url(self) -> str:
"""
Expand Down Expand Up @@ -115,7 +385,7 @@ def generate_edit_user_url(self, access_token, url) -> str:
'signed_key': self.signed_key,
'url':url
}
url = "https://api.trustauthx.com/api/user/me/settings/"
url = f"{self.API_BASE_URL}/api/user/me/settings/"
req = requests.Request('GET', url, params=params, headers=headers).prepare()
return req.url

Expand All @@ -132,7 +402,7 @@ def re_auth(self, code):
Raises:
HTTPError: If the request fails with an HTTP error status code.
"""
url = "https://api.trustauthx.com/api/user/me/widget/re-auth/token"
url = f"{self.API_BASE_URL}/api/user/me/widget/re-auth/token"
params = {
"code": code,
'api_key': self.api_key,
Expand Down Expand Up @@ -167,7 +437,7 @@ def get_user(self, token) -> dict:
HTTPError: If the request fails with an HTTP error status code.
"""
# Validate the given authentication token
url = 'https://api.trustauthx.com/api/user/me/auth/data'
url = f'{self.API_BASE_URL}/api/user/me/auth/data'
headers = {'accept': 'application/json'}
params = {
'UserToken': token,
Expand Down Expand Up @@ -203,7 +473,7 @@ def get_user_data(self, AccessToken) -> dict:
"""
# Validate the given authentication token
"""returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'"""
url = 'https://api.trustauthx.com/api/user/me/data'
url = f'{self.API_BASE_URL}/api/user/me/data'
headers = {'accept': 'application/json'}
params = {
'AccessToken': AccessToken,
Expand Down Expand Up @@ -234,7 +504,7 @@ def get_access_token_from_refresh_token(self, refresh_token):
HTTPError: If the request fails with an HTTP error status code.
"""
# Store the given authentication token
url = 'https://api.trustauthx.com/api/user/me/access/token/'
url = f'{self.API_BASE_URL}/api/user/me/access/token/'
headers = {'accept': 'application/json'}
params = {
'RefreshToken': refresh_token,
Expand All @@ -260,7 +530,7 @@ def validate_access_token(self, access_token) -> bool:
bool: True if the access token is valid, False otherwise.
"""
# Store the given authentication token
url = 'https://api.trustauthx.com/api/user/me/auth/validate/token'
url = f'{self.API_BASE_URL}/api/user/me/auth/validate/token'
headers = {'accept': 'application/json'}
params = {
'AccessToken': access_token,
Expand All @@ -286,7 +556,7 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_
HTTPError: If the request fails with an HTTP error status code.
AttributeError: If neither AccessToken nor RefreshToken is provided.
"""
url = 'https://api.trustauthx.com/api/user/me/token/'
url = f'{self.API_BASE_URL}/api/user/me/token/'
headers = {'accept': 'application/json'}
if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken")
tt=True if AccessToken else False
Expand Down Expand Up @@ -333,4 +603,9 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck:
d.refresh = refresh_token
return d
except:
raise HTTPError('both tokens are invalid login again')
raise HTTPError('both tokens are invalid login again')

def _set_edge_roles(self) -> list:
return []


0 comments on commit 06f9d73

Please sign in to comment.