From 9eb33ce26fa67a623c3856244258d9e1d00d9a25 Mon Sep 17 00:00:00 2001 From: Sattvik Chakravarthy Date: Wed, 4 Dec 2024 18:22:22 +0530 Subject: [PATCH] fix: typing --- .../oauth2provider/api/implementation.py | 48 +++--- .../recipe/oauth2provider/api/user_info.py | 2 + .../recipe/oauth2provider/api/utils.py | 34 +++-- .../recipe/oauth2provider/interfaces.py | 138 +++++++++--------- .../oauth2provider/recipe_implementation.py | 134 ++++++++--------- 5 files changed, 182 insertions(+), 174 deletions(-) diff --git a/supertokens_python/recipe/oauth2provider/api/implementation.py b/supertokens_python/recipe/oauth2provider/api/implementation.py index 99d102be..cc06b796 100644 --- a/supertokens_python/recipe/oauth2provider/api/implementation.py +++ b/supertokens_python/recipe/oauth2provider/api/implementation.py @@ -42,9 +42,9 @@ async def login_get( self, login_challenge: str, options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[FrontendRedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: response = await login_get( recipe_implementation=options.recipe_implementation, @@ -52,6 +52,7 @@ async def login_get( session=session, should_try_refresh=should_try_refresh, is_direct_call=True, + cookies=None, user_context=user_context, ) @@ -82,7 +83,7 @@ async def auth_get( session: Optional[SessionContainer], should_try_refresh: bool, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: response = await options.recipe_implementation.authorization( params=params, @@ -108,7 +109,7 @@ async def token_post( authorization_header: Optional[str], body: Any, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[TokenInfo, ErrorOAuth2Response, GeneralErrorResponse]: return await options.recipe_implementation.token_exchange( authorization_header=authorization_header, @@ -120,7 +121,7 @@ async def login_info_get( self, login_challenge: str, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[LoginInfo, ErrorOAuth2Response, GeneralErrorResponse]: login_res = await options.recipe_implementation.get_login_request( challenge=login_challenge, @@ -149,7 +150,7 @@ async def user_info_get( scopes: List[str], tenant_id: str, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[Dict[str, Any], GeneralErrorResponse]: return await options.recipe_implementation.build_user_info( user=user, @@ -161,16 +162,16 @@ async def user_info_get( async def revoke_token_post( self, - token: str, options: APIOptions, - user_context: Dict[str, Any] = {}, - authorization_header: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, + token: str, + authorization_header: Optional[str], + client_id: Optional[str], + client_secret: Optional[str], + user_context: Dict[str, Any], ) -> Union[None, ErrorOAuth2Response, GeneralErrorResponse]: if authorization_header is not None: return await options.recipe_implementation.revoke_token( - input=RevokeTokenUsingAuthorizationHeader( + params=RevokeTokenUsingAuthorizationHeader( token=token, authorization_header=authorization_header, ), @@ -181,7 +182,7 @@ async def revoke_token_post( raise Exception("client_secret is required") return await options.recipe_implementation.revoke_token( - input=RevokeTokenUsingClientIDAndClientSecret( + params=RevokeTokenUsingClientIDAndClientSecret( token=token, client_id=client_id, client_secret=client_secret, @@ -198,7 +199,7 @@ async def introspect_token_post( token: str, scopes: Optional[List[str]], options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[ActiveTokenResponse, InactiveTokenResponse, GeneralErrorResponse]: return await options.recipe_implementation.introspect_token( token=token, @@ -210,9 +211,9 @@ async def end_session_get( self, params: Dict[str, str], options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: response = await options.recipe_implementation.end_session( params=params, @@ -235,9 +236,9 @@ async def end_session_post( self, params: Dict[str, str], options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: response = await options.recipe_implementation.end_session( params=params, @@ -260,8 +261,8 @@ async def logout_post( self, logout_challenge: str, options: APIOptions, - session: Optional[SessionContainer] = None, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + user_context: Dict[str, Any], ) -> Union[FrontendRedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: if session is not None: await session.revoke_session(user_context) @@ -277,6 +278,7 @@ async def logout_post( res = await handle_logout_internal_redirects( response=response, recipe_implementation=options.recipe_implementation, + session=session, user_context=user_context, ) diff --git a/supertokens_python/recipe/oauth2provider/api/user_info.py b/supertokens_python/recipe/oauth2provider/api/user_info.py index 7501559e..9fbdeaf1 100644 --- a/supertokens_python/recipe/oauth2provider/api/user_info.py +++ b/supertokens_python/recipe/oauth2provider/api/user_info.py @@ -60,6 +60,8 @@ async def user_info_get( try: payload = await api_options.recipe_implementation.validate_oauth2_access_token( token=access_token, + requirements=None, + check_database=None, user_context=user_context, ) diff --git a/supertokens_python/recipe/oauth2provider/api/utils.py b/supertokens_python/recipe/oauth2provider/api/utils.py index 201b8727..8fb9a5db 100644 --- a/supertokens_python/recipe/oauth2provider/api/utils.py +++ b/supertokens_python/recipe/oauth2provider/api/utils.py @@ -38,11 +38,11 @@ async def login_get( recipe_implementation: RecipeInterface, login_challenge: str, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - cookies: Optional[str] = None, - is_direct_call: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + cookies: Optional[str], + is_direct_call: bool, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: login_request = await recipe_implementation.get_login_request( challenge=login_challenge, @@ -116,6 +116,10 @@ async def login_get( ): accept = await recipe_implementation.accept_login_request( challenge=login_challenge, + acr=None, + amr=None, + context=None, + extend_session_lifespan=None, subject=session.get_user_id(), identity_provider_session_id=session.get_handle(), user_context=user_context, @@ -128,7 +132,7 @@ async def login_get( if should_try_refresh and prompt_param != "login": return RedirectResponse( redirect_to=await recipe_implementation.get_frontend_redirection_url( - input=FrontendRedirectionURLTypeTryRefresh( + params=FrontendRedirectionURLTypeTryRefresh( login_challenge=login_challenge, ), user_context=user_context, @@ -152,7 +156,7 @@ async def login_get( return RedirectResponse( redirect_to=await recipe_implementation.get_frontend_redirection_url( - input=FrontendRedirectionURLTypeLogin( + params=FrontendRedirectionURLTypeLogin( login_challenge=login_challenge, force_fresh_auth=session is not None or prompt_param == "login", tenant_id=tenant_id_param or DEFAULT_TENANT_ID, @@ -168,9 +172,7 @@ async def login_get( ) -def get_merged_cookies( - orig_cookies: str = "", new_cookies: Optional[str] = None -) -> str: +def get_merged_cookies(orig_cookies: str, new_cookies: Optional[str]) -> str: if not new_cookies: return orig_cookies @@ -223,10 +225,10 @@ def is_logout_internal_redirect(redirect_to: str) -> bool: async def handle_login_internal_redirects( response: RedirectResponse, recipe_implementation: RecipeInterface, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - cookie: str = "", - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + cookie: str, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: if not is_login_internal_redirect(response.redirect_to): return response @@ -297,8 +299,8 @@ async def handle_login_internal_redirects( async def handle_logout_internal_redirects( response: RedirectResponse, recipe_implementation: RecipeInterface, - session: Optional[SessionContainer] = None, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: if not is_logout_internal_redirect(response.redirect_to): return response diff --git a/supertokens_python/recipe/oauth2provider/interfaces.py b/supertokens_python/recipe/oauth2provider/interfaces.py index 10ad4eae..af8eec1e 100644 --- a/supertokens_python/recipe/oauth2provider/interfaces.py +++ b/supertokens_python/recipe/oauth2provider/interfaces.py @@ -368,7 +368,7 @@ async def authorization( params: Dict[str, str], cookies: Optional[str], session: Optional[SessionContainer], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: pass @@ -377,13 +377,13 @@ async def token_exchange( self, authorization_header: Optional[str], body: Dict[str, Optional[str]], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[TokenInfo, ErrorOAuth2Response]: pass @abstractmethod async def get_consent_request( - self, challenge: str, user_context: Dict[str, Any] = {} + self, challenge: str, user_context: Dict[str, Any] ) -> ConsentRequest: pass @@ -391,16 +391,16 @@ async def get_consent_request( async def accept_consent_request( self, challenge: str, - context: Optional[Any] = None, - grant_access_token_audience: Optional[List[str]] = None, - grant_scope: Optional[List[str]] = None, - handled_at: Optional[str] = None, - tenant_id: str = "", - rsub: str = "", - session_handle: str = "", - initial_access_token_payload: Optional[Dict[str, Any]] = None, - initial_id_token_payload: Optional[Dict[str, Any]] = None, - user_context: Dict[str, Any] = {}, + context: Optional[Any], + grant_access_token_audience: Optional[List[str]], + grant_scope: Optional[List[str]], + handled_at: Optional[str], + tenant_id: str, + rsub: str, + session_handle: str, + initial_access_token_payload: Optional[Dict[str, Any]], + initial_id_token_payload: Optional[Dict[str, Any]], + user_context: Dict[str, Any], ) -> RedirectResponse: pass @@ -420,13 +420,13 @@ async def get_login_request( async def accept_login_request( self, challenge: str, - acr: Optional[str] = None, - amr: Optional[List[str]] = None, - context: Optional[Any] = None, - extend_session_lifespan: Optional[bool] = None, - identity_provider_session_id: Optional[str] = None, - subject: str = "", - user_context: Dict[str, Any] = {}, + acr: Optional[str], + amr: Optional[List[str]], + context: Optional[Any], + extend_session_lifespan: Optional[bool], + identity_provider_session_id: Optional[str], + subject: str, + user_context: Dict[str, Any], ) -> RedirectResponse: pass @@ -435,17 +435,17 @@ async def reject_login_request( self, challenge: str, error: ErrorOAuth2Response, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> RedirectResponse: pass @abstractmethod async def get_oauth2_clients( self, - page_size: Optional[int] = None, - pagination_token: Optional[str] = None, - client_name: Optional[str] = None, - user_context: Dict[str, Any] = {}, + page_size: Optional[int], + pagination_token: Optional[str], + client_name: Optional[str], + user_context: Dict[str, Any], ) -> Union[GetOAuth2ClientsOkResult, ErrorOAuth2Response]: pass @@ -453,21 +453,21 @@ async def get_oauth2_clients( async def get_oauth2_client( self, client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[GetOAuth2ClientOkResult, ErrorOAuth2Response]: pass @abstractmethod async def create_oauth2_client( self, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: pass @abstractmethod async def update_oauth2_client( self, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: pass @@ -475,7 +475,7 @@ async def update_oauth2_client( async def delete_oauth2_client( self, client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[DeleteOAuth2ClientOkResult, ErrorOAuth2Response]: pass @@ -483,9 +483,9 @@ async def delete_oauth2_client( async def validate_oauth2_access_token( self, token: str, - requirements: Optional[OAuth2TokenValidationRequirements] = None, - check_database: Optional[bool] = None, - user_context: Dict[str, Any] = {}, + requirements: Optional[OAuth2TokenValidationRequirements], + check_database: Optional[bool], + user_context: Dict[str, Any], ) -> Dict[str, Any]: pass @@ -496,7 +496,7 @@ async def get_requested_scopes( session_handle: Optional[str], scope_param: List[str], client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> List[str]: pass @@ -507,7 +507,7 @@ async def build_access_token_payload( client: OAuth2Client, session_handle: Optional[str], scopes: List[str], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: pass @@ -518,7 +518,7 @@ async def build_id_token_payload( client: OAuth2Client, session_handle: Optional[str], scopes: List[str], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: pass @@ -529,31 +529,31 @@ async def build_user_info( access_token_payload: Dict[str, Any], scopes: List[str], tenant_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: pass @abstractmethod async def get_frontend_redirection_url( self, - input: Union[ + params: Union[ FrontendRedirectionURLTypeLogin, FrontendRedirectionURLTypeTryRefresh, FrontendRedirectionURLTypeLogoutConfirmation, FrontendRedirectionURLTypePostLogoutFallback, ], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> str: pass @abstractmethod async def revoke_token( self, - input: Union[ + params: Union[ RevokeTokenUsingAuthorizationHeader, RevokeTokenUsingClientIDAndClientSecret, ], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Optional[ErrorOAuth2Response]: pass @@ -561,7 +561,7 @@ async def revoke_token( async def revoke_tokens_by_client_id( self, client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): pass @@ -569,7 +569,7 @@ async def revoke_tokens_by_client_id( async def revoke_tokens_by_session_handle( self, session_handle: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): pass @@ -577,8 +577,8 @@ async def revoke_tokens_by_session_handle( async def introspect_token( self, token: str, - scopes: Optional[List[str]] = None, - user_context: Dict[str, Any] = {}, + scopes: Optional[List[str]], + user_context: Dict[str, Any], ) -> Union[ActiveTokenResponse, InactiveTokenResponse]: pass @@ -587,8 +587,8 @@ async def end_session( self, params: Dict[str, str], should_try_refresh: bool, - session: Optional[SessionContainer] = None, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: pass @@ -596,7 +596,7 @@ async def end_session( async def accept_logout_request( self, challenge: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: pass @@ -604,7 +604,7 @@ async def accept_logout_request( async def reject_logout_request( self, challenge: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): pass @@ -643,9 +643,9 @@ async def login_get( self, login_challenge: str, options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[FrontendRedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -657,7 +657,7 @@ async def auth_get( session: Optional[SessionContainer], should_try_refresh: bool, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -667,7 +667,7 @@ async def token_post( authorization_header: Optional[str], body: Any, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[TokenInfo, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -676,7 +676,7 @@ async def login_info_get( self, login_challenge: str, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[ LoginInfo, ErrorOAuth2Response, @@ -692,19 +692,19 @@ async def user_info_get( scopes: List[str], tenant_id: str, options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[Dict[str, Any], GeneralErrorResponse]: pass @abstractmethod async def revoke_token_post( self, - token: str, options: APIOptions, - user_context: Dict[str, Any] = {}, - authorization_header: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, + token: str, + authorization_header: Optional[str], + client_id: Optional[str], + client_secret: Optional[str], + user_context: Dict[str, Any], ) -> Union[None, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -714,7 +714,7 @@ async def introspect_token_post( token: str, scopes: Optional[List[str]], options: APIOptions, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[ActiveTokenResponse, InactiveTokenResponse, GeneralErrorResponse]: pass @@ -723,9 +723,9 @@ async def end_session_get( self, params: Dict[str, str], options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -734,9 +734,9 @@ async def end_session_post( self, params: Dict[str, str], options: APIOptions, - session: Optional[SessionContainer] = None, - should_try_refresh: bool = False, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + should_try_refresh: bool, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: pass @@ -745,7 +745,7 @@ async def logout_post( self, logout_challenge: str, options: APIOptions, - session: Optional[SessionContainer] = None, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + user_context: Dict[str, Any], ) -> Union[FrontendRedirectResponse, ErrorOAuth2Response, GeneralErrorResponse]: pass diff --git a/supertokens_python/recipe/oauth2provider/recipe_implementation.py b/supertokens_python/recipe/oauth2provider/recipe_implementation.py index 28395e9e..cdf34ccf 100644 --- a/supertokens_python/recipe/oauth2provider/recipe_implementation.py +++ b/supertokens_python/recipe/oauth2provider/recipe_implementation.py @@ -103,13 +103,13 @@ async def get_login_request( async def accept_login_request( self, challenge: str, - acr: Optional[str] = None, - amr: Optional[List[str]] = None, - context: Optional[Any] = None, - extend_session_lifespan: Optional[bool] = None, - identity_provider_session_id: Optional[str] = None, - subject: str = "", - user_context: Dict[str, Any] = {}, + acr: Optional[str], + amr: Optional[List[str]], + context: Optional[Any], + extend_session_lifespan: Optional[bool], + identity_provider_session_id: Optional[str], + subject: str, + user_context: Dict[str, Any], ) -> RedirectResponse: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/auth/requests/login/accept"), @@ -135,7 +135,7 @@ async def reject_login_request( self, challenge: str, error: ErrorOAuth2Response, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> RedirectResponse: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/auth/requests/login/reject"), @@ -154,7 +154,7 @@ async def reject_login_request( ) async def get_consent_request( - self, challenge: str, user_context: Dict[str, Any] = {} + self, challenge: str, user_context: Dict[str, Any] ) -> ConsentRequest: response = await self.querier.send_get_request( NormalisedURLPath("/recipe/oauth/auth/requests/consent"), @@ -167,16 +167,16 @@ async def get_consent_request( async def accept_consent_request( self, challenge: str, - context: Optional[Any] = None, - grant_access_token_audience: Optional[List[str]] = None, - grant_scope: Optional[List[str]] = None, - handled_at: Optional[str] = None, - tenant_id: str = "", - rsub: str = "", - session_handle: str = "", - initial_access_token_payload: Optional[Dict[str, Any]] = None, - initial_id_token_payload: Optional[Dict[str, Any]] = None, - user_context: Dict[str, Any] = {}, + context: Optional[Any], + grant_access_token_audience: Optional[List[str]], + grant_scope: Optional[List[str]], + handled_at: Optional[str], + tenant_id: str, + rsub: str, + session_handle: str, + initial_access_token_payload: Optional[Dict[str, Any]], + initial_id_token_payload: Optional[Dict[str, Any]], + user_context: Dict[str, Any], ) -> RedirectResponse: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/auth/requests/consent/accept"), @@ -227,7 +227,7 @@ async def authorization( params: Dict[str, str], cookies: Optional[str], session: Optional[SessionContainer], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: # we handle this in the backend SDK level if params.get("prompt") == "none": @@ -352,10 +352,11 @@ async def authorization( ) consent_res = await self.accept_consent_request( - user_context=user_context, challenge=consent_request.challenge, + context=None, grant_access_token_audience=consent_request.requested_access_token_audience, grant_scope=consent_request.requested_scope, + handled_at=None, tenant_id=session.get_tenant_id(), rsub=session.get_recipe_user_id().get_as_string(), session_handle=session.get_handle(), @@ -363,6 +364,7 @@ async def authorization( payloads.get("accessToken") if payloads else None ), initial_id_token_payload=payloads.get("idToken") if payloads else None, + user_context=user_context, ) return RedirectResponse( @@ -375,7 +377,7 @@ async def token_exchange( self, authorization_header: Optional[str], body: Dict[str, Optional[str]], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[TokenInfo, ErrorOAuth2Response]: request_body = { "iss": await OpenIdRecipe.get_issuer(user_context), @@ -511,10 +513,10 @@ async def token_exchange( async def get_oauth2_clients( self, - page_size: Optional[int] = None, - pagination_token: Optional[str] = None, - client_name: Optional[str] = None, - user_context: Dict[str, Any] = {}, + page_size: Optional[int], + pagination_token: Optional[str], + client_name: Optional[str], + user_context: Dict[str, Any], ) -> Union[GetOAuth2ClientsOkResult, ErrorOAuth2Response]: body: Dict[str, Any] = {} if page_size is not None: @@ -545,7 +547,7 @@ async def get_oauth2_clients( ) async def get_oauth2_client( - self, client_id: str, user_context: Dict[str, Any] = {} + self, client_id: str, user_context: Dict[str, Any] ) -> Union[GetOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_get_request( NormalisedURLPath("/recipe/oauth/clients"), @@ -567,7 +569,7 @@ async def get_oauth2_client( async def create_oauth2_client( self, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/clients"), @@ -583,7 +585,7 @@ async def create_oauth2_client( async def update_oauth2_client( self, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/clients"), @@ -601,7 +603,7 @@ async def update_oauth2_client( async def delete_oauth2_client( self, client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[DeleteOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/clients/remove"), @@ -618,9 +620,9 @@ async def delete_oauth2_client( async def validate_oauth2_access_token( self, token: str, - requirements: Optional[OAuth2TokenValidationRequirements] = None, - check_database: Optional[bool] = None, - user_context: Dict[str, Any] = {}, + requirements: Optional[OAuth2TokenValidationRequirements], + check_database: Optional[bool], + user_context: Dict[str, Any], ) -> Dict[str, Any]: # Verify token signature using session recipe's JWKS session_recipe = SessionRecipe.get_instance() @@ -677,7 +679,7 @@ async def get_requested_scopes( session_handle: Optional[str], scope_param: List[str], client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> List[str]: _ = recipe_user_id _ = session_handle @@ -692,7 +694,7 @@ async def build_access_token_payload( client: OAuth2Client, session_handle: Optional[str], scopes: List[str], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: if user is None or session_handle is None: return {} @@ -709,7 +711,7 @@ async def build_id_token_payload( client: OAuth2Client, session_handle: Optional[str], scopes: List[str], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: if user is None or session_handle is None: return {} @@ -726,7 +728,7 @@ async def build_user_info( access_token_payload: Dict[str, Any], scopes: List[str], tenant_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Dict[str, Any]: return await self._get_default_user_info_payload( user, access_token_payload, scopes, tenant_id, user_context @@ -734,26 +736,26 @@ async def build_user_info( async def get_frontend_redirection_url( self, - input: Union[ + params: Union[ FrontendRedirectionURLTypeLogin, FrontendRedirectionURLTypeTryRefresh, FrontendRedirectionURLTypeLogoutConfirmation, FrontendRedirectionURLTypePostLogoutFallback, ], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> str: website_domain = self.app_info.get_origin( None, user_context ).get_as_string_dangerous() website_base_path = self.app_info.api_base_path.get_as_string_dangerous() - if isinstance(input, FrontendRedirectionURLTypeLogin): - query_params: Dict[str, str] = {"loginChallenge": input.login_challenge} - if input.tenant_id != "public": # DEFAULT_TENANT_ID is "public" - query_params["tenantId"] = input.tenant_id - if input.hint is not None: - query_params["hint"] = input.hint - if input.force_fresh_auth: + if isinstance(params, FrontendRedirectionURLTypeLogin): + query_params: Dict[str, str] = {"loginChallenge": params.login_challenge} + if params.tenant_id != "public": # DEFAULT_TENANT_ID is "public" + query_params["tenantId"] = params.tenant_id + if params.hint is not None: + query_params["hint"] = params.hint + if params.force_fresh_auth: query_params["forceFreshAuth"] = "true" query_string = "&".join( @@ -761,30 +763,30 @@ async def get_frontend_redirection_url( ) return f"{website_domain}{website_base_path}?{query_string}" - elif isinstance(input, FrontendRedirectionURLTypeTryRefresh): - return f"{website_domain}{website_base_path}/try-refresh?loginChallenge={input.login_challenge}" + elif isinstance(params, FrontendRedirectionURLTypeTryRefresh): + return f"{website_domain}{website_base_path}/try-refresh?loginChallenge={params.login_challenge}" - elif isinstance(input, FrontendRedirectionURLTypePostLogoutFallback): + elif isinstance(params, FrontendRedirectionURLTypePostLogoutFallback): return f"{website_domain}{website_base_path}" - else: # isinstance(input, FrontendRedirectionURLTypeLogoutConfirmation) - return f"{website_domain}{website_base_path}/oauth/logout?logoutChallenge={input.logout_challenge}" + else: # isinstance(params, FrontendRedirectionURLTypeLogoutConfirmation) + return f"{website_domain}{website_base_path}/oauth/logout?logoutChallenge={params.logout_challenge}" async def revoke_token( self, - input: Union[ + params: Union[ RevokeTokenUsingAuthorizationHeader, RevokeTokenUsingClientIDAndClientSecret, ], - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Optional[ErrorOAuth2Response]: - request_body = {"token": input.token} + request_body = {"token": params.token} - if isinstance(input, RevokeTokenUsingAuthorizationHeader): - request_body["authorizationHeader"] = input.authorization_header + if isinstance(params, RevokeTokenUsingAuthorizationHeader): + request_body["authorizationHeader"] = params.authorization_header else: - request_body["client_id"] = input.client_id - request_body["client_secret"] = input.client_secret + request_body["client_id"] = params.client_id + request_body["client_secret"] = params.client_secret res = await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/token/revoke"), @@ -804,7 +806,7 @@ async def revoke_token( async def revoke_tokens_by_client_id( self, client_id: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/session/revoke"), @@ -815,7 +817,7 @@ async def revoke_tokens_by_client_id( async def revoke_tokens_by_session_handle( self, session_handle: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/session/revoke"), @@ -826,8 +828,8 @@ async def revoke_tokens_by_session_handle( async def introspect_token( self, token: str, - scopes: Optional[List[str]] = None, - user_context: Dict[str, Any] = {}, + scopes: Optional[List[str]], + user_context: Dict[str, Any], ) -> Union[ActiveTokenResponse, InactiveTokenResponse]: # Determine if the token is an access token by checking if it doesn't start with "st_rt" is_access_token = not token.startswith("st_rt") @@ -869,8 +871,8 @@ async def end_session( self, params: Dict[str, str], should_try_refresh: bool, - session: Optional[SessionContainer] = None, - user_context: Dict[str, Any] = {}, + session: Optional[SessionContainer], + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: # NOTE: The API response has 3 possible cases: # @@ -946,7 +948,7 @@ async def end_session( async def accept_logout_request( self, challenge: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ) -> Union[RedirectResponse, ErrorOAuth2Response]: resp = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/auth/requests/logout/accept"), @@ -977,7 +979,7 @@ async def accept_logout_request( async def reject_logout_request( self, challenge: str, - user_context: Dict[str, Any] = {}, + user_context: Dict[str, Any], ): resp = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/auth/requests/logout/reject"),