Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring usermanager.py #1461

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 103 additions & 32 deletions mxcubeweb/core/components/user/usermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ class BaseUserManager(ComponentBase):
def __init__(self, app, config):
super().__init__(app, config)

def get_observers(self):
def get_observers(self) -> list[User]:
"""
Return users that are in observer mode: logged in
(authenticated and active) but not in control of the beamline.
"""
return [
user
for user in User.query.all()
if ((not user.in_control) and user.is_authenticated and user.is_active)
]

def get_operator(self):
def get_operator(self) -> User:
"""Return user (operator) that is controlling the beamline."""
user = None

for _u in User.query.all():
Expand All @@ -36,10 +41,15 @@ def get_operator(self):

return user

def is_operator(self):
def is_operator(self) -> bool:
"""Return True if the current_user is an operator."""
return getattr(current_user, "in_control", False)

def active_logged_in_users(self, exclude_inhouse=False):
def active_logged_in_users(self, exclude_inhouse: bool = False) -> list[User]:
"""
Return a list of active logged in users. With or without inhouse,
based on the exclude_inhouse parameter.
"""
self.update_active_users()

if exclude_inhouse:
Expand All @@ -51,7 +61,8 @@ def active_logged_in_users(self, exclude_inhouse=False):

return users

def get_user(self, username):
def get_user(self, username: str) -> User | None:
"""Return user model instance based on username."""
user = None

for _u in User.query.all():
Expand All @@ -60,7 +71,8 @@ def get_user(self, username):

return user

def set_operator(self, username):
def set_operator(self, username: str) -> User | None:
"""Set the user with the given username to be an operator."""
user = None

for _u in User.query.all():
Expand All @@ -72,7 +84,12 @@ def set_operator(self, username):

return user

