Skip to content

Commit

Permalink
added api-keys based on a map file
Browse files Browse the repository at this point in the history
  • Loading branch information
amazy committed Feb 12, 2024
1 parent 7a19fe0 commit 28798d6
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 62 deletions.
2 changes: 2 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL <[email protected]>
SPDX-License-Identifier: GPL-3.0-or-later
-->

- added support for Api-keys through a new configuration file `API_KEYS_FILE_PATH`

v 24.1.1
========

Expand Down
21 changes: 19 additions & 2 deletions sources/orthanc_auth_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@
from shares.models import *
from shares.orthanc_token_service_factory import create_token_service_from_secrets
from shares.keycloak import create_keycloak_from_secrets
from shares.roles_configuration import RolesConfiguration, create_roles_configuration_from_file
from shares.api_keys import create_api_keys_from_file

logging.basicConfig(level=logging.DEBUG)

token_service = create_token_service_from_secrets()
keycloak = create_keycloak_from_secrets()

permissions_file_path = os.environ.get("PERMISSIONS_FILE_PATH", "/orthanc_auth_service/permissions.json")
api_keys_file_path = os.environ.get("API_KEYS_FILE_PATH", "/orthanc_auth_service/api-keys.json")

roles_configuration = create_roles_configuration_from_file(permissions_file_path)
keycloak = create_keycloak_from_secrets(roles_configuration)

api_keys = None
if roles_configuration and api_keys_file_path:
api_keys = create_api_keys_from_file(api_keys_file_path=api_keys_file_path,
roles_configuration=roles_configuration)
app = FastAPI()

# check if the service requires basic auth (by checking of some USERS have been defined)
Expand Down Expand Up @@ -167,8 +179,13 @@ def get_user_profile(user_profile_request: UserProfileRequest):
if keycloak is None:
logging.warning("Keycloak is not configured, all users are considered anonymous")
return anonymous_profile

elif user_profile_request.token_key is not None:
response = keycloak.get_user_profile_from_token(user_profile_request.token_value)
if user_profile_request.token_key == "api-key" and api_keys is not None:
response = api_keys.get_user_profile_from_api_key(api_key=user_profile_request.token_value)
else:
response = keycloak.get_user_profile_from_token(user_profile_request.token_value)

else:
return anonymous_profile

Expand Down
6 changes: 6 additions & 0 deletions sources/orthanc_auth_service/demo-api-keys.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// "SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL <[email protected]>"
// SPDX-License-Identifier: CC0-1.0
{
"123456" : "admin-role",
"abcde-fghb": "doctor-role"
}
58 changes: 58 additions & 0 deletions sources/orthanc_auth_service/shares/api_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL <[email protected]>
#
# SPDX-License-Identifier: GPL-3.0-or-later

import os
import logging
import requests
import jwt
import jsonc
from typing import Dict, Any, List, Tuple
from .models import *
from .utils.utils import get_secret_or_die, is_secret_defined
from .roles_configuration import RolesConfiguration

class ApiKeys:

def __init__(self, role_per_key: Dict[str, str], roles_configuration: RolesConfiguration):
self.role_per_key = role_per_key
self.roles_configuration = roles_configuration

def get_role_from_api_key(self, api_key: str) -> List[str]:
if api_key in self.role_per_key:
return self.role_per_key[api_key]
else:
logging.warning(f"No role found for api-key")
return None


def get_user_profile_from_api_key(self, api_key: str) -> UserProfileResponse:
response = UserProfileResponse(
name="API KEY",
permissions=[],
validity=60,
authorized_labels=[])

role = self.get_role_from_api_key(api_key=api_key)

response.permissions, response.authorized_labels = self.roles_configuration.get_role_configuration([role])

return response

def create_api_keys_from_file(api_keys_file_path: str, roles_configuration: RolesConfiguration):
try:
with open(api_keys_file_path) as f:
role_per_key = jsonc.load(f)

for key, role in role_per_key.items():
if role not in roles_configuration.get_roles():
logging.error(f"Unknown role in Api-keys file: {role}")
exit(-1)

return ApiKeys(role_per_key=role_per_key, roles_configuration=roles_configuration)

except Exception as ex:
logging.exception(ex)
logging.error(f"Unable to get api-keys from configuration file ({api_keys_file_path}), exiting...")
exit(-1)

66 changes: 6 additions & 60 deletions sources/orthanc_auth_service/shares/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import jsonc
from typing import Dict, Any, List, Tuple
from .models import *
from .roles_configuration import RolesConfiguration
from .utils.utils import get_secret_or_die, is_secret_defined


class Keycloak:

def __init__(self, public_key, configured_roles: Dict[str, Any]):
def __init__(self, public_key, roles_configuration: RolesConfiguration):
self.public_key = public_key
self.configured_roles = configured_roles
self.roles_configuration = roles_configuration

def decode_token(self, jwt_token: str) -> Dict[str, Any]:
return jwt.decode(jwt=jwt_token, key=self.public_key, audience="account", algorithms=["RS256"])
Expand Down Expand Up @@ -95,40 +96,11 @@ def get_user_profile_from_token(self, jwt_token: str) -> UserProfileResponse:

roles = self.get_roles_from_decoded_token(decoded_token=decoded_token)

response.permissions, response.authorized_labels = self.get_role_configuration(roles)
response.permissions, response.authorized_labels = self.roles_configuration.get_role_configuration(roles)

return response


def get_role_configuration(self, roles: List[str]) -> Tuple[List[UserPermissions], List[str], List[str]]:
permissions = []
authorized_labels = []
configured_user_roles = []

