Skip to content

Commit

Permalink
webapps.galaxy.services.user.get_users_for_index: test + factor out
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Feb 28, 2024
1 parent cc63e75 commit 802a303
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 24 deletions.
71 changes: 47 additions & 24 deletions lib/galaxy/webapps/galaxy/services/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,31 +205,11 @@ def get_index(
f_name: Optional[str],
f_any: Optional[str],
) -> List[Union[UserModel, LimitedUserModel]]:
rval = []
stmt = select(User)

if f_email and (trans.user_is_admin or trans.app.config.expose_user_email):
stmt = stmt.filter(User.email.like(f"%{f_email}%"))

if f_name and (trans.user_is_admin or trans.app.config.expose_user_name):
stmt = stmt.filter(User.username.like(f"%{f_name}%"))

if f_any:
if trans.user_is_admin:
stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%")))
else:
if trans.app.config.expose_user_email and trans.app.config.expose_user_name:
stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%")))
elif trans.app.config.expose_user_email:
stmt = stmt.filter(User.email.like(f"%{f_any}%"))
elif trans.app.config.expose_user_name:
stmt = stmt.filter(User.username.like(f"%{f_any}%"))

# check for early return conditions
if deleted:
# only admins can see deleted users
if not trans.user_is_admin:
# only admins can see deleted users
return []
stmt = stmt.filter(User.deleted == true())
else:
# special case: user can see only their own user
# special case2: if the galaxy admin has specified that other user email/names are
Expand All @@ -241,8 +221,19 @@ def get_index(
):
item = trans.user.to_dict()
return [item]
stmt = stmt.filter(User.deleted == false())
for user in trans.sa_session.scalars(stmt).all():

users = get_users_for_index(
deleted,
trans.sa_session,
f_email,
f_name,
f_any,
trans.user_is_admin,
trans.app.config.expose_user_email,
trans.app.config.expose_user_name,
)
rval = []
for user in users:
item = user.to_dict()
# If NOT configured to expose_email, do not expose email UNLESS the user is self, or
# the user is an admin
Expand All @@ -261,3 +252,35 @@ def get_index(
# TODO: move into api_values
rval.append(item)
return rval


def get_users_for_index(
session,
deleted: bool,
f_email: Optional[str] = None,
f_name: Optional[str] = None,
f_any: Optional[str] = None,
is_admin: bool = False,
expose_user_email: bool = False,
expose_user_name: bool = False,
):
stmt = select(User)
if f_email and (is_admin or expose_user_email):
stmt = stmt.where(User.email.like(f"%{f_email}%"))
if f_name and (is_admin or expose_user_name):
stmt = stmt.where(User.username.like(f"%{f_name}%"))
if f_any:
if is_admin:
stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%")))
else:
if expose_user_email and expose_user_name:
stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%")))
elif expose_user_email:
stmt = stmt.where(User.email.like(f"%{f_any}%"))
elif expose_user_name:
stmt = stmt.where(User.username.like(f"%{f_any}%"))
if deleted:
stmt = stmt.where(User.deleted == true())
else:
stmt = stmt.where(User.deleted == false())
return session.scalars(stmt).all()
1 change: 1 addition & 0 deletions test/unit/data/data_access/test_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
)
from . import verify_items


def test_get_npns_roles(session, make_role):
r1 = make_role(deleted=True)
r2 = make_role(type=m.Role.types.PRIVATE)
Expand Down
31 changes: 31 additions & 0 deletions test/unit/data/data_access/test_user.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from galaxy.managers import users as lib
from galaxy.webapps.galaxy.services.users import get_users_for_index
from . import verify_items


Expand Down Expand Up @@ -27,6 +28,36 @@ def test_get_users_by_ids(session, make_random_users):
verify_items(users2, 3, (u1, u2, u3))


def test_get_users_for_index(session, make_user):
u1 = make_user(email="a", username="b")
u2 = make_user(email="c", username="d")
u3 = make_user(email="e", username="f")
u4 = make_user(email="g", username="h")
u5 = make_user(email="i", username="z")
u6 = make_user(email="z", username="i")

users = get_users_for_index(session, False, f_email="a", expose_user_email=True)
verify_items(users, 1, [u1])
users = get_users_for_index(session, False, f_email="c", is_admin=True)
verify_items(users, 1, [u2])
users = get_users_for_index(session, False, f_name="f", expose_user_name=True)
verify_items(users, 1, [u3])
users = get_users_for_index(session, False, f_name="h", is_admin=True)
verify_items(users, 1, [u4])
users = get_users_for_index(session, False, f_any="i", is_admin=True)
verify_items(users, 2, [u5, u6])
users = get_users_for_index(session, False, f_any="i", expose_user_email=True, expose_user_name=True)
verify_items(users, 2, [u5, u6])
users = get_users_for_index(session, False, f_any="i", expose_user_email=True)
verify_items(users, 1, [u5])
users = get_users_for_index(session, False, f_any="i", expose_user_name=True)
verify_items(users, 1, [u6])

u1.deleted = True
users = get_users_for_index(session, True)
verify_items(users, 1, [u1])


# TODO: factor out
# def test_get_users_by_role(session, make_user, make_role, make_user_role_association):
# user1, user2, user3 = make_user(), make_user(), make_user()
Expand Down

0 comments on commit 802a303

Please sign in to comment.