Skip to content

Commit

Permalink
Merge pull request from GHSA-gprj-3p75-f996
Browse files Browse the repository at this point in the history
globus: apply identity_provider restriction in `check_blocked_users`
  • Loading branch information
yuvipanda committed Jun 11, 2024
2 parents 79db03c + 04d11f8 commit d1aea05
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions oauthenticator/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,30 +297,39 @@ async def check_allowed(self, username, auth_model):
if auth_model is None:
return True

if await super().check_allowed(username, auth_model):
return True

if self.allowed_globus_groups:
user_groups = set(auth_model["auth_state"]["globus_groups"])
if user_groups & self.allowed_globus_groups:
return True
self.log.warning(f"{username} not in an allowed Globus Group")

# users should be explicitly allowed via config, otherwise they aren't
return False

async def check_blocked_users(self, username, authentication):
"""Check if the user should be blocked
Called _before_ checking if the user should be allowed
"""
# any restrictions on access go here - allow config only _grants_ access,
# restrictions belong in the `block` stage
# before considering allowing a username by being recognized in a list
# of usernames or similar, we must ensure that the authenticated user is
# from an allowed identity provider domain.
if self.identity_provider:
# It's possible for identity provider domains to be namespaced
# https://docs.globus.org/api/auth/specification/#identity_provider_namespaces
user_info = auth_model["auth_state"][self.user_auth_state_key]
user_info = authentication["auth_state"][self.user_auth_state_key]
user_domain = user_info.get(self.username_claim).split('@', 1)[-1]
if user_domain != self.identity_provider:
message = f"This site is restricted to {self.identity_provider} accounts. Link your account at app.globus.org/account."
self.log.warning(message)
raise web.HTTPError(403, message)

if await super().check_allowed(username, auth_model):
return True

if self.allowed_globus_groups:
user_groups = set(auth_model["auth_state"]["globus_groups"])
if user_groups & self.allowed_globus_groups:
return True
self.log.warning(f"{username} not in an allowed Globus Group")

# users should be explicitly allowed via config, otherwise they aren't
return False
return super().check_blocked_users(username, authentication)

async def update_auth_model(self, auth_model):
"""
Expand Down

0 comments on commit d1aea05

Please sign in to comment.