def update_active_users(self):
def update_active_users(self) -> None:
"""
Check if any user have been inactive for a period longer than the
session lifetime. If so, deactivate the user in datastore and emit
the relvant signals `userChanged` and `observersChanged` to the client.
"""
for _u in User.query.all():
if (
_u.active
Expand All @@ -90,7 +107,14 @@ def update_active_users(self):

self.app.server.emit("observersChanged", namespace="/hwr")

def update_operator(self, new_login=False):
def update_operator(self, new_login: bool = False) -> None:
"""
Set the operator based on the logged in users. If no user is currently
in control, the first logged in user is set. Additionally, proposal
is set based on the operator selected_proposal field.

:param bool new_login: True if updating operator was invoked with new user logging in
"""
active_in_control = False

for _u in User.query.all():
Expand Down Expand Up @@ -119,10 +143,11 @@ def update_operator(self, new_login=False):
if _u.is_authenticated and _u.in_control:
if HWR.beamline.lims.loginType.lower() != "user":
self.app.lims.select_proposal(self.app.lims.get_proposal(_u))
elif _u.selected_proposal is not None:
elif _u.selected_proposal:
self.app.lims.select_proposal(_u.selected_proposal)

def is_inhouse_user(self, user_id):
def is_inhouse_user(self, user_id: str) -> bool:
"""Retrun True if the user_id is in the in-house user list."""
user_id_list = [
"%s%s" % (code, number)
for (code, number) in HWR.beamline.session.in_house_users
Expand All @@ -131,10 +156,18 @@ def is_inhouse_user(self, user_id):
return user_id in user_id_list

# Abstract method to be implemented by concrete implementation
def _login(self, login_id, password):
def _login(self, login_id: str, password: str):
pass

def login(self, login_id: str, password: str):
def login(self, login_id: str, password: str) -> None:
"""
Create new session for the user if it does not exist. Activate user in
data store. If a sample is loaded in sample changer but not mounted,
mount it and update the smaple list. Try update the operator.

:param str login_id: username
:param str password: password
dominikatrojanowska marked this conversation as resolved.
Show resolved Hide resolved
"""
try:
login_res = self._login(login_id, password)
except Exception:
Expand Down Expand Up @@ -170,6 +203,12 @@ def _signout(self):
pass

def signout(self):
"""
Signing out the current user: if the user was an operator, the queue
and samples restored to init values, the session is cleared, the user
is not an operator anymore. Log out and deactivte the user, and emit
'observersChanged' signal.
"""
self._signout()
user = current_user

Expand All @@ -191,15 +230,19 @@ def signout(self):
msg = "User %s signed out" % user.username
logging.getLogger("MX3.HWR").info(msg)

# change current_user.active to False
self.app.server.user_datastore.deactivate_user(user)
flask_security.logout_user()

self.app.server.emit("observersChanged", namespace="/hwr")

def is_authenticated(self):
def is_authenticated(self) -> bool:
"""Return True whether the current user is authenticated."""
return current_user.is_authenticated()

def force_signout_user(self, username):
def force_signout_user(self, username: str) -> None:
"""Force signout of the annonymous or non operating user with the given username.
"""
user = self.get_user(username)

if not user.in_control or current_user.is_anonymous:
Expand All @@ -208,7 +251,11 @@ def force_signout_user(self, username):
self.app.server.user_datastore.commit()
self.app.server.emit("forceSignout", room=socketio_sid, namespace="/hwr")

def login_info(self):
def login_info(self) -> dict:
"""
Return a dictionary with the login information to be displayed in the
application, such as: synchrotron and beamline names, proposal list etc.
"""
if not current_user.is_anonymous:
login_info = convert_to_dict(json.loads(current_user.limsdata))

Expand Down Expand Up @@ -246,7 +293,8 @@ def login_info(self):

return res

def update_user(self, user):
def update_user(self, user: User) -> None:
"""Update user information in datastore."""
self.app.server.user_datastore.put(user)
self.app.server.user_datastore.commit()

Expand All @@ -265,15 +313,24 @@ def _get_configured_roles(self, user):

return list(roles)

def db_create_user(self, user: str, password: str, lims_data: dict):
def db_create_user(self, user: str, password: str, lims_data: dict) -> User:
"""
Create new user in datastore. If the user already exists,
update the user information.

:param str user: representation of (patial) username and nickname of new user
:param str password: password
:param dict lims_data: dictionary with the lims data to be updated
dominikatrojanowska marked this conversation as resolved.
Show resolved Hide resolved
:return: User model instance existing / added to datastore
"""
sid = flask.session["sid"]
user_datastore = self.app.server.user_datastore
username = f"{user}-{str(uuid.uuid4())}"
if HWR.beamline.lims.loginType.lower() == "user":
username = f"{user}"
else:
username = f"{user}-{str(uuid.uuid4())}"

# Make sure that the roles staff and incontrol always
# exists
# Make sure that the roles staff and incontrol always exists
if not user_datastore.find_role("staff"):
user_datastore.create_role(name="staff")
user_datastore.create_role(name="incontrol")
Expand Down Expand Up @@ -304,7 +361,16 @@ def db_create_user(self, user: str, password: str, lims_data: dict):

return user_datastore.find_user(username=username)

def db_set_in_control(self, user, control):
def db_set_in_control(self, user: User, control: bool) -> None:
"""
Update users (their in_control field) in the datastore. If the passed
user becomes an operator (control=True), the remaining users'
in_control fields are set to False. If passed user stops being
an operator, only its in_control field is set to False.

:param User user: User model instance
:param bool control: the user becomes an operator (Ture) or not (False)
"""
user_datastore = self.app.server.user_datastore

if control:
Expand All @@ -317,7 +383,7 @@ def db_set_in_control(self, user, control):
user_datastore.put(_u)
else:
_u = user_datastore.find_user(username=user.username)
_u.in_control = control
_u.in_control = False
user_datastore.put(_u)

self.app.server.user_datastore.commit()
Expand All @@ -327,7 +393,15 @@ class UserManager(BaseUserManager):
def __init__(self, app, config):
super().__init__(app, config)

def _login(self, login_id: str, password: str):
def _login(self, login_id: str, password: str) -> dict:
"""
Check loging conditions (anonymous, local/remote, existing session)
and return the login response information.

:param str login_id: username
:param str password: password
:return: login response information
"""
login_res = self.app.lims.lims_login(login_id, password, create_session=False)
inhouse = self.is_inhouse_user(login_id)

Expand Down Expand Up @@ -383,17 +457,14 @@ def _login(self, login_id: str, password: str):
raise Exception("Remote access disabled")

# Only allow remote logins with existing sessions
if self.app.lims.lims_valid_login(login_res) and is_local_host():
if self.app.lims.lims_valid_login(login_res):
if not self.app.lims.lims_existing_session(login_res):
login_res = self.app.lims.create_lims_session(login_res)

msg = "[LOGIN] Valid login from local host (%s)" % str(info)
logging.getLogger("MX3.HWR").info(msg)
elif self.app.lims.lims_valid_login(
login_res
) and self.app.lims.lims_existing_session(login_res):
msg = "[LOGIN] Valid remote login from %s with existing session (%s)"
msg += msg % (remote_addr(), str(info))
if is_local_host():
msg = "[LOGIN] Valid login from local host (%s)" % str(info)
else:
msg = "[LOGIN] Valid remote login from %s with existing session (%s)"
msg += msg % (remote_addr(), str(info))
logging.getLogger("MX3.HWR").info(msg)
else:
logging.getLogger("MX3.HWR").info("Invalid login %s" % info)
Expand Down
Loading