From 76864469d1150ca42ed311fa0028d461690b5d24 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Thu, 14 Nov 2024 15:50:02 +0100 Subject: [PATCH 1/6] implementation of credential verification in freja_eid started --- src/eduid/webapp/common/session/namespaces.py | 4 +- .../webapp/freja_eid/callback_actions.py | 84 +++++++++++++++++++ src/eduid/webapp/freja_eid/callback_enums.py | 1 + src/eduid/webapp/freja_eid/schemas.py | 4 + src/eduid/webapp/freja_eid/views.py | 50 ++++++++++- 5 files changed, 140 insertions(+), 3 deletions(-) diff --git a/src/eduid/webapp/common/session/namespaces.py b/src/eduid/webapp/common/session/namespaces.py index 6e81a6e92..3f6c918c9 100644 --- a/src/eduid/webapp/common/session/namespaces.py +++ b/src/eduid/webapp/common/session/namespaces.py @@ -247,6 +247,8 @@ class BaseAuthnRequest(BaseModel, ABC): frontend_state: str | None = None # opaque data from frontend, returned in /status method: str | None = None # proofing method that frontend is invoking post_authn_action: AuthnAcsAction | EidasAcsAction | SvipeIDAction | BankIDAcsAction | FrejaEIDAction | None = None + # proofing_credential_id is the credential being person-proofed, when doing that + proofing_credential_id: ElementKey | None = None created_ts: datetime = Field(default_factory=utc_now) authn_instant: datetime | None = None status: str | None = None # populated by the SAML2 ACS/OIDC callback action @@ -258,8 +260,6 @@ class BaseAuthnRequest(BaseModel, ABC): class SP_AuthnRequest(BaseAuthnRequest): authn_id: AuthnRequestRef = Field(default_factory=lambda: AuthnRequestRef(uuid4_str())) credentials_used: list[ElementKey] = Field(default_factory=list) - # proofing_credential_id is the credential being person-proofed, when doing that - proofing_credential_id: ElementKey | None = None req_authn_ctx: list[str] = Field( default_factory=list ) # the authentication contexts requested for this authentication diff --git a/src/eduid/webapp/freja_eid/callback_actions.py b/src/eduid/webapp/freja_eid/callback_actions.py index 23604985d..b7772098f 100644 --- a/src/eduid/webapp/freja_eid/callback_actions.py +++ b/src/eduid/webapp/freja_eid/callback_actions.py @@ -1,7 +1,11 @@ from eduid.userdb import User +from eduid.userdb.credentials import FidoCredential from eduid.webapp.common.api.decorators import require_user +from eduid.webapp.common.api.messages import AuthnStatusMsg from eduid.webapp.common.authn.acs_registry import ACSArgs, ACSResult, acs_action +from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.messages import ProofingMsg +from eduid.webapp.common.session.namespaces import RP_AuthnRequest from eduid.webapp.freja_eid.app import current_freja_eid_app as current_app from eduid.webapp.freja_eid.callback_enums import FrejaEIDAction from eduid.webapp.freja_eid.helpers import FrejaEIDDocumentUserInfo, FrejaEIDMsg @@ -42,3 +46,83 @@ def verify_identity_action(user: User, args: ACSArgs) -> ACSResult: return ACSResult(message=verify_result.error) return ACSResult(success=True, message=FrejaEIDMsg.identity_verify_success) + + +@acs_action(FrejaEIDAction.verify_credential) +@require_user +def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: + """ + Use a Sweden Connect federation IdP assertion to person-proof a users' FIDO credential. + + :param args: ACS action arguments + :param user: Central db user + + :return: ACS action result + """ + # please type checking + if not args.proofing_method: + return ACSResult(message=FrejaEIDMsg.method_not_available) + + assert isinstance(args.authn_req, RP_AuthnRequest) + + credential = user.credentials.find(args.authn_req.proofing_credential_id) + if not isinstance(credential, FidoCredential): + current_app.logger.error(f"Credential {credential} is not a FidoCredential") + return ACSResult(message=FrejaEIDMsg.credential_not_found) + + # Check (again) if token was used to authenticate this session and that the auth is not stale. + _need_reauthn = check_reauthn( + frontend_action=args.authn_req.frontend_action, user=user, credential_requested=credential + ) + if _need_reauthn: + current_app.logger.error(f"User needs to authenticate: {_need_reauthn}") + return ACSResult(message=AuthnStatusMsg.must_authenticate) + + parsed = args.proofing_method.parse_session_info(args.session_info, args.backdoor) + if parsed.error: + return ACSResult(message=parsed.error) + + # please type checking + assert isinstance(parsed.info, FrejaEIDDocumentUserInfo) + + proofing = get_proofing_functions( + session_info=parsed.info, app_name=current_app.conf.app_name, config=current_app.conf, backdoor=args.backdoor + ) + + _identity = proofing.get_identity(user=user) + if not _identity or not _identity.is_verified: + # proof users' identity too in this process if the user didn't have a verified identity of this type already + verify_result = proofing.verify_identity(user=user) + if verify_result.error is not None: + return ACSResult(message=verify_result.error) + if verify_result.user: + # Get an updated user object + user = verify_result.user + # It is necessary to look up the credential again in order for changes to the instance to + # actually be saved to the database. Can't be references to old user objects credential. + credential = user.credentials.find(credential.key) + if not isinstance(credential, FidoCredential): + current_app.logger.error(f"Credential {credential} is not a FidoCredential") + return ACSResult(message=FrejaEIDMsg.credential_not_found) + + # Check that the users' verified identity matches the one that was asserted now + match_res = proofing.match_identity(user=user, proofing_method=args.proofing_method) + if match_res.error is not None: + return ACSResult(message=match_res.error) + + if not match_res.matched: + # Matching external mfa authentication with user identity failed, bail + current_app.stats.count(name=f"verify_credential_{args.proofing_method.method}_identity_not_matching") + return ACSResult(message=FrejaEIDMsg.identity_not_matching) + + # TODO: is this the correct way of doing this? + loa = "al2" + + verify_result = proofing.verify_credential(user=user, credential=credential, loa=loa) + if verify_result.error is not None: + return ACSResult(message=verify_result.error) + + current_app.stats.count(name="fido_token_verified") + current_app.stats.count(name=f"verify_credential_{args.proofing_method.method}_success") + + return ACSResult(success=True, message=FrejaEIDMsg.credential_verify_success) diff --git a/src/eduid/webapp/freja_eid/callback_enums.py b/src/eduid/webapp/freja_eid/callback_enums.py index 37745b44c..d65e10e55 100644 --- a/src/eduid/webapp/freja_eid/callback_enums.py +++ b/src/eduid/webapp/freja_eid/callback_enums.py @@ -6,3 +6,4 @@ @unique class FrejaEIDAction(StrEnum): verify_identity = "verify-identity-action" + verify_credential = "verify-credential-action" diff --git a/src/eduid/webapp/freja_eid/schemas.py b/src/eduid/webapp/freja_eid/schemas.py index d2932aa84..a0c717af4 100644 --- a/src/eduid/webapp/freja_eid/schemas.py +++ b/src/eduid/webapp/freja_eid/schemas.py @@ -30,6 +30,10 @@ class FrejaEIDCommonRequestSchema(EduidSchema, CSRFRequestMixin): frontend_state = fields.String(required=False) +class FrejaEIDVerifyCredentialRequestSchema(FrejaEIDCommonRequestSchema): + credential_id = fields.String(required=True) + + class FrejaEIDCommonResponseSchema(FluxStandardAction): class VerifyResponsePayload(EduidSchema, CSRFResponseMixin): location = fields.String(required=False) diff --git a/src/eduid/webapp/freja_eid/views.py b/src/eduid/webapp/freja_eid/views.py index f39f09ba5..871b08724 100644 --- a/src/eduid/webapp/freja_eid/views.py +++ b/src/eduid/webapp/freja_eid/views.py @@ -7,6 +7,8 @@ from eduid.common.config.base import FrontendAction from eduid.userdb import User +from eduid.userdb.credentials import FidoCredential +from eduid.userdb.element import ElementKey from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_user from eduid.webapp.common.api.errors import EduidErrorsContext, goto_errors_response from eduid.webapp.common.api.helpers import check_magic_cookie @@ -14,13 +16,18 @@ from eduid.webapp.common.api.schemas.authn_status import StatusRequestSchema, StatusResponseSchema from eduid.webapp.common.api.schemas.csrf import EmptyResponse from eduid.webapp.common.authn.acs_registry import ACSArgs, get_action +from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.methods import get_proofing_method from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import OIDCState, RP_AuthnRequest from eduid.webapp.freja_eid.app import current_freja_eid_app as current_app from eduid.webapp.freja_eid.callback_enums import FrejaEIDAction from eduid.webapp.freja_eid.helpers import FrejaEIDMsg -from eduid.webapp.freja_eid.schemas import FrejaEIDCommonRequestSchema, FrejaEIDCommonResponseSchema +from eduid.webapp.freja_eid.schemas import ( + FrejaEIDCommonRequestSchema, + FrejaEIDCommonResponseSchema, + FrejaEIDVerifyCredentialRequestSchema, +) __author__ = "lundberg" @@ -68,6 +75,45 @@ def verify_identity(user: User, method: str, frontend_action: str, frontend_stat return success_response(payload={"location": res.url}) +@eidas_views.route("/verify-credential", methods=["POST"]) +@UnmarshalWith(FrejaEIDVerifyCredentialRequestSchema) +@MarshalWith(FrejaEIDCommonResponseSchema) +@require_user +def verify_credential( + user: User, method: str, credential_id: ElementKey, frontend_action: str, frontend_state: str | None = None +) -> FluxData: + current_app.logger.debug(f"verify-credential called with credential_id: {credential_id}") + + _frontend_action = FrontendAction.VERIFY_CREDENTIAL + + if frontend_action != _frontend_action.value: + current_app.logger.error(f"Invalid frontend_action: {frontend_action}") + return error_response(message=FrejaEIDMsg.frontend_action_not_supported) + + # verify that the user has the credential and that it was used for login recently + credential = user.credentials.find(credential_id) + if credential is None or isinstance(credential, FidoCredential) is False: + current_app.logger.error(f"Can't find credential with id: {credential_id}") + return error_response(message=FrejaEIDMsg.credential_not_found) + + _need_reauthn = check_reauthn(frontend_action=_frontend_action, user=user, credential_requested=credential) + if _need_reauthn: + return _need_reauthn + + result = _authn( + FrejaEIDAction.verify_credential, + method=method, + frontend_action=_frontend_action.value, + frontend_state=frontend_state, + proofing_credential_id=credential_id, + ) + + if result.error: + return error_response(message=result.error) + + return success_response(payload={"location": result.url}) + + @dataclass class AuthnResult: authn_req: RP_AuthnRequest | None = None @@ -81,6 +127,7 @@ def _authn( method: str, frontend_action: str, frontend_state: str | None = None, + proofing_credential_id: ElementKey | None = None, ) -> AuthnResult: current_app.logger.debug(f"Requested method: {method}, frontend action: {frontend_action}") @@ -118,6 +165,7 @@ def _authn( frontend_action=_frontend_action, frontend_state=frontend_state, post_authn_action=action, + proofing_credential_id=proofing_credential_id, method=proofing_method.method, finish_url=authn_params.finish_url, ) From 13c6353d6c224587a2599bc1bce6d0183a30173a Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 6 Dec 2024 14:45:30 +0100 Subject: [PATCH 2/6] finish implementing credential verification with freja_eid --- src/eduid/common/config/base.py | 13 + src/eduid/userdb/credentials/external.py | 2 + src/eduid/userdb/logs/element.py | 42 +++ src/eduid/webapp/common/proofing/methods.py | 5 + .../webapp/freja_eid/callback_actions.py | 8 +- src/eduid/webapp/freja_eid/helpers.py | 19 +- src/eduid/webapp/freja_eid/proofing.py | 85 +++++- src/eduid/webapp/freja_eid/settings/common.py | 1 + src/eduid/webapp/freja_eid/tests/test_app.py | 287 +++++++++++++++--- src/eduid/webapp/freja_eid/views.py | 8 +- 10 files changed, 402 insertions(+), 68 deletions(-) diff --git a/src/eduid/common/config/base.py b/src/eduid/common/config/base.py index 0b186e750..8af2340a0 100644 --- a/src/eduid/common/config/base.py +++ b/src/eduid/common/config/base.py @@ -467,6 +467,17 @@ class ProofingConfigMixin(FrontendActionMixin): bankid_required_loa: list[str] = Field(default=["uncertified-loa3"]) bankid_idp: str | None = None + # freja eid + freja_eid_trust_framework: TrustFramework = TrustFramework.FREJA + freja_eid_required_loa: list[str] = Field(default=["freja-loa3"]) + freja_eid_required_registration_level: list[str] = Field(default=["PLUS"]) + freja_eid_registration_level_to_loa: dict[str, str | None] = Field( + default={ + "EXTENDED": None, + "PLUS": "freja-loa3", + } + ) + # identity proofing freja_proofing_version: str = Field(default="2023v1") foreign_eid_proofing_version: str = Field(default="2022v1") @@ -478,6 +489,8 @@ class ProofingConfigMixin(FrontendActionMixin): security_key_proofing_method: CredentialProofingMethod = Field(default=CredentialProofingMethod.SWAMID_AL3_MFA) security_key_proofing_version: str = Field(default="2023v2") security_key_foreign_eid_proofing_version: str = Field(default="2022v1") + security_key_freja_eid_proofing_version: str = Field(default="2024v1") + security_key_foreign_freja_eid_proofing_version: str = Field(default="2024v1") class EduIDBaseAppConfig(RootConfig, LoggingConfigMixin, StatsConfigMixin, RedisConfigMixin): diff --git a/src/eduid/userdb/credentials/external.py b/src/eduid/userdb/credentials/external.py index a3be65d8a..b49d96832 100644 --- a/src/eduid/userdb/credentials/external.py +++ b/src/eduid/userdb/credentials/external.py @@ -66,6 +66,8 @@ class BankIDCredential(ExternalCredential): # and mapping a level to an authnContextClassRef really ought to be dependent on configuration matching # the IdP:s expected values at a certain time. Such configuration is better to have in the SP than in # the database layer. +class FrejaCredential(ExternalCredential): + framework: Literal[TrustFramework.FREJA] = TrustFramework.FREJA level: str # a value like "loa3", "eidas_sub", ... diff --git a/src/eduid/userdb/logs/element.py b/src/eduid/userdb/logs/element.py index a946fce25..def522c6c 100644 --- a/src/eduid/userdb/logs/element.py +++ b/src/eduid/userdb/logs/element.py @@ -574,6 +574,48 @@ class MFATokenBankIDProofing(BankIDProofing): key_id: str +class MFATokenFrejaEIDProofing(FrejaEIDNINProofing): + """ + { + 'eduPersonPrincipalName': eppn, + 'created_ts': utc_now(), + 'created_by': 'application', + 'proofing_method': 'freja_eid', + 'proofing_version': '2024v1', + 'user_id': 'unique identifier for the user', + 'document_type': 'type of document used for identification', + 'document_number': 'document number', + 'nin': 'national_identity_number', + 'given_name': 'name', + 'surname': 'name', + 'key_id: 'Key id of token vetted', + } + """ + + # Data used to initialize the vetting process + key_id: str + + +class MFATokenFrejaEIDForeignProofing(FrejaEIDForeignProofing): + """ + { + 'eduPersonPrincipalName': eppn, + 'created_ts': utc_now(), + 'created_by': 'application', + 'proofing_method': 'freja_eid', + 'proofing_version': '2024v1', + 'user_id': 'unique identifier for the user', + 'document_type': 'type of document used for identification', + 'document_number': 'document number', + 'issuing_country': 'country of issuance', + 'key_id: 'Key id of token vetted', + } + """ + + # Data used to initialize the vetting process + key_id: str + + class NameUpdateProofing(NinNavetProofingLogElement): """ Used when a user request an update of their name from Navet. diff --git a/src/eduid/webapp/common/proofing/methods.py b/src/eduid/webapp/common/proofing/methods.py index e7d819688..4366870b7 100644 --- a/src/eduid/webapp/common/proofing/methods.py +++ b/src/eduid/webapp/common/proofing/methods.py @@ -128,6 +128,9 @@ def parse_session_info(self, session_info: SessionInfo, backdoor: bool) -> Sessi @dataclass(frozen=True) class ProofingMethodFrejaEID(ProofingMethod): + required_registration_level: list[str] + required_loa: list[str] + def parse_session_info(self, session_info: SessionInfo, backdoor: bool) -> SessionInfoParseResult: try: parsed_session_info = FrejaEIDDocumentUserInfo(**session_info) @@ -206,6 +209,8 @@ def get_proofing_method( method=method, framework=TrustFramework.FREJA, finish_url=authn_params.finish_url, + required_registration_level=config.freja_eid_required_registration_level, + required_loa=config.freja_eid_required_loa, ) logger.warning(f"Unknown proofing method {method}") diff --git a/src/eduid/webapp/freja_eid/callback_actions.py b/src/eduid/webapp/freja_eid/callback_actions.py index b7772098f..98d06b763 100644 --- a/src/eduid/webapp/freja_eid/callback_actions.py +++ b/src/eduid/webapp/freja_eid/callback_actions.py @@ -115,10 +115,12 @@ def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: current_app.stats.count(name=f"verify_credential_{args.proofing_method.method}_identity_not_matching") return ACSResult(message=FrejaEIDMsg.identity_not_matching) - # TODO: is this the correct way of doing this? - loa = "al2" + current_loa = proofing.get_current_loa() + if current_loa.result is None: + current_app.logger.error(f"No LOA configured for registration level {parsed.info.registration_level}") + return ACSResult(message=FrejaEIDMsg.registration_level_not_satisfied) - verify_result = proofing.verify_credential(user=user, credential=credential, loa=loa) + verify_result = proofing.verify_credential(user=user, credential=credential, loa=current_loa.result) if verify_result.error is not None: return ACSResult(message=verify_result.error) diff --git a/src/eduid/webapp/freja_eid/helpers.py b/src/eduid/webapp/freja_eid/helpers.py index 125828051..bf380e3b6 100644 --- a/src/eduid/webapp/freja_eid/helpers.py +++ b/src/eduid/webapp/freja_eid/helpers.py @@ -21,16 +21,15 @@ class FrejaEIDMsg(TranslatableMsg): attempted operations on the back end. """ - # failed to create authn request authn_request_failed = "freja_eid.authn_request_failed" - # Unavailable vetting method requested - method_not_available = "freja_eid.method_not_available" - # Identity verification success - identity_verify_success = "freja_eid.identity_verify_success" - # Authorization error at Freja EID authorization_error = "freja_eid.authorization_fail" + credential_not_found = "freja_eid.credential_not_found" + credential_verification_not_allowed = "freja_eid.credential_verification_not_allowed" + credential_verify_success = "freja_eid.credential_verify_success" frontend_action_not_supported = "freja_eid.frontend_action_not_supported" - # registration level not satisfied + identity_not_matching = "freja_eid.identity_not_matching" + identity_verify_success = "freja_eid.identity_verify_success" + method_not_available = "freja_eid.method_not_available" registration_level_not_satisfied = "freja_eid.registration_level_not_satisfied" @@ -95,8 +94,8 @@ class FrejaEIDDocumentUserInfo(UserInfoBase): class FrejaEIDTokenResponse(BaseModel): access_token: str - expires_at: int - expires_in: int - id_token: str token_type: str + id_token: str + expires_at: int | None = Field(default=None) + expires_in: int | None = Field(default=None) userinfo: FrejaEIDDocumentUserInfo diff --git a/src/eduid/webapp/freja_eid/proofing.py b/src/eduid/webapp/freja_eid/proofing.py index f06ec68fe..3e99eaaf4 100644 --- a/src/eduid/webapp/freja_eid/proofing.py +++ b/src/eduid/webapp/freja_eid/proofing.py @@ -17,13 +17,21 @@ IdentityProofingMethod, IdentityType, ) -from eduid.userdb.logs.element import FrejaEIDForeignProofing, FrejaEIDNINProofing, NinProofingLogElement +from eduid.userdb.logs.element import ( + FrejaEIDForeignProofing, + FrejaEIDNINProofing, + MFATokenFrejaEIDForeignProofing, + MFATokenFrejaEIDProofing, + NinProofingLogElement, +) from eduid.userdb.proofing import NinProofingElement, ProofingUser from eduid.userdb.proofing.state import NinProofingState from eduid.webapp.common.api.helpers import set_user_names_from_foreign_id, verify_nin_for_user from eduid.webapp.common.api.messages import CommonMsg from eduid.webapp.common.proofing.base import ( + GenericResult, MatchResult, + MfaData, ProofingElementResult, ProofingFunctions, VerifyCredentialResult, @@ -49,6 +57,25 @@ def is_swedish_document(self) -> bool: return True return False + def get_current_loa(self) -> GenericResult[str | None]: + current_loa = self.config.freja_eid_registration_level_to_loa.get(self.session_info.registration_level) + if current_loa is None: + return GenericResult(error=FrejaEIDMsg.registration_level_not_satisfied) + return GenericResult(result=current_loa) + + def get_mfa_data(self) -> GenericResult[MfaData]: + current_loa = self.get_current_loa() + if current_loa.error: + return GenericResult(error=current_loa.error) + + return GenericResult( + result=MfaData( + issuer=self.session_info.iss, + authn_instant=datetime.fromtimestamp(self.session_info.iat).isoformat(), + authn_context=current_loa.result, + ) + ) + def get_identity(self, user: User) -> IdentityElement | None: if self.is_swedish_document(): return user.identities.nin @@ -163,6 +190,10 @@ def _verify_foreign_identity(self, user: User) -> VerifyUserResult: return VerifyUserResult(user=_user) def _can_replace_identity(self, proofing_user: ProofingUser) -> bool: + if self.is_swedish_document(): + # TODO: Implement support for NIN identites + return False + locked_identity = proofing_user.locked_identity.freja if locked_identity is None: return True @@ -184,6 +215,15 @@ def identity_proofing_element(self, user: User) -> ProofingElementResult: return self._nin_identity_proofing_element(user) return self._foreign_identity_proofing_element(user) + def credential_proofing_element(self, user: User, credential: Credential) -> ProofingElementResult: + if self.backdoor: + # TODO: implement backdoor support? + pass + + if self.is_swedish_document(): + return self._nin_credential_proofing_element(user, credential) + return self._foreign_credential_proofing_element(user, credential) + def _nin_identity_proofing_element(self, user: User) -> ProofingElementResult: _nin = self.session_info.personal_identity_number if not _nin: @@ -223,14 +263,47 @@ def _foreign_identity_proofing_element(self, user: User) -> ProofingElementResul ) return ProofingElementResult(data=data) - def match_identity(self, user: User, proofing_method: ProofingMethod) -> MatchResult: - raise NotImplementedError("No support for mfa") + def _nin_credential_proofing_element(self, user: User, credential: Credential) -> ProofingElementResult: + proofing_element_result = self._nin_identity_proofing_element(user) + assert proofing_element_result.data is not None # please mypy + data = MFATokenFrejaEIDProofing(**proofing_element_result.data.to_dict(), key_id=credential.key) + return ProofingElementResult(data=data) - def credential_proofing_element(self, user: User, credential: Credential) -> ProofingElementResult: - raise NotImplementedError("No support for credential proofing") + def _foreign_credential_proofing_element(self, user: User, credential: Credential) -> ProofingElementResult: + proofing_element_result = self._foreign_identity_proofing_element(user) + assert proofing_element_result.data is not None # please mypy + data = MFATokenFrejaEIDForeignProofing(**proofing_element_result.data.to_dict(), key_id=credential.key) + return ProofingElementResult(data=data) + + def match_identity(self, user: User, proofing_method: ProofingMethod) -> MatchResult: + if self.is_swedish_document(): + identity_type = IdentityType.NIN + asserted_unique_value = self.session_info.personal_identity_number + assert asserted_unique_value is not None # please mypy + else: + identity_type = IdentityType.FREJA + asserted_unique_value = self.session_info.user_id + + return self._match_identity_for_mfa( + user=user, + identity_type=identity_type, + asserted_unique_value=asserted_unique_value, + proofing_method=proofing_method, + ) def mark_credential_as_verified(self, credential: Credential, loa: str | None) -> VerifyCredentialResult: - raise NotImplementedError("No support for credential proofing") + if loa not in self.config.freja_eid_required_loa: + return VerifyCredentialResult(error=FrejaEIDMsg.registration_level_not_satisfied) + + credential.is_verified = True + credential.proofing_method = self.config.security_key_proofing_method + + if not self.is_swedish_document(): + credential.proofing_version = self.config.security_key_freja_eid_proofing_version + else: + credential.proofing_version = self.config.security_key_foreign_freja_eid_proofing_version + + return VerifyCredentialResult(credential=credential) def get_proofing_functions( diff --git a/src/eduid/webapp/freja_eid/settings/common.py b/src/eduid/webapp/freja_eid/settings/common.py index bd8bc3715..342e73c17 100644 --- a/src/eduid/webapp/freja_eid/settings/common.py +++ b/src/eduid/webapp/freja_eid/settings/common.py @@ -46,3 +46,4 @@ class FrejaEIDConfig( app_name: str = "freja_eid" freja_eid_client: FrejaEIDClientConfig + allow_credential_verification: bool = False diff --git a/src/eduid/webapp/freja_eid/tests/test_app.py b/src/eduid/webapp/freja_eid/tests/test_app.py index 6b59608a8..62e8a5381 100644 --- a/src/eduid/webapp/freja_eid/tests/test_app.py +++ b/src/eduid/webapp/freja_eid/tests/test_app.py @@ -10,6 +10,7 @@ from eduid.common.config.base import FrontendAction from eduid.common.misc.timeutil import utc_now +from eduid.userdb.element import ElementKey from eduid.userdb.identity import FrejaIdentity, FrejaRegistrationLevel, IdentityProofingMethod from eduid.userdb.testing import SetupConfig from eduid.webapp.common.api.messages import CommonMsg @@ -39,12 +40,6 @@ def setUp(self, config: SetupConfig | None = None) -> None: self.unverified_test_user = self.app.central_userdb.get_user_by_eppn("hubba-baar") self._user_setup() - self.default_frontend_data = { - "method": "freja_eid", - "frontend_action": "verifyIdentity", - "frontend_state": "test_state", - } - self.oidc_provider_config = { "response_types_supported": ["code"], "request_parameter_supported": True, @@ -132,7 +127,12 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: "force_authn": True, "finish_url": "https://dashboard.example.com/profile/ext-return/{app_name}/{authn_id}", }, + FrontendAction.VERIFY_CREDENTIAL.value: { + "force_authn": True, + "finish_url": "https://dashboard.example.com/profile/ext-return/{app_name}/{authn_id}", + }, }, + "allow_credential_verification": True, } ) return config @@ -145,6 +145,14 @@ def _user_setup(self) -> None: user.identities.remove(user.identities.freja.key) self.app.central_userdb.save(user) + @staticmethod + def default_frontend_data(frontend_action: str) -> dict[str, str]: + return { + "method": "freja_eid", + "frontend_action": frontend_action, + "frontend_state": "test_state", + } + @staticmethod def get_mock_userinfo( issuing_country: Country, @@ -322,7 +330,11 @@ def test_verify_identity_request(self) -> None: with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=self.test_user.eppn) + response = self._start_auth( + endpoint=endpoint, + data=self.default_frontend_data(frontend_action="verifyIdentity"), + eppn=self.test_user.eppn, + ) assert response.status_code == 200 self._check_success_response(response, type_="POST_FREJA_EID_VERIFY_IDENTITY_SUCCESS") assert self.get_response_payload(response)["location"].startswith("https://example.com/op/oidc/authorize") @@ -347,7 +359,9 @@ def test_verify_nin_identity(self, mock_request_user_sync: MagicMock, mock_refer with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country, registration_level=FrejaRegistrationLevel.PLUS) @@ -355,9 +369,11 @@ def test_verify_nin_identity(self, mock_request_user_sync: MagicMock, mock_refer assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_msg=FrejaEIDMsg.identity_verify_success, ) @@ -390,16 +406,20 @@ def test_verify_foreign_identity(self, mock_request_user_sync: MagicMock) -> Non with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_msg=FrejaEIDMsg.identity_verify_success, ) @@ -425,16 +445,20 @@ def test_verify_foreign_identity_no_identity_number(self, mock_request_user_sync with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country, personal_identity_number=None) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_msg=FrejaEIDMsg.identity_verify_success, ) @@ -467,16 +491,20 @@ def test_verify_nin_identity_already_verified( with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_error=True, expect_msg=ProofingMsg.identity_already_verified, ) @@ -507,16 +535,20 @@ def test_verify_foreign_identity_already_verified(self, mock_request_user_sync: with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_error=True, expect_msg=ProofingMsg.identity_already_verified, ) @@ -549,15 +581,19 @@ def test_verify_foreign_identity_replace_locked_identity(self, mock_request_user with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_msg=FrejaEIDMsg.identity_verify_success, ) new_locked_identity = FrejaIdentity( @@ -597,16 +633,20 @@ def test_verify_foreign_identity_replace_locked_identity_fail(self, mock_request with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_error=True, expect_msg=CommonMsg.locked_identity_not_matching, ) @@ -644,16 +684,20 @@ def test_verify_foreign_identity_replace_locked_identity_fail_personal_id_number with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_error=True, expect_msg=CommonMsg.locked_identity_not_matching, ) @@ -675,16 +719,20 @@ def test_verify_foreign_identity_already_verified_nin( with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) userinfo = self.get_mock_userinfo(issuing_country=country) response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_msg=FrejaEIDMsg.identity_verify_success, ) user = self.app.central_userdb.get_user_by_eppn(eppn) @@ -709,7 +757,9 @@ def test_verify_identity_expired_document( with self.app.test_request_context(): endpoint = url_for("freja_eid.verify_identity") - start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + start_auth_response = self._start_auth( + endpoint=endpoint, data=self.default_frontend_data(frontend_action="verifyIdentity"), eppn=eppn + ) state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) yesterday = utc_now() - timedelta(days=1) userinfo = self.get_mock_userinfo(issuing_country=country, document_expires=yesterday) @@ -717,10 +767,153 @@ def test_verify_identity_expired_document( assert response.status_code == 302 self._verify_status( finish_url=response.headers["Location"], - frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), - frontend_state=self.default_frontend_data["frontend_state"], - method=self.default_frontend_data["method"], + frontend_action=FrontendAction( + self.default_frontend_data(frontend_action="verifyIdentity")["frontend_action"] + ), + frontend_state=self.default_frontend_data(frontend_action="verifyIdentity")["frontend_state"], + method=self.default_frontend_data(frontend_action="verifyIdentity")["method"], expect_error=True, expect_msg=ProofingMsg.session_info_not_valid, ) self._verify_user_parameters(eppn, identity_verified=False, num_proofings=0, num_mfa_tokens=0) + + @patch("eduid.webapp.common.api.helpers.get_reference_nin_from_navet_data") + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_credential_nin_identity( + self, mock_request_user_sync: MagicMock, mock_reference_nin: MagicMock + ) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + mock_reference_nin.return_value = None + + eppn = self.unverified_test_user.eppn + country = countries.get("Sweden") + + self._verify_user_parameters(eppn, identity_present=False, num_mfa_tokens=0) + + credential = self.add_security_key_to_user(eppn, keyhandle="test_security_key_1", token_type="webauthn") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_credential") + + self.set_authn_action( + eppn=eppn, + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + credentials_used=[credential.key, ElementKey("user_password_cred_id")], + ) + + data = self.default_frontend_data(frontend_action="verifyCredential") + data["credential_id"] = credential.key + + start_auth_response = self._start_auth( + endpoint=endpoint, + data=data, + eppn=eppn, + ) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + + userinfo = self.get_mock_userinfo(issuing_country=country, registration_level=FrejaRegistrationLevel.PLUS) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(data["frontend_action"]), + frontend_state=data["frontend_state"], + method=data["method"], + expect_msg=FrejaEIDMsg.credential_verify_success, + ) + + self._verify_user_parameters(eppn, token_verified=True, num_proofings=2, num_mfa_tokens=1) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_credential_foreign_identity(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + # Allow registration level extended to be used + self.app.conf.freja_eid_registration_level_to_loa = { + "EXTENDED": "freja-loa2", + "PLUS": "freja-loa3", + } + self.app.conf.freja_eid_required_loa = ["freja-loa2", "freja-loa3"] + + eppn = self.unverified_test_user.eppn + country = countries.get("Denmark") + + self._verify_user_parameters(eppn, identity_present=False, num_mfa_tokens=0) + + credential = self.add_security_key_to_user(eppn, keyhandle="test_security_key_1", token_type="webauthn") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_credential") + + self.set_authn_action( + eppn=eppn, + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + credentials_used=[credential.key, ElementKey("user_password_cred_id")], + ) + + data = self.default_frontend_data(frontend_action="verifyCredential") + data["credential_id"] = credential.key + + start_auth_response = self._start_auth( + endpoint=endpoint, + data=data, + eppn=eppn, + ) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + + userinfo = self.get_mock_userinfo(issuing_country=country, registration_level=FrejaRegistrationLevel.EXTENDED) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(data["frontend_action"]), + frontend_state=data["frontend_state"], + method=data["method"], + expect_msg=FrejaEIDMsg.credential_verify_success, + ) + + self._verify_user_parameters(eppn, token_verified=True, num_proofings=2, num_mfa_tokens=1) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_credential_low_registration_level(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Sweden") + + self._verify_user_parameters(eppn, num_mfa_tokens=0) + + credential = self.add_security_key_to_user(eppn, keyhandle="test_security_key_1", token_type="webauthn") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_credential") + + self.set_authn_action( + eppn=eppn, + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + credentials_used=[credential.key, ElementKey("user_password_cred_id")], + ) + + data = self.default_frontend_data(frontend_action="verifyCredential") + data["credential_id"] = credential.key + + start_auth_response = self._start_auth( + endpoint=endpoint, + data=data, + eppn=eppn, + ) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + + userinfo = self.get_mock_userinfo(issuing_country=country, registration_level=FrejaRegistrationLevel.EXTENDED) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(data["frontend_action"]), + frontend_state=data["frontend_state"], + method=data["method"], + expect_error=True, + expect_msg=FrejaEIDMsg.registration_level_not_satisfied, + ) + + self._verify_user_parameters(eppn, token_verified=False, num_proofings=0, num_mfa_tokens=1) diff --git a/src/eduid/webapp/freja_eid/views.py b/src/eduid/webapp/freja_eid/views.py index 871b08724..98ea611de 100644 --- a/src/eduid/webapp/freja_eid/views.py +++ b/src/eduid/webapp/freja_eid/views.py @@ -75,13 +75,17 @@ def verify_identity(user: User, method: str, frontend_action: str, frontend_stat return success_response(payload={"location": res.url}) -@eidas_views.route("/verify-credential", methods=["POST"]) +@freja_eid_views.route("/verify-credential", methods=["POST"]) @UnmarshalWith(FrejaEIDVerifyCredentialRequestSchema) @MarshalWith(FrejaEIDCommonResponseSchema) @require_user def verify_credential( user: User, method: str, credential_id: ElementKey, frontend_action: str, frontend_state: str | None = None ) -> FluxData: + if current_app.conf.allow_credential_verification is False: + current_app.logger.error("Credential verification is not allowed") + return error_response(message=FrejaEIDMsg.credential_verification_not_allowed) + current_app.logger.debug(f"verify-credential called with credential_id: {credential_id}") _frontend_action = FrontendAction.VERIFY_CREDENTIAL @@ -179,7 +183,7 @@ def _authn( @require_user def authn_callback(user: User) -> WerkzeugResponse: """ - This is the callback endpoint for the Svipe ID OIDC flow. + This is the callback endpoint for the Freja EID OIDC flow. """ current_app.logger.debug("authn_callback called") current_app.logger.debug(f"request.args: {request.args}") From ee6ac1e8dd369e3e77ecb366465602c1c26d54c5 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 6 Dec 2024 14:49:04 +0100 Subject: [PATCH 3/6] make credential verification with eidas allowed/disallowed from settings --- src/eduid/webapp/eidas/settings/common.py | 1 + src/eduid/webapp/eidas/tests/test_app.py | 10 ++++++++++ src/eduid/webapp/eidas/views.py | 4 ++++ 3 files changed, 15 insertions(+) diff --git a/src/eduid/webapp/eidas/settings/common.py b/src/eduid/webapp/eidas/settings/common.py index abcec0bdb..a193a83cb 100644 --- a/src/eduid/webapp/eidas/settings/common.py +++ b/src/eduid/webapp/eidas/settings/common.py @@ -60,3 +60,4 @@ class EidasConfig( # magic cookie IdP is used for integration tests when magic cookie is set magic_cookie_idp: str | None = None magic_cookie_foreign_id_idp: str | None = None + allow_eidas_credential_verification: bool = False diff --git a/src/eduid/webapp/eidas/tests/test_app.py b/src/eduid/webapp/eidas/tests/test_app.py index 577d79a63..2457a4134 100644 --- a/src/eduid/webapp/eidas/tests/test_app.py +++ b/src/eduid/webapp/eidas/tests/test_app.py @@ -1055,6 +1055,8 @@ def test_foreign_eid_mfa_login_unverified_identity(self, mock_request_user_sync: def test_foreign_eid_webauthn_token_verify(self, mock_request_user_sync: MagicMock) -> None: mock_request_user_sync.side_effect = self.request_user_sync + self.app.conf.allow_eidas_credential_verification = True + eppn = self.test_user.eppn credential = self.add_security_key_to_user(eppn, "test", "webauthn") self.set_eidas_for_user(eppn=eppn, identity=self.test_user_eidas, verified=True) @@ -1078,6 +1080,8 @@ def test_foreign_eid_webauthn_token_verify(self, mock_request_user_sync: MagicMo ) def test_foreign_eid_mfa_token_verify_wrong_verified_identity(self) -> None: + self.app.conf.allow_eidas_credential_verification = True + eppn = self.test_user.eppn identity = self.test_user_other_eidas self.set_eidas_for_user(eppn=eppn, identity=identity, verified=True) @@ -1106,6 +1110,8 @@ def test_foreign_eid_mfa_token_verify_wrong_verified_identity(self) -> None: def test_foreign_eid_mfa_token_verify_no_verified_identity(self, mock_request_user_sync: MagicMock) -> None: mock_request_user_sync.side_effect = self.request_user_sync + self.app.conf.allow_eidas_credential_verification = True + eppn = self.test_unverified_user_eppn identity = self.test_user_eidas credential = self.add_security_key_to_user(eppn, "test", "webauthn") @@ -1130,6 +1136,8 @@ def test_foreign_eid_mfa_token_verify_no_verified_identity(self, mock_request_us ) def test_foreign_eid_mfa_token_verify_no_mfa_login(self) -> None: + self.app.conf.allow_eidas_credential_verification = True + eppn = self.test_user.eppn credential = self.add_security_key_to_user(eppn, "test", "webauthn") @@ -1149,6 +1157,8 @@ def test_foreign_eid_mfa_token_verify_no_mfa_login(self) -> None: self._verify_user_parameters(eppn) def test_foreign_eid_mfa_token_verify_no_mfa_token_in_session(self) -> None: + self.app.conf.allow_eidas_credential_verification = True + eppn = self.test_user.eppn identity = self.test_user_eidas credential = self.add_security_key_to_user(eppn, "test", "webauthn") diff --git a/src/eduid/webapp/eidas/views.py b/src/eduid/webapp/eidas/views.py index 55639545c..5606bf8c4 100644 --- a/src/eduid/webapp/eidas/views.py +++ b/src/eduid/webapp/eidas/views.py @@ -81,6 +81,10 @@ def verify_credential( ) -> FluxData: current_app.logger.debug(f"verify-credential called with credential_id: {credential_id}") + if method == "eidas" and current_app.conf.allow_eidas_credential_verification is False: + current_app.logger.error("Credential verification is not allowed") + return error_response(message=EidasMsg.credential_verification_not_allowed) + _frontend_action = FrontendAction.VERIFY_CREDENTIAL if frontend_action != _frontend_action.value: From 200a7580fa13895763f8b4304dbee03e0fc323e4 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 6 Dec 2024 14:51:58 +0100 Subject: [PATCH 4/6] refactor authn context to loa shared code --- src/eduid/webapp/bankid/acs_actions.py | 8 +- src/eduid/webapp/bankid/helpers.py | 2 +- src/eduid/webapp/bankid/proofing.py | 121 ++---------- src/eduid/webapp/bankid/settings/common.py | 10 +- src/eduid/webapp/common/proofing/base.py | 184 +++++++++++++++++- .../webapp/common/proofing/saml_helpers.py | 16 -- src/eduid/webapp/eidas/acs_actions.py | 8 +- src/eduid/webapp/eidas/helpers.py | 4 +- src/eduid/webapp/eidas/proofing.py | 147 +++----------- src/eduid/webapp/eidas/settings/common.py | 8 +- 10 files changed, 253 insertions(+), 255 deletions(-) diff --git a/src/eduid/webapp/bankid/acs_actions.py b/src/eduid/webapp/bankid/acs_actions.py index fa35e76a0..f2ea117db 100644 --- a/src/eduid/webapp/bankid/acs_actions.py +++ b/src/eduid/webapp/bankid/acs_actions.py @@ -10,7 +10,7 @@ from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.common.proofing.methods import ProofingMethodSAML -from eduid.webapp.common.proofing.saml_helpers import authn_ctx_to_loa, is_required_loa, is_valid_authn_instant +from eduid.webapp.common.proofing.saml_helpers import is_required_loa, is_valid_authn_instant from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import SP_AuthnRequest @@ -25,7 +25,7 @@ def common_saml_checks(args: ACSArgs) -> ACSResult | None: """ assert isinstance(args.proofing_method, ProofingMethodSAML) # please mypy if not is_required_loa( - args.session_info, args.proofing_method.required_loa, current_app.conf.authentication_context_map + args.session_info, args.proofing_method.required_loa, current_app.conf.loa_authn_context_map ): args.authn_req.error = True args.authn_req.status = BankIDMsg.authn_context_mismatch.value @@ -150,7 +150,9 @@ def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: current_app.stats.count(name=f"verify_credential_{args.proofing_method.method}_identity_not_matching") return ACSResult(message=BankIDMsg.identity_not_matching) - loa = authn_ctx_to_loa(args.session_info, current_app.conf.authentication_context_map) + loa = None + if parsed.info.authn_context is not None: + loa = current_app.conf.authn_context_loa_map.get(parsed.info.authn_context) verify_result = proofing.verify_credential(user=user, credential=credential, loa=loa) if verify_result.error is not None: diff --git a/src/eduid/webapp/bankid/helpers.py b/src/eduid/webapp/bankid/helpers.py index eae6574a5..2c8afaeab 100644 --- a/src/eduid/webapp/bankid/helpers.py +++ b/src/eduid/webapp/bankid/helpers.py @@ -69,7 +69,7 @@ def create_authn_info( # LOA logger.debug(f"Requesting AuthnContext {required_loa}") - loa_uris = [current_app.conf.authentication_context_map[loa] for loa in required_loa] + loa_uris = [current_app.conf.loa_authn_context_map[loa] for loa in required_loa] kwargs["requested_authn_context"] = {"authn_context_class_ref": loa_uris, "comparison": "exact"} client = Saml2Client(current_app.saml2_config) diff --git a/src/eduid/webapp/bankid/proofing.py b/src/eduid/webapp/bankid/proofing.py index 5a449c86e..c77c3c0ce 100644 --- a/src/eduid/webapp/bankid/proofing.py +++ b/src/eduid/webapp/bankid/proofing.py @@ -5,8 +5,6 @@ from eduid.common.rpc.exceptions import AmTaskFailed from eduid.userdb import User from eduid.userdb.credentials import Credential -from eduid.userdb.credentials.external import BankIDCredential, ExternalCredential, TrustFramework -from eduid.userdb.element import ElementKey from eduid.userdb.exceptions import LockedIdentityViolation from eduid.userdb.identity import IdentityElement, IdentityType from eduid.userdb.logs.element import BankIDProofing, MFATokenBankIDProofing, NinProofingLogElement @@ -18,18 +16,34 @@ from eduid.webapp.common.api.helpers import verify_nin_for_user from eduid.webapp.common.api.messages import CommonMsg from eduid.webapp.common.proofing.base import ( + GenericResult, MatchResult, + MfaData, ProofingElementResult, ProofingFunctions, VerifyCredentialResult, VerifyUserResult, ) -from eduid.webapp.common.proofing.methods import ProofingMethod, ProofingMethodSAML -from eduid.webapp.common.session import session +from eduid.webapp.common.proofing.methods import ProofingMethod @dataclass class BankIDProofingFunctions(ProofingFunctions[BankIDSessionInfo]): + def get_mfa_data(self) -> GenericResult[MfaData]: + return GenericResult( + result=MfaData( + issuer=self.session_info.issuer, + authn_instant=self.session_info.authn_instant.isoformat(), + authn_context=self.session_info.authn_context, + ) + ) + + def get_current_loa(self) -> GenericResult[str | None]: + if self.session_info.authn_context is None: + return GenericResult(result=None) + current_loa = current_app.conf.authn_context_loa_map.get(self.session_info.authn_context) + return GenericResult(result=current_loa) + def get_identity(self, user: User) -> IdentityElement | None: return user.identities.nin @@ -113,65 +127,6 @@ def match_identity(self, user: User, proofing_method: ProofingMethod) -> MatchRe proofing_method=proofing_method, ) - def _match_identity_for_mfa( - self, user: User, identity_type: IdentityType, asserted_unique_value: str, proofing_method: ProofingMethod - ) -> MatchResult: - user_identity = user.identities.find(identity_type) - user_locked_identity = user.locked_identity.find(identity_type) - - if user_identity and (user_identity.unique_value == asserted_unique_value and user_identity.is_verified): - # asserted identity matched verified identity - mfa_success = True - current_app.logger.debug(f"Current identity {user_identity} matched asserted identity") - elif user_locked_identity and user_locked_identity.unique_value == asserted_unique_value: - # previously verified identity that the user just showed possession of - mfa_success = True - current_app.logger.debug(f"Locked identity {user_locked_identity} matched asserted identity") - # and we can verify it again - proofing_user = ProofingUser.from_user(user, current_app.private_userdb) - res = self.verify_identity(user=proofing_user) - if res.error is not None: - # If a message was returned, verifying the identity failed, and we abort - return MatchResult(error=res.error) - elif user_identity is None and user_locked_identity is None: - # TODO: we _could_ allow the user to give consent to just adding this identity to the user here, - # with a request parameter passed from frontend to /mfa-authentication for example. - mfa_success = False - current_app.logger.debug("No identity or locked identity found for user") - else: - mfa_success = False - current_app.logger.debug("No matching identity found for user") - - credential_used = None - if mfa_success: - assert isinstance(proofing_method, ProofingMethodSAML) # please mypy - credential_used = _find_or_add_credential(user, proofing_method.framework, proofing_method.required_loa) - current_app.logger.debug(f"Found or added credential {credential_used}") - - # OLD way - remove as soon as possible - # update session - session.mfa_action.success = mfa_success - if mfa_success is True: - # add metadata if the authentication was a success - session.mfa_action.issuer = self.session_info.issuer - session.mfa_action.authn_instant = self.session_info.authn_instant.isoformat() - session.mfa_action.authn_context = self.session_info.authn_context - session.mfa_action.credential_used = credential_used - - if not mfa_success: - current_app.logger.error("Asserted identity not matching user verified identity") - current_identity = self.get_identity(user) - current_unique_value = None - if current_identity: - current_unique_value = current_identity.unique_value - current_app.logger.debug(f"Current identity: {current_identity}") - current_app.logger.debug( - f"Current identity unique value: {current_unique_value}. Asserted unique value: {asserted_unique_value}" - ) - current_app.logger.debug(f"Asserted attributes: {self.session_info.attributes}") - - return MatchResult(matched=mfa_success, credential_used=credential_used) - def mark_credential_as_verified(self, credential: Credential, loa: str | None) -> VerifyCredentialResult: if loa != "uncertified-loa3": return VerifyCredentialResult(error=BankIDMsg.authn_context_mismatch) @@ -183,46 +138,6 @@ def mark_credential_as_verified(self, credential: Credential, loa: str | None) - return VerifyCredentialResult(credential=credential) -def _find_or_add_credential(user: User, framework: TrustFramework | None, required_loa: list[str]) -> ElementKey | None: - if not required_loa: - # mainly keep mypy calm - current_app.logger.debug("Not recording credential used without required_loa") - return None - - cred: ExternalCredential - this: ExternalCredential - if framework == TrustFramework.BANKID: - for this in user.credentials.filter(BankIDCredential): - if this.level in required_loa: - current_app.logger.debug(f"Found suitable credential on user: {this}") - return this.key - - cred = BankIDCredential(level=required_loa[0]) - cred.created_by = current_app.conf.app_name - else: - current_app.logger.info(f"Not recording credential used for unknown trust framework: {framework}") - return None - - # Reload the user from the central database, to not overwrite any earlier NIN proofings - _user = current_app.central_userdb.get_user_by_eppn(user.eppn) - - proofing_user = ProofingUser.from_user(_user, current_app.private_userdb) - - proofing_user.credentials.add(cred) - - current_app.logger.info(f"Adding new credential to proofing_user: {cred}") - - # Save proofing_user to private db - current_app.private_userdb.save(proofing_user) - - # Ask AM to sync proofing_user to central db - current_app.logger.info(f"Request sync for proofing_user {proofing_user}") - result = current_app.am_relay.request_user_sync(proofing_user) - current_app.logger.info(f"Sync result for proofing_user {proofing_user}: {result}") - - return cred.key - - def get_proofing_functions( session_info: BaseSessionInfo, app_name: str, diff --git a/src/eduid/webapp/bankid/settings/common.py b/src/eduid/webapp/bankid/settings/common.py index 0ad1ce9a5..431259bd1 100644 --- a/src/eduid/webapp/bankid/settings/common.py +++ b/src/eduid/webapp/bankid/settings/common.py @@ -2,6 +2,8 @@ Configuration (file) handling for the eduID eidas app. """ +from functools import cached_property + from pydantic import Field from eduid.common.config.base import ( @@ -33,11 +35,17 @@ class BankIDConfig( app_name: str = "bankid" # Federation config - authentication_context_map: dict[str, str] = Field( + loa_authn_context_map: dict[str, str] = Field( default={ "uncertified-loa3": "http://id.swedenconnect.se/loa/1.0/uncertified-loa3", } ) + + # + @cached_property + def authn_context_loa_map(self) -> dict[str, str]: + return {value: key for key, value in self.loa_authn_context_map.items()} + # magic cookie IdP is used for integration tests when magic cookie is set magic_cookie_idp: str | None = None magic_cookie_foreign_id_idp: str | None = None diff --git a/src/eduid/webapp/common/proofing/base.py b/src/eduid/webapp/common/proofing/base.py index 94c1b754b..022ab022b 100644 --- a/src/eduid/webapp/common/proofing/base.py +++ b/src/eduid/webapp/common/proofing/base.py @@ -6,17 +6,39 @@ from eduid.common.rpc.exceptions import AmTaskFailed from eduid.userdb import User from eduid.userdb.credentials import Credential +from eduid.userdb.credentials.external import ( + BankIDCredential, + EidasCredential, + ExternalCredential, + FrejaCredential, + SwedenConnectCredential, + TrustFramework, +) from eduid.userdb.element import ElementKey -from eduid.userdb.identity import IdentityElement +from eduid.userdb.identity import IdentityElement, IdentityType from eduid.userdb.logs.element import ProofingLogElement from eduid.userdb.proofing import ProofingUser from eduid.webapp.common.api.messages import CommonMsg, TranslatableMsg from eduid.webapp.common.api.utils import save_and_sync_user -from eduid.webapp.common.proofing.methods import ProofingMethod +from eduid.webapp.common.proofing.methods import ( + ProofingMethod, + ProofingMethodBankID, + ProofingMethodFrejaEID, + ProofingMethodSAML, +) +from eduid.webapp.common.session import session from eduid.webapp.eidas.app import current_eidas_app as current_app SessionInfoVar = TypeVar("SessionInfoVar") +T = TypeVar("T") + + +@dataclass +class GenericResult(Generic[T]): + result: T | None = None + error: TranslatableMsg | None = None + @dataclass class MatchResult: @@ -42,6 +64,13 @@ class ProofingElementResult: error: TranslatableMsg | None = None +@dataclass +class MfaData: + issuer: str + authn_instant: str + authn_context: str | None + + @dataclass() class ProofingFunctions(ABC, Generic[SessionInfoVar]): session_info: SessionInfoVar @@ -49,6 +78,12 @@ class ProofingFunctions(ABC, Generic[SessionInfoVar]): config: ProofingConfigMixin backdoor: bool + def get_current_loa(self) -> GenericResult[str | None]: + raise NotImplementedError("Subclass must implement get_current_loa") + + def get_mfa_data(self) -> GenericResult[MfaData]: + raise NotImplementedError("Subclass must implement get_mfa_data") + def get_identity(self, user: User) -> IdentityElement | None: raise NotImplementedError("Subclass must implement get_identity") @@ -105,3 +140,148 @@ def credential_proofing_element(self, user: User, credential: Credential) -> Pro def mark_credential_as_verified(self, credential: Credential, loa: str | None) -> VerifyCredentialResult: raise NotImplementedError("Subclass must implement mark_credential_as_verified") + + def _match_identity_for_mfa( + self, user: User, identity_type: IdentityType, asserted_unique_value: str, proofing_method: ProofingMethod + ) -> MatchResult: + credential_used = None + user_identity = user.identities.find(identity_type) + user_locked_identity = user.locked_identity.find(identity_type) + + if user_identity and (user_identity.unique_value == asserted_unique_value and user_identity.is_verified): + # asserted identity matched verified identity + mfa_success = True + current_app.logger.debug(f"Current identity {user_identity} matched asserted identity") + elif user_locked_identity and user_locked_identity.unique_value == asserted_unique_value: + # previously verified identity that the user just showed possession of + mfa_success = True + current_app.logger.debug(f"Locked identity {user_locked_identity} matched asserted identity") + # and we can verify it again + proofing_user = ProofingUser.from_user(user, current_app.private_userdb) + res = self.verify_identity(user=proofing_user) + if res.error is not None: + # If a message was returned, verifying the identity failed, and we abort + return MatchResult(error=res.error) + elif user_identity is None and user_locked_identity is None: + # TODO: we _could_ allow the user to give consent to just adding this identity to the user here, + # with a request parameter passed from frontend to /mfa-authentication for example. + mfa_success = False + current_app.logger.debug("No identity or locked identity found for user") + else: + mfa_success = False + current_app.logger.debug("No matching identity found for user") + + match proofing_method: + case ProofingMethodSAML() | ProofingMethodBankID() | ProofingMethodFrejaEID(): + current_loa = self.get_current_loa() + + mfa_data = self.get_mfa_data() + if mfa_data.error is not None: + return MatchResult(error=mfa_data.error) + assert mfa_data.result is not None # please mypy + + if mfa_success: + credential_used = self._find_or_add_credential( + user, proofing_method.framework, current_loa.result, proofing_method.required_loa + ) + current_app.logger.debug(f"Found or added credential {credential_used}") + case _: + raise NotImplementedError(f"Proofing method {proofing_method} not supported") + + # OLD way - remove as soon as possible + # update session + session.mfa_action.success = mfa_success + if mfa_success is True: + # add metadata if the authentication was a success + session.mfa_action.issuer = mfa_data.result.issuer + session.mfa_action.authn_instant = mfa_data.result.authn_instant + session.mfa_action.authn_context = mfa_data.result.authn_context + session.mfa_action.credential_used = credential_used + + if not mfa_success: + current_app.logger.error("Asserted identity not matching user verified identity") + current_identity = self.get_identity(user) + current_unique_value = None + if current_identity: + current_unique_value = current_identity.unique_value + current_app.logger.debug(f"Current identity: {current_identity}") + current_app.logger.debug( + f"Current identity unique value: {current_unique_value}. Asserted unique value: {asserted_unique_value}" + ) + current_app.logger.debug(f"Session info: {self.session_info}") + + return MatchResult(matched=mfa_success, credential_used=credential_used) + + @staticmethod + def _find_or_add_credential( + user: User, framework: TrustFramework | None, current_loa: str | None, required_loa: list[str] + ) -> ElementKey | None: + if not required_loa: + # mainly keep mypy calm + current_app.logger.debug("Not recording credential used without required_loa") + return None + if current_loa not in required_loa: + current_app.logger.error( + f"Can not add or find credential because current_loa {current_loa} not in required_loa {required_loa}" + ) + return None + + cred: ExternalCredential + this: ExternalCredential + match framework: + case TrustFramework.SWECONN: + for this in user.credentials.filter(SwedenConnectCredential): + if this.level in required_loa: + current_app.logger.debug(f"Found suitable credential on user: {this}") + return this.key + + cred = SwedenConnectCredential(level=current_loa) + cred.created_by = current_app.conf.app_name + if cred.level == "loa3": + # TODO: proof token as SWAMID_AL3_MFA? + pass + case TrustFramework.EIDAS: + for this in user.credentials.filter(EidasCredential): + if this.level in required_loa: + current_app.logger.debug(f"Found suitable credential on user: {this}") + return this.key + + cred = EidasCredential(level=current_loa) + cred.created_by = current_app.conf.app_name + case TrustFramework.BANKID: + for this in user.credentials.filter(BankIDCredential): + if this.level in required_loa: + current_app.logger.debug(f"Found suitable credential on user: {this}") + return this.key + + cred = BankIDCredential(level=current_loa) + cred.created_by = current_app.conf.app_name + case TrustFramework.FREJA: + for this in user.credentials.filter(FrejaCredential): + if this.level in required_loa: + current_app.logger.debug(f"Found suitable credential on user: {this}") + return this.key + + cred = FrejaCredential(level=current_loa) + cred.created_by = current_app.conf.app_name + case _: + current_app.logger.info(f"Not recording credential used for unknown trust framework: {framework}") + return None + + # Reload the user from the central database, to not overwrite any earlier NIN proofings + _user = current_app.central_userdb.get_user_by_eppn(user.eppn) + proofing_user = ProofingUser.from_user(_user, current_app.private_userdb) + + # add cred to proofing_user + current_app.logger.info(f"Adding new credential to proofing_user: {cred}") + proofing_user.credentials.add(cred) + + # Save proofing_user to private db + current_app.private_userdb.save(proofing_user) + + # Ask AM to sync proofing_user to central db + current_app.logger.info(f"Request sync for proofing_user {proofing_user}") + result = current_app.am_relay.request_user_sync(proofing_user) + current_app.logger.info(f"Sync result for proofing_user {proofing_user}: {result}") + + return cred.key diff --git a/src/eduid/webapp/common/proofing/saml_helpers.py b/src/eduid/webapp/common/proofing/saml_helpers.py index 05ee55eb0..73ae8f625 100644 --- a/src/eduid/webapp/common/proofing/saml_helpers.py +++ b/src/eduid/webapp/common/proofing/saml_helpers.py @@ -33,22 +33,6 @@ def is_required_loa( return False -def authn_ctx_to_loa(session_info: SessionInfo, authentication_context_map: dict[str, str]) -> str | None: - """Lookup short name (such as 'loa3') for an authentication context class we've received.""" - parsed = BaseSessionInfo(**session_info) - for k, v in authentication_context_map.items(): - if v == parsed.authn_context: - return k - return None - - -def authn_context_class_to_loa(session_info: BaseSessionInfo, authentication_context_map: dict[str, str]) -> str | None: - for key, value in authentication_context_map.items(): - if value == session_info.authn_context: - return key - return None - - def is_valid_authn_instant(session_info: SessionInfo, max_age: int = 60) -> bool: """ :param session_info: The SAML2 session_info diff --git a/src/eduid/webapp/eidas/acs_actions.py b/src/eduid/webapp/eidas/acs_actions.py index be876d861..4de423ce0 100644 --- a/src/eduid/webapp/eidas/acs_actions.py +++ b/src/eduid/webapp/eidas/acs_actions.py @@ -7,7 +7,7 @@ from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.common.proofing.methods import ProofingMethodSAML -from eduid.webapp.common.proofing.saml_helpers import authn_ctx_to_loa, is_required_loa, is_valid_authn_instant +from eduid.webapp.common.proofing.saml_helpers import is_required_loa, is_valid_authn_instant from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import SP_AuthnRequest from eduid.webapp.eidas.app import current_eidas_app as current_app @@ -24,7 +24,7 @@ def common_saml_checks(args: ACSArgs) -> ACSResult | None: """ assert isinstance(args.proofing_method, ProofingMethodSAML) # please mypy if not is_required_loa( - args.session_info, args.proofing_method.required_loa, current_app.conf.authentication_context_map + args.session_info, args.proofing_method.required_loa, current_app.conf.loa_authn_context_map ): current_app.logger.error("SAML response did not meet required LOA") args.authn_req.error = True @@ -154,7 +154,9 @@ def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: current_app.stats.count(name=f"verify_credential_{args.proofing_method.method}_identity_not_matching") return ACSResult(message=EidasMsg.identity_not_matching) - loa = authn_ctx_to_loa(args.session_info, current_app.conf.authentication_context_map) + loa = None + if parsed.info.authn_context is not None: + loa = current_app.conf.authn_context_loa_map.get(parsed.info.authn_context) verify_result = proofing.verify_credential(user=user, credential=credential, loa=loa) if verify_result.error is not None: diff --git a/src/eduid/webapp/eidas/helpers.py b/src/eduid/webapp/eidas/helpers.py index a01a47e56..1d405b1cc 100644 --- a/src/eduid/webapp/eidas/helpers.py +++ b/src/eduid/webapp/eidas/helpers.py @@ -54,6 +54,8 @@ class EidasMsg(TranslatableMsg): mfa_authn_success = "eidas.mfa_authn_success" # Successfully verified a credential credential_verify_success = "eidas.credential_verify_success" + # be able to toggle eidas credential verification allowed + credential_verification_not_allowed = "eidas.credential_verification_not_allowed" def create_authn_info( @@ -72,7 +74,7 @@ def create_authn_info( # LOA logger.debug(f"Requesting AuthnContext {required_loa}") - loa_uris = [current_app.conf.authentication_context_map[loa] for loa in required_loa] + loa_uris = [current_app.conf.loa_authn_context_map[loa] for loa in required_loa] kwargs["requested_authn_context"] = {"authn_context_class_ref": loa_uris, "comparison": "exact"} client = Saml2Client(current_app.saml2_config) diff --git a/src/eduid/webapp/eidas/proofing.py b/src/eduid/webapp/eidas/proofing.py index 9a0e3e598..a2eff6243 100644 --- a/src/eduid/webapp/eidas/proofing.py +++ b/src/eduid/webapp/eidas/proofing.py @@ -6,12 +6,6 @@ from eduid.common.rpc.exceptions import AmTaskFailed from eduid.userdb import EIDASIdentity, User from eduid.userdb.credentials import Credential -from eduid.userdb.credentials.external import ( - EidasCredential, - ExternalCredential, - SwedenConnectCredential, - TrustFramework, -) from eduid.userdb.element import ElementKey from eduid.userdb.exceptions import LockedIdentityViolation from eduid.userdb.identity import EIDASLoa, IdentityElement, IdentityProofingMethod, IdentityType, PridPersistence @@ -28,15 +22,15 @@ from eduid.webapp.common.api.helpers import set_user_names_from_foreign_id, verify_nin_for_user from eduid.webapp.common.api.messages import CommonMsg from eduid.webapp.common.proofing.base import ( + GenericResult, MatchResult, + MfaData, ProofingElementResult, ProofingFunctions, VerifyCredentialResult, VerifyUserResult, ) -from eduid.webapp.common.proofing.methods import ProofingMethod, ProofingMethodSAML -from eduid.webapp.common.proofing.saml_helpers import authn_context_class_to_loa -from eduid.webapp.common.session import session +from eduid.webapp.common.proofing.methods import ProofingMethod from eduid.webapp.eidas.app import current_eidas_app as current_app from eduid.webapp.eidas.helpers import EidasMsg from eduid.webapp.eidas.saml_session_info import BaseSessionInfo, ForeignEidSessionInfo, NinSessionInfo @@ -46,6 +40,21 @@ @dataclass class SwedenConnectProofingFunctions(ProofingFunctions[BaseSessionInfoVar], Generic[BaseSessionInfoVar]): + def get_mfa_data(self) -> GenericResult[MfaData]: + return GenericResult( + result=MfaData( + issuer=self.session_info.issuer, + authn_instant=self.session_info.authn_instant.isoformat(), + authn_context=self.session_info.authn_context, + ) + ) + + def get_current_loa(self) -> GenericResult[str | None]: + if self.session_info.authn_context is None: + return GenericResult(result=None) + current_loa = current_app.conf.authn_context_loa_map.get(self.session_info.authn_context) + return GenericResult(result=current_loa) + def get_identity(self, user: User) -> IdentityElement | None: raise NotImplementedError("Subclass must implement get_identity") @@ -61,65 +70,6 @@ def credential_proofing_element(self, user: User, credential: Credential) -> Pro def mark_credential_as_verified(self, credential: Credential, loa: str | None) -> VerifyCredentialResult: raise NotImplementedError("Subclass must implement mark_credential_as_verified") - def _match_identity_for_mfa( - self, user: User, identity_type: IdentityType, asserted_unique_value: str, proofing_method: ProofingMethod - ) -> MatchResult: - user_identity = user.identities.find(identity_type) - user_locked_identity = user.locked_identity.find(identity_type) - - if user_identity and (user_identity.unique_value == asserted_unique_value and user_identity.is_verified): - # asserted identity matched verified identity - mfa_success = True - current_app.logger.debug(f"Current identity {user_identity} matched asserted identity") - elif user_locked_identity and user_locked_identity.unique_value == asserted_unique_value: - # previously verified identity that the user just showed possession of - mfa_success = True - current_app.logger.debug(f"Locked identity {user_locked_identity} matched asserted identity") - # and we can verify it again - proofing_user = ProofingUser.from_user(user, current_app.private_userdb) - res = self.verify_identity(user=proofing_user) - if res.error is not None: - # If a message was returned, verifying the identity failed, and we abort - return MatchResult(error=res.error) - elif user_identity is None and user_locked_identity is None: - # TODO: we _could_ allow the user to give consent to just adding this identity to the user here, - # with a request parameter passed from frontend to /mfa-authentication for example. - mfa_success = False - current_app.logger.debug("No identity or locked identity found for user") - else: - mfa_success = False - current_app.logger.debug("No matching identity found for user") - - credential_used = None - if mfa_success: - assert isinstance(proofing_method, ProofingMethodSAML) # please mypy - credential_used = _find_or_add_credential(user, proofing_method.framework, proofing_method.required_loa) - current_app.logger.debug(f"Found or added credential {credential_used}") - - # OLD way - remove as soon as possible - # update session - session.mfa_action.success = mfa_success - if mfa_success is True: - # add metadata if the authentication was a success - session.mfa_action.issuer = self.session_info.issuer - session.mfa_action.authn_instant = self.session_info.authn_instant.isoformat() - session.mfa_action.authn_context = self.session_info.authn_context - session.mfa_action.credential_used = credential_used - - if not mfa_success: - current_app.logger.error("Asserted identity not matching user verified identity") - current_identity = self.get_identity(user) - current_unique_value = None - if current_identity: - current_unique_value = current_identity.unique_value - current_app.logger.debug(f"Current identity: {current_identity}") - current_app.logger.debug( - f"Current identity unique value: {current_unique_value}. Asserted unique value: {asserted_unique_value}" - ) - current_app.logger.debug(f"Asserted attributes: {self.session_info.attributes}") # type: ignore - - return MatchResult(matched=mfa_success, credential_used=credential_used) - @dataclass class FrejaProofingFunctions(SwedenConnectProofingFunctions[NinSessionInfo]): @@ -248,11 +198,11 @@ def verify_identity(self, user: User) -> VerifyUserResult: existing_identity = user.identities.eidas locked_identity = user.locked_identity.eidas - acc_loa = authn_context_class_to_loa( - session_info=self.session_info, authentication_context_map=current_app.conf.authentication_context_map - ) - assert acc_loa is not None # please mypy - loa = EIDASLoa(acc_loa) + acc_loa = self.get_current_loa() + if acc_loa is None or acc_loa.result is None: + return VerifyUserResult(error=EidasMsg.authn_context_mismatch) + + loa = EIDASLoa(acc_loa.result) date_of_birth = self.session_info.attributes.date_of_birth new_identity = EIDASIdentity( created_by=current_app.conf.app_name, @@ -386,57 +336,6 @@ def mark_credential_as_verified(self, credential: Credential, loa: str | None) - return VerifyCredentialResult(credential=credential) -def _find_or_add_credential(user: User, framework: TrustFramework | None, required_loa: list[str]) -> ElementKey | None: - if not required_loa: - # mainly keep mypy calm - current_app.logger.debug("Not recording credential used without required_loa") - return None - - cred: ExternalCredential - this: ExternalCredential - if framework == TrustFramework.SWECONN: - for this in user.credentials.filter(SwedenConnectCredential): - if this.level in required_loa: - current_app.logger.debug(f"Found suitable credential on user: {this}") - return this.key - - cred = SwedenConnectCredential(level=required_loa[0]) - cred.created_by = current_app.conf.app_name - if cred.level == "loa3": - # TODO: proof token as SWAMID_AL3_MFA? - pass - elif framework == TrustFramework.EIDAS: - for this in user.credentials.filter(EidasCredential): - if this.level in required_loa: - current_app.logger.debug(f"Found suitable credential on user: {this}") - return this.key - - cred = EidasCredential(level=required_loa[0]) - cred.created_by = current_app.conf.app_name - else: - current_app.logger.info(f"Not recording credential used for unknown trust framework: {framework}") - return None - - # Reload the user from the central database, to not overwrite any earlier NIN proofings - _user = current_app.central_userdb.get_user_by_eppn(user.eppn) - - proofing_user = ProofingUser.from_user(_user, current_app.private_userdb) - - proofing_user.credentials.add(cred) - - current_app.logger.info(f"Adding new credential to proofing_user: {cred}") - - # Save proofing_user to private db - current_app.private_userdb.save(proofing_user) - - # Ask AM to sync proofing_user to central db - current_app.logger.info(f"Request sync for proofing_user {proofing_user}") - result = current_app.am_relay.request_user_sync(proofing_user) - current_app.logger.info(f"Sync result for proofing_user {proofing_user}: {result}") - - return cred.key - - def get_proofing_functions( session_info: BaseSessionInfo, app_name: str, diff --git a/src/eduid/webapp/eidas/settings/common.py b/src/eduid/webapp/eidas/settings/common.py index a193a83cb..35b2a5d0b 100644 --- a/src/eduid/webapp/eidas/settings/common.py +++ b/src/eduid/webapp/eidas/settings/common.py @@ -3,6 +3,7 @@ """ from collections.abc import Mapping +from functools import cached_property from pydantic import Field @@ -35,7 +36,7 @@ class EidasConfig( app_name: str = "eidas" # Federation config - authentication_context_map: dict[str, str] = Field( + loa_authn_context_map: dict[str, str] = Field( default={ "loa1": "http://id.elegnamnden.se/loa/1.0/loa1", "loa2": "http://id.elegnamnden.se/loa/1.0/loa2", @@ -51,6 +52,10 @@ class EidasConfig( } ) + @cached_property + def authn_context_loa_map(self) -> dict[str, str]: + return {value: key for key, value in self.loa_authn_context_map.items()} + # Staging nin map nin_attribute_map: Mapping[str, str] = Field( default={ @@ -60,4 +65,5 @@ class EidasConfig( # magic cookie IdP is used for integration tests when magic cookie is set magic_cookie_idp: str | None = None magic_cookie_foreign_id_idp: str | None = None + allow_eidas_credential_verification: bool = False From 7c6221f7e24e1e3a717f04ea64b13114ac69ecbc Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 6 Dec 2024 14:53:34 +0100 Subject: [PATCH 5/6] no need for the same comment on all credentials use match instead of if --- src/eduid/userdb/credentials/external.py | 37 ++++++++++++------------ 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/eduid/userdb/credentials/external.py b/src/eduid/userdb/credentials/external.py index b49d96832..71e7b9fc7 100644 --- a/src/eduid/userdb/credentials/external.py +++ b/src/eduid/userdb/credentials/external.py @@ -42,40 +42,39 @@ def key(self) -> ElementKey: return ElementKey(self.credential_id) +# To be technology neutral, we don't want to store e.g. the SAML authnContextClassRef in the database, +# and mapping a level to an authnContextClassRef really ought to be dependent on configuration matching +# the IdP:s expected values at a certain time. Such configuration is better to have in the SP than in +# the database layer. class SwedenConnectCredential(ExternalCredential): framework: Literal[TrustFramework.SWECONN] = TrustFramework.SWECONN - # To be technology neutral, we don't want to store e.g. the SAML authnContextClassRef in the database, - # and mapping a level to an authnContextClassRef really ought to be dependent on configuration matching - # the IdP:s expected values at a certain time. Such configuration is better to have in the SP than in - # the database layer. level: str # a value like "loa3", "eidas_sub", ... class EidasCredential(ExternalCredential): framework: Literal[TrustFramework.EIDAS] = TrustFramework.EIDAS - # To be technology neutral, we don't want to store e.g. the SAML authnContextClassRef in the database, - # and mapping a level to an authnContextClassRef really ought to be dependent on configuration matching - # the IdP:s expected values at a certain time. Such configuration is better to have in the SP than in - # the database layer. level: str # a value like "loa3", "eidas_sub", ... class BankIDCredential(ExternalCredential): framework: Literal[TrustFramework.BANKID] = TrustFramework.BANKID - # To be technology neutral, we don't want to store e.g. the SAML authnContextClassRef in the database, - # and mapping a level to an authnContextClassRef really ought to be dependent on configuration matching - # the IdP:s expected values at a certain time. Such configuration is better to have in the SP than in - # the database layer. + level: str # a value like "loa3", "eidas_sub", ... + + class FrejaCredential(ExternalCredential): framework: Literal[TrustFramework.FREJA] = TrustFramework.FREJA level: str # a value like "loa3", "eidas_sub", ... def external_credential_from_dict(data: Mapping[str, Any]) -> ExternalCredential | None: - if data["framework"] == TrustFramework.SWECONN.value: - return SwedenConnectCredential.from_dict(data) - if data["framework"] == TrustFramework.EIDAS.value: - return EidasCredential.from_dict(data) - if data["framework"] == TrustFramework.BANKID.value: - return BankIDCredential.from_dict(data) - return None + match data["framework"]: + case TrustFramework.SWECONN.value: + return SwedenConnectCredential.from_dict(data) + case TrustFramework.EIDAS.value: + return EidasCredential.from_dict(data) + case TrustFramework.BANKID.value: + return BankIDCredential.from_dict(data) + case TrustFramework.FREJA.value: + return FrejaCredential.from_dict(data) + case _: + return None From 8e8cb4053c2cdafcbdae02e9a9e87b059e5f706e Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 6 Dec 2024 14:54:07 +0100 Subject: [PATCH 6/6] let eppn field also be known as eduPersonPrincipalName as it is in db --- src/eduid/userdb/logs/element.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eduid/userdb/logs/element.py b/src/eduid/userdb/logs/element.py index def522c6c..04c9a474e 100644 --- a/src/eduid/userdb/logs/element.py +++ b/src/eduid/userdb/logs/element.py @@ -65,7 +65,7 @@ class ProofingLogElement(LogElement): """ """ # eduPersonPrincipalName - eppn: str + eppn: str = Field(alias="eduPersonPrincipalName") # Proofing method version number proofing_version: str # Proofing method name