Skip to content

Commit

Permalink
Add support for creating PSAs for multiple users (#2483)
Browse files Browse the repository at this point in the history
This commit adds support for creating permset assignments for more than
one user. 


Co-authored-by: David Reed <[email protected]>
  • Loading branch information
jstvz and davidmreed authored Apr 15, 2021
1 parent e95a332 commit 2a52ee1
Show file tree
Hide file tree
Showing 2 changed files with 418 additions and 68 deletions.
120 changes: 94 additions & 26 deletions cumulusci/tasks/salesforce/users/permsets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from cumulusci.tasks.salesforce import BaseSalesforceApiTask
import json

from cumulusci.cli.ui import CliTable
from cumulusci.core.exceptions import CumulusCIException
from cumulusci.core.utils import process_list_arg
from cumulusci.tasks.salesforce import BaseSalesforceApiTask


class AssignPermissionSets(BaseSalesforceApiTask):
Expand All @@ -14,7 +17,7 @@ class AssignPermissionSets(BaseSalesforceApiTask):
"required": True,
},
"user_alias": {
"description": "Alias of target user (if not the current running user, the default)."
"description": "Target user aliases, separated by commas. Defaults to the current running user."
},
}

Expand All @@ -29,65 +32,130 @@ def _init_options(self, kwargs):
super()._init_options(kwargs)

self.options["api_names"] = process_list_arg(self.options["api_names"])
self.options["user_alias"] = process_list_arg(
self.options.get("user_alias") or []
)

def _run_task(self):
# Determine existing assignments
if "user_alias" not in self.options:
users = self._query_existing_assignments()
users_assigned_perms = {
user["Id"]: self._get_assigned_perms(user) for user in users
}
perms_by_id = self._get_perm_ids()

records_to_insert = []
for user_id, assigned_perms in users_assigned_perms.items():
records_to_insert.extend(
self._get_assignments(user_id, assigned_perms, perms_by_id)
)

self._insert_assignments(records_to_insert)

def _query_existing_assignments(self):
if not self.options["user_alias"]:
query = (
f"SELECT Id,(SELECT {self.assignment_lookup} FROM {self.assignment_child_relationship}) "
"FROM User "
f"WHERE Username = '{self.org_config.username}'"
)
else:
aliases = "','".join(self.options["user_alias"])
query = (
f"SELECT Id,(SELECT {self.assignment_lookup} FROM {self.assignment_child_relationship}) "
"FROM User "
f"""WHERE Alias = '{self.options["user_alias"]}'"""
f"""WHERE Alias IN ('{aliases}')"""
)

result = self.sf.query(query)
if result["totalSize"] != 1:
if result["totalSize"] == 0:
raise CumulusCIException(
"A single User was not found matching the specified alias."
"No Users were found matching the specified aliases."
)
user = result["records"][0]
return result["records"]

def _get_assigned_perms(self, user):
assigned_perms = {}
# PermissionSetLicenseAssignments actually returns None if there are no assignments instead of an empty list of records. Wow.
if user[self.assignment_child_relationship]:
assigned_perms = {
r[self.assignment_lookup]
for r in user[self.assignment_child_relationship]["records"]
}
return assigned_perms

# Find Ids for requested Perms
def _get_perm_ids(self):
api_names = "', '".join(self.options["api_names"])
perms = self.sf.query(
f"SELECT Id,{self.permission_name_field} FROM {self.permission_name} WHERE {self.permission_name_field} IN ('{api_names}')"
)
perms = {p[self.permission_name_field]: p["Id"] for p in perms["records"]}

# Raise for missing perms
for api_name in self.options["api_names"]:
if api_name not in perms:
raise CumulusCIException(
f"{self.permission_label} {api_name} was not found."
perms_by_ids = {
p["Id"]: p[self.permission_name_field] for p in perms["records"]
}

missing_perms = [
api_name
for api_name in self.options["api_names"]
if api_name not in perms_by_ids.values()
]
if missing_perms:
raise CumulusCIException(
f"The following {self.permission_label}s were not found: {', '.join(missing_perms)}."
)
return perms_by_ids

def _get_assignments(self, user_id, assigned_perms, perms_by_id):
assignments = []
for perm, perm_name in perms_by_id.items():
if perm not in assigned_perms:
self.logger.info(
f'Assigning {self.permission_label} "{perm_name}" to {user_id}.'
)

# Assign all not already assigned
for api_name in self.options["api_names"]:
if perms[api_name] not in assigned_perms:
self.logger.info(f'Assigning {self.permission_label} "{api_name}".')
assignment = {
"AssigneeId": user["Id"],
"attributes": {"type": self.assignment_name},
"AssigneeId": user_id,
self.assignment_lookup: perm,
}
assignment[self.assignment_lookup] = perms[api_name]
# Create the new assignment.
getattr(self.sf, self.assignment_name).create(assignment)
assignments.append(assignment)
else:
self.logger.warning(
f'{self.permission_label} "{api_name}" is already assigned.'
f'{self.permission_label} "{perm_name}" is already assigned to {user_id}.'
)
return assignments

def _insert_assignments(self, records_to_insert):
result_list = []
for i in range(0, len(records_to_insert), 200):
request_body = json.dumps(
{"allOrNone": False, "records": records_to_insert[i : i + 200]}
)
result = self.sf.restful(
"composite/sobjects", method="POST", data=request_body
)
result_list.extend(result)
self._process_composite_results(result_list)

def _process_composite_results(self, api_results):
results_table_data = [["Success", "ID", "Message"]]
for result in api_results:
result_row = [result["success"], result.get("id", "-")]
if not result["success"] and result["errors"]:
result_row.append(result["errors"][0]["message"])
else:
result_row.append("-")
results_table_data.append(result_row)

table = CliTable(
results_table_data,
bool_cols=["Success"],
title="Results",
wrap_cols=["Message"],
)
self.logger.info("\n" + str(table))

if not all([result["success"] for result in api_results]):
raise CumulusCIException(
f"Not all {self.assignment_child_relationship} were saved."
)


class AssignPermissionSetLicenses(AssignPermissionSets):
Expand Down
Loading

0 comments on commit 2a52ee1

Please sign in to comment.