-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathusers.py
375 lines (286 loc) · 12.4 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""
Holds handling functions for user operations.
Uses a floating instance of the database client that is instanciated in
the `config.db` module like all other `util` modules.
"""
from typing import Dict, Any, List
import pymongo.errors as pymongo_exceptions
import pymongo.results as pymongo_results
from models import exceptions
from models.auth import Token
import models.users as user_models
import models.events as event_models
import models.commons as common_models
from config.db import get_database, get_database_client_name
import util.events as event_utils
# instantiate the main collection to use for this util file for convenience
def users_collection():
return get_database()[get_database_client_name()]["users"]
async def register_user(
user_reg_form: user_models.UserRegistrationForm
) -> user_models.UserAuthenticationResponse:
"""
Register a user registration form to the database and return it's user ID.
"""
user_object = await get_valid_user_from_reg_form(user_reg_form)
# insert id into column
try:
users_collection().insert_one(user_object.dict())
except pymongo_exceptions.DuplicateKeyError as dupe_error:
detail = "Invalid user insertion: duplicate email"
raise exceptions.DuplicateDataException(detail=detail) from dupe_error
# return user_id if success
user_id = user_object.get_id()
return user_id
async def get_valid_user_from_reg_form(
user_reg_form: user_models.UserRegistrationForm) -> user_models.User:
"""
Casts an incoming user registration form into a `User` object,
effectively validating the user, and setting the password.
"""
user_type = user_reg_form.get_user_type()
user_object = user_models.User(**user_reg_form.dict(), user_type=user_type)
pre_hash_user_password = user_reg_form.password
user_object.set_password(pre_hash_user_password)
return user_object
async def get_user_info_by_identifier(
identifier: user_models.UserIdentifier) -> user_models.User:
"""
Returns a User object by it's given identifier.
UserNotFoundException returns 404 if no user is found.
"""
query = identifier.get_database_query()
# query to database
user_document = users_collection().find_one(query)
if not user_document:
raise exceptions.UserNotFoundException
# cast the database response into a User object
return user_models.User(**user_document)
async def check_if_user_exists_by_id(user_id: common_models.UserId) -> None:
"""
Checks if the user exists solely by ID and raises an
exception if it does, else returns None silently.
"""
user_identifier = user_models.UserIdentifier(user_id=user_id)
await get_user_info_by_identifier(user_identifier)
async def delete_user(identifier: user_models.UserIdentifier) -> None:
"""
Deletes a user by it's identifier.
Raises an error if the user does not exist or if the credentials
don't match the identifier
"""
query = identifier.get_database_query()
response = users_collection().delete_one(query)
if response.deleted_count == 0:
detail = "User not found and could not be deleted"
raise exceptions.UserNotFoundException(detail=detail)
async def login_user(
login_form: user_models.UserLoginForm
) -> user_models.UserAuthenticationResponse:
"""
Validates user login attempt based off
identifier and password.
Will raise 404 UserNotFoundException if user does
not exist, or 422 InvalidPasswordException
if the user does exist but password is invalid.
"""
user = await get_user_info_by_identifier(login_form.identifier)
password_matches = await check_user_password_matches(login_form, user)
if not password_matches:
raise exceptions.InvalidPasswordException
auth_token = await get_auth_token_from_user_data(user)
login_response = user_models.UserAuthenticationResponse(jwt=auth_token)
return login_response
async def check_user_password_matches(login_form: user_models.UserLoginForm,
user: user_models.User) -> bool:
"""
Compares the password of the user loging form and the user object,
returning the boolean outcome.
"""
return user.check_password(login_form.password)
async def update_user(
user_update_form: user_models.UserUpdateForm
) -> user_models.UserUpdateResponse:
"""
Updates user entries in database if UserUpdateForm fields are valid.
"""
user = await get_user_info_by_identifier(user_update_form.identifier)
await update_user_data_database(user, user_update_form)
return user_models.UserUpdateResponse(user_id=user.get_id())
async def update_user_data_database(
user: user_models.User,
user_update_form: user_models.UserUpdateForm) -> None:
"""
Updates user data within database,
based off fields specified in UserUpdateForm
"""
if user_update_form.dict().get("password"):
await set_update_form_pass_to_hashed(user, user_update_form)
values_to_update = await get_dict_of_values_to_update(user_update_form)
update_dict = await format_update_dict(values_to_update)
identifier_dict = user_update_form.identifier.get_database_query()
users_collection().update_one(identifier_dict, update_dict)
async def set_update_form_pass_to_hashed(
user: user_models.User,
user_update_form: user_models.UserUpdateForm) -> None:
"""
Takes a given user and Update form and hashes
the update form password accordingly
fixme: This is ugly code.
"""
unhashed_pass = user_update_form.dict().get("password")
user.set_password(unhashed_pass)
user_update_form.password = user.dict().get("password")
async def format_update_dict(
values_to_update: Dict[str, Any]) -> Dict[str, Any]:
"""
Formats a dict of data so it can be passed into a PyMongo update function
"""
return {"$set": values_to_update}
async def get_dict_of_values_to_update(
update_form: user_models.UserUpdateForm) -> Dict[str, Any]:
"""
Given a `UserUpdateForm`, returns a dict with all of the values
to be updated with a `dict.update()` call to the original user data dict.
"""
form_dict_items = update_form.dict().items()
# could be simplified to be just `v` but this makes our intent crystal clear
valid_value = lambda v: v is not None
forbidden_keys_set = {"identifier"}
valid_key = lambda k: k not in forbidden_keys_set
values_to_update_dict = {
key: value
for key, value in form_dict_items
if valid_key(key) and valid_value(value)
}
return values_to_update_dict
async def get_auth_token_from_user_data(user: user_models.User) -> str:
"""
Given a User object, returns an encoded JWT string with the
user's identifier data (UserID) in it's payload.
"""
user_id = user.get_id()
encoded_jwt_str = await get_auth_token_from_user_id(user_id)
return encoded_jwt_str
async def get_auth_token_from_user_id(user_id: common_models.UserId) -> str:
"""
Returns an encoded token string with the given user_id in it's payload.
"""
payload_dict = {'user_id': user_id}
encoded_jwt_str = Token.get_enc_token_str_from_dict(payload_dict)
return encoded_jwt_str
async def get_events_from_user_identifier(
identifier: user_models.UserIdentifier) -> List[common_models.EventId]:
"""
Gets list of all v
"""
identifier_dict = identifier.get_database_query()
user_dict = users_collection().find_one(identifier_dict)
return user_dict["events_visible"]
async def user_add_event(
add_event_form: user_models.UserAddEventForm,
user_id: common_models.UserId) -> user_models.UserAddEventResponse:
"""
Adds an event to a validated User's events_visible field
"""
# at this point token is already validated
event_id = add_event_form.event_id
# this exists just to validate that event is in database
await event_utils.get_event_by_id(event_id)
await add_event_to_user_visible(user_id, event_id)
return user_models.UserAddEventResponse(event_id=event_id)
async def add_event_to_user_visible(user_id: user_models.UserId,
event_id: event_models.EventId):
user_identifier = user_models.UserIdentifier(user_id=user_id)
identifier_dict = user_identifier.get_database_query()
if event_id not in await get_events_from_user_identifier(user_identifier):
users_collection().update_one(identifier_dict,
{"$push": {
"events_visible": event_id
}})
else:
raise exceptions.DuplicateDataException(
"Event already in user's events_visible field")
async def archive_user_event(user_id: user_models.UserId,
event: event_models.Event) -> None:
"""
Given a UserId and an event, moves the event to the appropriate list
after checking it's validity.
"""
user_identifier = user_models.UserIdentifier(user_id=user_id)
user = await get_user_info_by_identifier(user_identifier)
event_should_be_archived = lambda x: x in {"expired", "cancelled"}
event_id = event.get_id()
event_status = event.status if isinstance(event.status,
str) else event.status.name
if event_should_be_archived(event_status):
await archive_event_id_from_user(user, event_id)
async def archive_event_id_from_user(user: user_models.User,
event_id: common_models.EventId) -> None:
"""
Updates the user's event list to archive the event
and make it not visible, also updating the new data
in the database.
"""
# update user
update_dict = {
"$pull": {
"events_visible": event_id
},
"$push": {
"events_archived": event_id
},
}
user_identifier = user_models.UserIdentifier(user_id=user.get_id())
identifier_query_dict = user_identifier.get_database_query()
users_collection().update_one(identifier_query_dict, update_dict)
async def add_id_to_created_events_list(
user_id: user_models.UserId, event_id: event_models.EventId) -> None:
"""
Given a UserId, adds the EventId to the user's list of `created_events`.
Checks that the user exists first, then appends the ID to the list.
"""
await check_if_user_exists_by_id(user_id)
await append_event_id_to_events_created_list(user_id, event_id)
async def append_event_id_to_events_created_list( # pylint: disable=invalid-name
user_id: user_models.UserId, event_id: event_models.EventId) -> None:
"""
Will go into the database and actually append the event_id given to
the user document (queried by user_id)
"""
user_identifier = user_models.UserIdentifier(user_id=user_id)
query = user_identifier.get_database_query()
update_dict = await get_dict_to_add_event_id_to_events_created_list(
event_id)
result = users_collection().update_one(query, update_dict)
await check_update_one_result_ok(result)
async def get_dict_to_add_event_id_to_events_created_list( # pylint: disable=invalid-name
event_id: event_models.EventId) -> Dict[str, Any]:
"""
Returns the dict to be used in the collection.update call for a user
that appends the incoming EventId to the `events_created` list.
"""
return {"$push": {"events_created": event_id}}
async def check_update_one_result_ok(
result: pymongo_results.UpdateResult) -> None:
"""
Given the update result for an `update_one` call,
checks that it was valid.
Returns None in valid case, raises database exception if not.
"""
try:
assert result.matched_count == 1
assert result.modified_count == 1
except AssertionError as update_error:
detail = "Update one call failed on appending event_id to created list"
raise exceptions.DatabaseError(detail=detail) from update_error
async def check_if_admin_by_id(user_id: user_models.UserId) -> bool:
"""
Given a user_id will return the boolean respopnse of checking
if the user is an admin or not.
Will raise 404 if user does not exist.
"""
user_identifier = user_models.UserIdentifier(user_id=user_id)
user = await get_user_info_by_identifier(user_identifier)
# pylint: disable=no-member
return user.user_type == user_models.UserTypeEnum.ADMIN.name