for r in roles:
if r in self.configured_roles:
configured_user_roles.append(r)

# complain if there are 2 roles for the same user ??? How should we combine the authorized and forbidden labels in this case ???
if len(configured_user_roles) > 1:
raise ValueError("Unable to handle multiple roles for a single user")

role = configured_user_roles[0]
# search for it in the configured roles
configured_role = self.configured_roles.get(role)
# if it has been configured:
if configured_role is None:
raise ValueError(f"Role not found in configuration: {role}")

for item in configured_role.get('permissions'):
# (if not already there)
if UserPermissions(item) not in permissions:
permissions.append(UserPermissions(item))

if configured_role.get("authorized_labels"):
authorized_labels = configured_role.get("authorized_labels")

return permissions, authorized_labels


def _get_keycloak_public_key(keycloak_uri: str) -> str:
Expand All @@ -146,27 +118,9 @@ def _get_keycloak_public_key(keycloak_uri: str) -> str:
return ''.join([begin_public_key, public_key, end_public_key]).encode()


def _get_config_from_file(file_path: str):
with open(file_path) as f:
data = jsonc.load(f)

roles = data.get('roles')

for key, role_def in roles.items():
if not role_def.get("authorized_labels"):
msg = f'No "authorized_labels" defined for role "{key}". You should, e.g, include "authorized_labels" = ["*"] if you want to authorize all labels.")'
logging.error(msg)
raise ValueError(msg)

if not role_def.get("permissions"):
msg = f'No "permissions" defined for role "{key}". You should, e.g, include "permissions" = ["all"] if you want to authorize all permissions.")'
logging.error(msg)
raise ValueError(msg)

return roles


def create_keycloak_from_secrets():
def create_keycloak_from_secrets(roles_configuration: RolesConfiguration):
handle_users_with_keycloak = os.environ.get("ENABLE_KEYCLOAK", "false") == "true"

if not handle_users_with_keycloak:
Expand All @@ -175,7 +129,6 @@ def create_keycloak_from_secrets():

logging.warning("ENABLE_KEYCLOAK is set, will handle users with Keycloak")
keycloak_uri = os.environ.get("KEYCLOAK_URI", "http://keycloak:8080/realms/orthanc/")
permissions_file_path = os.environ.get("PERMISSIONS_FILE_PATH", "/orthanc_auth_service/permissions.json")

try:
public_key = _get_keycloak_public_key(keycloak_uri)
Expand All @@ -186,13 +139,6 @@ def create_keycloak_from_secrets():
logging.error(f"Unable to reach keycloak (be patient, Keycloak may need more than 1 min to start), exiting...")
exit(-1)

try:
configured_roles = _get_config_from_file(permissions_file_path)
logging.info(f"Got the roles and permissions from configuration file")

except Exception as ex:
logging.exception(ex)
logging.error(f"Unable to get roles and permissions from configuration file ({permissions_file_path}), exiting...")
exit(-1)

return Keycloak(public_key=public_key, configured_roles=configured_roles)
return Keycloak(public_key=public_key, roles_configuration=roles_configuration)
76 changes: 76 additions & 0 deletions sources/orthanc_auth_service/shares/roles_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
import jsonc
import os
from .models import *
from typing import Dict, Any, List, Tuple


def _get_config_from_file(file_path: str):
with open(file_path) as f:
data = jsonc.load(f)

roles = data.get('roles')

for key, role_def in roles.items():
if not role_def.get("authorized_labels"):
msg = f'No "authorized_labels" defined for role "{key}". You should, e.g, include "authorized_labels" = ["*"] if you want to authorize all labels.")'
logging.error(msg)
raise ValueError(msg)

if not role_def.get("permissions"):
msg = f'No "permissions" defined for role "{key}". You should, e.g, include "permissions" = ["all"] if you want to authorize all permissions.")'
logging.error(msg)
raise ValueError(msg)

return roles


class RolesConfiguration:

def __init__(self, roles: Dict[str, Any]):
self.configured_roles = roles

def get_roles(self) -> List[str]:
return self.configured_roles.keys()

def get_role_configuration(self, roles: List[str]) -> Tuple[List[UserPermissions], List[str], List[str]]:
permissions = []
authorized_labels = []
configured_user_roles = []

for r in roles:
if r in self.configured_roles:
configured_user_roles.append(r)

# complain if there are 2 roles for the same user ??? How should we combine the authorized and forbidden labels in this case ???
if len(configured_user_roles) > 1:
raise ValueError("Unable to handle multiple roles for a single user")

role = configured_user_roles[0]
# search for it in the configured roles
configured_role = self.configured_roles.get(role)
# if it has been configured:
if configured_role is None:
raise ValueError(f"Role not found in configuration: {role}")

for item in configured_role.get('permissions'):
# (if not already there)
if UserPermissions(item) not in permissions:
permissions.append(UserPermissions(item))

if configured_role.get("authorized_labels"):
authorized_labels = configured_role.get("authorized_labels")

return permissions, authorized_labels

def create_roles_configuration_from_file(permissions_file_path: str):
try:
roles = _get_config_from_file(permissions_file_path)
logging.info(f"Got the roles and permissions from configuration file")

except Exception as ex:
logging.exception(ex)
logging.error(f"Unable to get roles and permissions from configuration file ({permissions_file_path}), exiting...")
exit(-1)

return RolesConfiguration(roles)

0 comments on commit 28798d6

Please sign in to comment.