Skip to content

Commit

Permalink
Remove cbv from tool_shed.api2.users
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Dec 16, 2023
1 parent e0a868e commit 57c00a1
Showing 1 changed file with 46 additions and 39 deletions.
85 changes: 46 additions & 39 deletions lib/tool_shed/webapp/api2/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,13 @@ class UiChangePasswordRequest(BaseModel):
INVALID_LOGIN_OR_PASSWORD = "Invalid login or password"


@router.cbv
class FastAPIUsers:
app: ToolShedApp = depends(ToolShedApp)
user_manager: UserManager = depends(UserManager)
api_key_manager: ApiKeyManager = depends(ApiKeyManager)

@router.get(
"/api/users",
description="index users",
operation_id="users__index",
)
def index(self, trans: SessionRequestContext = DependsOnTrans) -> List[User]:
def index(trans: SessionRequestContext = DependsOnTrans) -> List[User]:
deleted = False
return index(trans.app, deleted)

Expand All @@ -119,15 +114,15 @@ def index(self, trans: SessionRequestContext = DependsOnTrans) -> List[User]:
operation_id="users__create",
require_admin=True,
)
def create(self, trans: SessionRequestContext = DependsOnTrans, request: CreateUserRequest = Body(...)) -> User:
def create(trans: SessionRequestContext = DependsOnTrans, request: CreateUserRequest = Body(...)) -> User:
return api_create_user(trans, request)

@router.get(
"/api/users/current",
description="show current user",
operation_id="users__current",
)
def current(self, trans: SessionRequestContext = DependsOnTrans) -> User:
def current(trans: SessionRequestContext = DependsOnTrans) -> User:
user = trans.user
if not user:
raise ObjectNotFound()
Expand All @@ -139,7 +134,7 @@ def current(self, trans: SessionRequestContext = DependsOnTrans) -> User:
description="show a user",
operation_id="users__show",
)
def show(self, trans: SessionRequestContext = DependsOnTrans, encoded_user_id: str = UserIdPathParam) -> User:
def show(trans: SessionRequestContext = DependsOnTrans, encoded_user_id: str = UserIdPathParam) -> User:
user = suc.get_user(trans.app, encoded_user_id)
if user is None:
raise ObjectNotFound()
Expand All @@ -152,21 +147,25 @@ def show(self, trans: SessionRequestContext = DependsOnTrans, encoded_user_id: s
operation_id="users__get_or_create_api_key",
)
def get_or_create_api_key(
self, trans: SessionRequestContext = DependsOnTrans, encoded_user_id: str = UserIdPathParam
trans: SessionRequestContext = DependsOnTrans,
encoded_user_id: str = UserIdPathParam,
api_key_manager: ApiKeyManager = depends(ApiKeyManager),
) -> str:
user = self._get_user(trans, encoded_user_id)
return self.api_key_manager.get_or_create_api_key(user)
user = _get_user(trans, encoded_user_id)
return api_key_manager.get_or_create_api_key(user)

@router.post(
"/api/users/{encoded_user_id}/api_key",
summary="Creates a new API key for the user",
operation_id="users__create_api_key",
)
def create_api_key(
self, trans: SessionRequestContext = DependsOnTrans, encoded_user_id: str = UserIdPathParam
trans: SessionRequestContext = DependsOnTrans,
encoded_user_id: str = UserIdPathParam,
api_key_manager: ApiKeyManager = depends(ApiKeyManager),
) -> str:
user = self._get_user(trans, encoded_user_id)
return self.api_key_manager.create_api_key(user).key
user = _get_user(trans, encoded_user_id)
return api_key_manager.create_api_key(user).key

