Skip to content

Commit

Permalink
Update user.import with saml_id code
Browse files Browse the repository at this point in the history
  • Loading branch information
reiterl committed Jul 19, 2023
1 parent f1fddd3 commit e22aa85
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 7 deletions.
43 changes: 36 additions & 7 deletions openslides_backend/action/actions/user/import_.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def update_instance(self, instance: Dict[str, Any]) -> Dict[str, Any]:
data = self.result.get("rows", [])
for entry in data:
# Revert username-info and default-password-info
for field in ("username", "default_password"):
for field in ("username", "default_password", "saml_id"):
if field in entry["data"]:
if field == "username" and "id" in entry["data"][field]:
entry["data"]["id"] = entry["data"][field]["id"]
Expand All @@ -48,7 +48,7 @@ def update_instance(self, instance: Dict[str, Any]) -> Dict[str, Any]:
search_data_list = [
{
field: entry["data"].get(field, "")
for field in ("username", "first_name", "last_name", "email")
for field in ("username", "first_name", "last_name", "email", "saml_id")
}
for entry in data
]
Expand All @@ -60,38 +60,64 @@ def update_instance(self, instance: Dict[str, Any]) -> Dict[str, Any]:
self.error = False
for payload_index, entry in enumerate(data):
if entry["state"] == ImportState.NEW:
if not entry["data"].get("username"):
if not entry["data"].get("username") and not entry["data"].get(
"saml_id"
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: Want to create user, but missing username in import data."
)
elif self.check_username_for_duplicate(
elif entry["data"].get(
"username"
) and self.check_username_for_duplicate(
entry["data"]["username"], payload_index
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: want to create a new user, but username already exists."
)
elif entry["data"].get("saml_id") and self.check_saml_id_for_duplicate(
entry["data"]["saml_id"], payload_index
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: want to create a new user, but saml_id already exists."
)
else:
create_action_payload.append(entry["data"])
elif entry["state"] == ImportState.DONE:
search_data = self.get_search_data(payload_index)
if not entry["data"].get("username"):
if not entry["data"].get("username") and not entry["data"].get(
"saml_id"
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: Want to update user, but missing username in import data."
)
elif not self.check_username_for_duplicate(
elif entry["data"].get(
"username"
) and not self.check_username_for_duplicate(
entry["data"]["username"], payload_index
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: want to update, but missing user in db."
)
elif entry["data"].get(
"saml_id"
) and not self.check_saml_id_for_duplicate(
entry["data"]["saml_id"], payload_index
):
self.error = True
entry["state"] = ImportState.ERROR
entry["messages"].append(
"Error: want to update, but missing user in db."
)
elif search_data is None:
self.error = True
entry["state"] = ImportState.ERROR
Expand All @@ -105,7 +131,10 @@ def update_instance(self, instance: Dict[str, Any]) -> Dict[str, Any]:
"Error: want to update, but found search data doesn't match."
)
else:
del entry["data"]["username"]
for field in ("username", "saml_id"):
if field in entry["data"]:
del entry["data"][field]

update_action_payload.append(entry["data"])
else:
self.error = True
Expand Down
85 changes: 85 additions & 0 deletions tests/system/action/user/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,91 @@ def get_action_worker_data(
}
}

def test_import_with_saml_id(self) -> None:
self.set_models(
self.get_action_worker_data(
6,
ImportState.NEW,
{"saml_id": {"value": "testsaml", "info": ImportState.NEW}},
)
)
response = self.request("user.import", {"id": 6, "import": True})
self.assert_status_code(response, 200)
self.assert_model_exists(
"user/2",
{
"username": "testsaml",
"saml_id": "testsaml",
},
)

def test_import_saml_id_error_new_and_saml_id_exists(self) -> None:
"""Set saml_id 'testsaml' to user 1, add the import user 1 will be
found and the import should result in an error."""
self.set_models(
{
"user/1": {"saml_id": "testsaml"},
**self.get_action_worker_data(
6,
ImportState.NEW,
{"saml_id": {"value": "testsaml", "info": ImportState.NEW}},
),
}
)
response = self.request("user.import", {"id": 6, "import": True})
self.assert_status_code(response, 200)
entry = response.json["results"][0][0]["rows"][0]
assert entry["state"] == ImportState.ERROR
assert entry["messages"] == [
"Error: want to create a new user, but saml_id already exists."
]

def test_import_done_with_saml_id(self) -> None:
self.set_models(
{
"user/2": {"username": "test", "saml_id": "testsaml"},
**self.get_action_worker_data(
6,
ImportState.DONE,
{
"saml_id": {"value": "testsaml", "info": ImportState.DONE},
"id": 2,
"first_name": "Hugo",
},
),
}
)
response = self.request("user.import", {"id": 6, "import": True})
self.assert_status_code(response, 200)
self.assert_model_exists(
"user/2",
{
"username": "test",
"saml_id": "testsaml",
"first_name": "Hugo",
},
)

def test_import_done_error_missing_user(self) -> None:
self.set_models(
{
**self.get_action_worker_data(
6,
ImportState.DONE,
{
"saml_id": {"value": "testsaml", "info": ImportState.NEW},
"first_name": "Hugo",
"id": 2,
},
),
}
)
response = self.request("user.import", {"id": 6, "import": True})
self.assert_status_code(response, 200)
entry = response.json["results"][0][0]["rows"][0]
assert entry["state"] == ImportState.ERROR
assert entry["messages"] == ["Error: want to update, but missing user in db."]

def test_import_error_at_state_new(self) -> None:
self.set_models(
{
Expand Down

0 comments on commit e22aa85

Please sign in to comment.