diff --git a/release-notes.md b/release-notes.md index fe3527c..aa8ca55 100644 --- a/release-notes.md +++ b/release-notes.md @@ -4,6 +4,8 @@ SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL SPDX-License-Identifier: GPL-3.0-or-later --> +- added support for Api-keys through a new configuration file `API_KEYS_FILE_PATH` + v 24.1.1 ======== diff --git a/sources/orthanc_auth_service/app.py b/sources/orthanc_auth_service/app.py index 2d399fa..4038b88 100644 --- a/sources/orthanc_auth_service/app.py +++ b/sources/orthanc_auth_service/app.py @@ -15,11 +15,23 @@ from shares.models import * from shares.orthanc_token_service_factory import create_token_service_from_secrets from shares.keycloak import create_keycloak_from_secrets +from shares.roles_configuration import RolesConfiguration, create_roles_configuration_from_file +from shares.api_keys import create_api_keys_from_file logging.basicConfig(level=logging.DEBUG) token_service = create_token_service_from_secrets() -keycloak = create_keycloak_from_secrets() + +permissions_file_path = os.environ.get("PERMISSIONS_FILE_PATH", "/orthanc_auth_service/permissions.json") +api_keys_file_path = os.environ.get("API_KEYS_FILE_PATH", "/orthanc_auth_service/api-keys.json") + +roles_configuration = create_roles_configuration_from_file(permissions_file_path) +keycloak = create_keycloak_from_secrets(roles_configuration) + +api_keys = None +if roles_configuration and api_keys_file_path: + api_keys = create_api_keys_from_file(api_keys_file_path=api_keys_file_path, + roles_configuration=roles_configuration) app = FastAPI() # check if the service requires basic auth (by checking of some USERS have been defined) @@ -167,8 +179,13 @@ def get_user_profile(user_profile_request: UserProfileRequest): if keycloak is None: logging.warning("Keycloak is not configured, all users are considered anonymous") return anonymous_profile + elif user_profile_request.token_key is not None: - response = keycloak.get_user_profile_from_token(user_profile_request.token_value) + if user_profile_request.token_key == "api-key" and api_keys is not None: + response = api_keys.get_user_profile_from_api_key(api_key=user_profile_request.token_value) + else: + response = keycloak.get_user_profile_from_token(user_profile_request.token_value) + else: return anonymous_profile diff --git a/sources/orthanc_auth_service/demo-api-keys.json b/sources/orthanc_auth_service/demo-api-keys.json new file mode 100644 index 0000000..865bfcb --- /dev/null +++ b/sources/orthanc_auth_service/demo-api-keys.json @@ -0,0 +1,6 @@ +// "SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL " +// SPDX-License-Identifier: CC0-1.0 +{ + "123456" : "admin-role", + "abcde-fghb": "doctor-role" +} \ No newline at end of file diff --git a/sources/orthanc_auth_service/shares/api_keys.py b/sources/orthanc_auth_service/shares/api_keys.py new file mode 100644 index 0000000..f90772e --- /dev/null +++ b/sources/orthanc_auth_service/shares/api_keys.py @@ -0,0 +1,58 @@ +# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import os +import logging +import requests +import jwt +import jsonc +from typing import Dict, Any, List, Tuple +from .models import * +from .utils.utils import get_secret_or_die, is_secret_defined +from .roles_configuration import RolesConfiguration + +class ApiKeys: + + def __init__(self, role_per_key: Dict[str, str], roles_configuration: RolesConfiguration): + self.role_per_key = role_per_key + self.roles_configuration = roles_configuration + + def get_role_from_api_key(self, api_key: str) -> List[str]: + if api_key in self.role_per_key: + return self.role_per_key[api_key] + else: + logging.warning(f"No role found for api-key") + return None + + + def get_user_profile_from_api_key(self, api_key: str) -> UserProfileResponse: + response = UserProfileResponse( + name="API KEY", + permissions=[], + validity=60, + authorized_labels=[]) + + role = self.get_role_from_api_key(api_key=api_key) + + response.permissions, response.authorized_labels = self.roles_configuration.get_role_configuration([role]) + + return response + +def create_api_keys_from_file(api_keys_file_path: str, roles_configuration: RolesConfiguration): + try: + with open(api_keys_file_path) as f: + role_per_key = jsonc.load(f) + + for key, role in role_per_key.items(): + if role not in roles_configuration.get_roles(): + logging.error(f"Unknown role in Api-keys file: {role}") + exit(-1) + + return ApiKeys(role_per_key=role_per_key, roles_configuration=roles_configuration) + + except Exception as ex: + logging.exception(ex) + logging.error(f"Unable to get api-keys from configuration file ({api_keys_file_path}), exiting...") + exit(-1) + diff --git a/sources/orthanc_auth_service/shares/keycloak.py b/sources/orthanc_auth_service/shares/keycloak.py index 14b9de4..527fb09 100644 --- a/sources/orthanc_auth_service/shares/keycloak.py +++ b/sources/orthanc_auth_service/shares/keycloak.py @@ -9,14 +9,15 @@ import jsonc from typing import Dict, Any, List, Tuple from .models import * +from .roles_configuration import RolesConfiguration from .utils.utils import get_secret_or_die, is_secret_defined class Keycloak: - def __init__(self, public_key, configured_roles: Dict[str, Any]): + def __init__(self, public_key, roles_configuration: RolesConfiguration): self.public_key = public_key - self.configured_roles = configured_roles + self.roles_configuration = roles_configuration def decode_token(self, jwt_token: str) -> Dict[str, Any]: return jwt.decode(jwt=jwt_token, key=self.public_key, audience="account", algorithms=["RS256"]) @@ -95,40 +96,11 @@ def get_user_profile_from_token(self, jwt_token: str) -> UserProfileResponse: roles = self.get_roles_from_decoded_token(decoded_token=decoded_token) - response.permissions, response.authorized_labels = self.get_role_configuration(roles) + response.permissions, response.authorized_labels = self.roles_configuration.get_role_configuration(roles) return response - def get_role_configuration(self, roles: List[str]) -> Tuple[List[UserPermissions], List[str], List[str]]: - permissions = [] - authorized_labels = [] - configured_user_roles = [] - - for r in roles: - if r in self.configured_roles: - configured_user_roles.append(r) - - # complain if there are 2 roles for the same user ??? How should we combine the authorized and forbidden labels in this case ??? - if len(configured_user_roles) > 1: - raise ValueError("Unable to handle multiple roles for a single user") - - role = configured_user_roles[0] - # search for it in the configured roles - configured_role = self.configured_roles.get(role) - # if it has been configured: - if configured_role is None: - raise ValueError(f"Role not found in configuration: {role}") - - for item in configured_role.get('permissions'): - # (if not already there) - if UserPermissions(item) not in permissions: - permissions.append(UserPermissions(item)) - - if configured_role.get("authorized_labels"): - authorized_labels = configured_role.get("authorized_labels") - - return permissions, authorized_labels def _get_keycloak_public_key(keycloak_uri: str) -> str: @@ -146,27 +118,9 @@ def _get_keycloak_public_key(keycloak_uri: str) -> str: return ''.join([begin_public_key, public_key, end_public_key]).encode() -def _get_config_from_file(file_path: str): - with open(file_path) as f: - data = jsonc.load(f) - - roles = data.get('roles') - for key, role_def in roles.items(): - if not role_def.get("authorized_labels"): - msg = f'No "authorized_labels" defined for role "{key}". You should, e.g, include "authorized_labels" = ["*"] if you want to authorize all labels.")' - logging.error(msg) - raise ValueError(msg) - if not role_def.get("permissions"): - msg = f'No "permissions" defined for role "{key}". You should, e.g, include "permissions" = ["all"] if you want to authorize all permissions.")' - logging.error(msg) - raise ValueError(msg) - - return roles - - -def create_keycloak_from_secrets(): +def create_keycloak_from_secrets(roles_configuration: RolesConfiguration): handle_users_with_keycloak = os.environ.get("ENABLE_KEYCLOAK", "false") == "true" if not handle_users_with_keycloak: @@ -175,7 +129,6 @@ def create_keycloak_from_secrets(): logging.warning("ENABLE_KEYCLOAK is set, will handle users with Keycloak") keycloak_uri = os.environ.get("KEYCLOAK_URI", "http://keycloak:8080/realms/orthanc/") - permissions_file_path = os.environ.get("PERMISSIONS_FILE_PATH", "/orthanc_auth_service/permissions.json") try: public_key = _get_keycloak_public_key(keycloak_uri) @@ -186,13 +139,6 @@ def create_keycloak_from_secrets(): logging.error(f"Unable to reach keycloak (be patient, Keycloak may need more than 1 min to start), exiting...") exit(-1) - try: - configured_roles = _get_config_from_file(permissions_file_path) - logging.info(f"Got the roles and permissions from configuration file") - except Exception as ex: - logging.exception(ex) - logging.error(f"Unable to get roles and permissions from configuration file ({permissions_file_path}), exiting...") - exit(-1) - return Keycloak(public_key=public_key, configured_roles=configured_roles) + return Keycloak(public_key=public_key, roles_configuration=roles_configuration) diff --git a/sources/orthanc_auth_service/shares/roles_configuration.py b/sources/orthanc_auth_service/shares/roles_configuration.py new file mode 100644 index 0000000..741bda2 --- /dev/null +++ b/sources/orthanc_auth_service/shares/roles_configuration.py @@ -0,0 +1,76 @@ +import logging +import jsonc +import os +from .models import * +from typing import Dict, Any, List, Tuple + + +def _get_config_from_file(file_path: str): + with open(file_path) as f: + data = jsonc.load(f) + + roles = data.get('roles') + + for key, role_def in roles.items(): + if not role_def.get("authorized_labels"): + msg = f'No "authorized_labels" defined for role "{key}". You should, e.g, include "authorized_labels" = ["*"] if you want to authorize all labels.")' + logging.error(msg) + raise ValueError(msg) + + if not role_def.get("permissions"): + msg = f'No "permissions" defined for role "{key}". You should, e.g, include "permissions" = ["all"] if you want to authorize all permissions.")' + logging.error(msg) + raise ValueError(msg) + + return roles + + +class RolesConfiguration: + + def __init__(self, roles: Dict[str, Any]): + self.configured_roles = roles + + def get_roles(self) -> List[str]: + return self.configured_roles.keys() + + def get_role_configuration(self, roles: List[str]) -> Tuple[List[UserPermissions], List[str], List[str]]: + permissions = [] + authorized_labels = [] + configured_user_roles = [] + + for r in roles: + if r in self.configured_roles: + configured_user_roles.append(r) + + # complain if there are 2 roles for the same user ??? How should we combine the authorized and forbidden labels in this case ??? + if len(configured_user_roles) > 1: + raise ValueError("Unable to handle multiple roles for a single user") + + role = configured_user_roles[0] + # search for it in the configured roles + configured_role = self.configured_roles.get(role) + # if it has been configured: + if configured_role is None: + raise ValueError(f"Role not found in configuration: {role}") + + for item in configured_role.get('permissions'): + # (if not already there) + if UserPermissions(item) not in permissions: + permissions.append(UserPermissions(item)) + + if configured_role.get("authorized_labels"): + authorized_labels = configured_role.get("authorized_labels") + + return permissions, authorized_labels + +def create_roles_configuration_from_file(permissions_file_path: str): + try: + roles = _get_config_from_file(permissions_file_path) + logging.info(f"Got the roles and permissions from configuration file") + + except Exception as ex: + logging.exception(ex) + logging.error(f"Unable to get roles and permissions from configuration file ({permissions_file_path}), exiting...") + exit(-1) + + return RolesConfiguration(roles) \ No newline at end of file