@router.delete(
"/api/users/{encoded_user_id}/api_key",
Expand All @@ -175,32 +174,24 @@ def create_api_key(
operation_id="users__delete_api_key",
)
def delete_api_key(
self,
trans: SessionRequestContext = DependsOnTrans,
encoded_user_id: str = UserIdPathParam,
api_key_manager: ApiKeyManager = depends(ApiKeyManager),
):
user = self._get_user(trans, encoded_user_id)
self.api_key_manager.delete_api_key(user)
user = _get_user(trans, encoded_user_id)
api_key_manager.delete_api_key(user)
return Response(status_code=status.HTTP_204_NO_CONTENT)

def _get_user(self, trans: SessionRequestContext, encoded_user_id: str):
if encoded_user_id == "current":
user = trans.user
else:
user = suc.get_user(trans.app, encoded_user_id)
if user is None:
raise ObjectNotFound()
if not (trans.user_is_admin or trans.user == user):
raise InsufficientPermissionsException()
return user

@router.post(
"/api_internal/register",
description="register a user",
operation_id="users__internal_register",
)
def register(
self, trans: SessionRequestContext = DependsOnTrans, request: UiRegisterRequest = Body(...)
trans: SessionRequestContext = DependsOnTrans,
request: UiRegisterRequest = Body(...),
app: ToolShedApp = depends(ToolShedApp),
user_manager: UserManager = depends(UserManager),
) -> UiRegisterResponse:
honeypot_field = request.bear_field
if honeypot_field != "":
Expand All @@ -210,17 +201,17 @@ def register(
username = request.username
if username == "repos":
raise RequestParameterInvalidException("Cannot create a user with the username 'repos'")
self.user_manager.create(email=request.email, username=username, password=request.password)
if self.app.config.user_activation_on:
is_activation_sent = self.user_manager.send_activation_email(trans, request.email, username)
user_manager.create(email=request.email, username=username, password=request.password)
if app.config.user_activation_on:
is_activation_sent = user_manager.send_activation_email(trans, request.email, username)
if is_activation_sent:
return UiRegisterResponse(email=request.email, activation_sent=True)
else:
return UiRegisterResponse(
email=request.email,
activation_sent=False,
activation_error=True,
contact_email=self.app.config.error_email_to,
contact_email=app.config.error_email_to,
)
else:
return UiRegisterResponse(email=request.email)
Expand All @@ -232,15 +223,17 @@ def register(
status_code=status.HTTP_204_NO_CONTENT,
)
def change_password(
self, trans: SessionRequestContext = DependsOnTrans, request: UiChangePasswordRequest = Body(...)
trans: SessionRequestContext = DependsOnTrans,
request: UiChangePasswordRequest = Body(...),
user_manager: UserManager = depends(UserManager),
):
password = request.password
current = request.current
if trans.user is None:
raise InsufficientPermissionsException("Must be logged into use this functionality")
user_id = trans.user.id
token = None
user, message = self.user_manager.change_password(
user, message = user_manager.change_password(
trans, password=password, current=current, token=token, confirm=password, id=user_id
)
if not user:
Expand All @@ -253,13 +246,15 @@ def change_password(
operation_id="users__internal_login",
)
def internal_login(
self, trans: SessionRequestContext = DependsOnTrans, request: UiLoginRequest = Body(...)
trans: SessionRequestContext = DependsOnTrans,
request: UiLoginRequest = Body(...),
user_manager: UserManager = depends(UserManager),
) -> UiLoginResponse:
log.info(f"top of internal_login {trans.session_csrf_token}")
ensure_csrf_token(trans, request)
login = request.login
password = request.password
user = self.user_manager.get_user_by_identity(login)
user = user_manager.get_user_by_identity(login)
if user is None:
raise InsufficientPermissionsException(INVALID_LOGIN_OR_PASSWORD)
elif user.deleted:
Expand All @@ -281,13 +276,25 @@ def internal_login(
operation_id="users__internal_logout",
)
def internal_logout(
self, trans: SessionRequestContext = DependsOnTrans, request: UiLogoutRequest = Body(...)
trans: SessionRequestContext = DependsOnTrans, request: UiLogoutRequest = Body(...)
) -> UiLogoutResponse:
ensure_csrf_token(trans, request)
handle_user_logout(trans, logout_all=request.logout_all)
return UiLogoutResponse()


def _get_user(trans: SessionRequestContext, encoded_user_id: str):
if encoded_user_id == "current":
user = trans.user
else:
user = suc.get_user(trans.app, encoded_user_id)
if user is None:
raise ObjectNotFound()
if not (trans.user_is_admin or trans.user == user):
raise InsufficientPermissionsException()
return user


def ensure_csrf_token(trans: SessionRequestContext, request: HasCsrfToken):
session_csrf_token = request.session_csrf_token
if not trans.session_csrf_token:
Expand Down

0 comments on commit 57c00a1

Please sign in to comment.