Skip to content

Commit

Permalink
Merge branch 'main' into wip/080E-limc
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum authored Aug 12, 2024
2 parents e6a0a88 + 1d6b105 commit b9c0881
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
repos:

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.2
rev: v0.5.7
hooks:
- id: ruff
# args: [
Expand Down
39 changes: 33 additions & 6 deletions dsp_permissions_scripts/ap/ap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,62 @@
from dsp_permissions_scripts.ap.ap_get import create_admin_route_object_from_ap
from dsp_permissions_scripts.ap.ap_get import create_ap_from_admin_route_object
from dsp_permissions_scripts.ap.ap_model import Ap
from dsp_permissions_scripts.ap.ap_model import ApValue
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.group import Group
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.helpers import KNORA_ADMIN_ONTO_NAMESPACE
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode

logger = get_logger(__name__)


def _update_ap_on_server(ap: Ap, dsp_client: DspClient) -> Ap:
def _update_ap_scope_on_server(ap: Ap, dsp_client: DspClient) -> Ap:
iri = quote_plus(ap.iri, safe="")
payload = {"hasPermissions": create_admin_route_object_from_ap(ap)["hasPermissions"]}
try:
response = dsp_client.put(f"/admin/permissions/{iri}/hasPermissions", data=payload)
except ApiError as err:
err.message = f"Could not update Administrative Permission {ap.iri}"
err.message = f"Could not update scope of Administrative Permission {ap.iri}"
raise err from None
ap_updated: dict[str, Any] = response["administrative_permission"]
ap_object_updated = create_ap_from_admin_route_object(ap_updated)
return ap_object_updated


def apply_updated_aps_on_server(aps: list[Ap], host: str, dsp_client: DspClient) -> None:
def apply_updated_scopes_of_aps_on_server(aps: list[Ap], host: str, dsp_client: DspClient) -> None:
if not aps:
logger.warning(f"There are no APs to update on {host}")
return
logger.info(f"****** Updating {len(aps)} Administrative Permissions on {host}... ******")
logger.info(f"****** Updating scopes of {len(aps)} Administrative Permissions on {host}... ******")
for ap in aps:
try:
_ = _update_ap_on_server(ap, dsp_client)
_ = _update_ap_scope_on_server(ap, dsp_client)
logger.info(f"Successfully updated AP {ap.iri}")
except ApiError as err:
logger.error(err)
logger.info(f"Finished updating {len(aps)} Administrative Permissions on {host}")
logger.info(f"Finished updating scopes of {len(aps)} Administrative Permissions on {host}")


def create_new_ap_on_server(
forGroup: Group,
shortcode: str,
hasPermissions: list[ApValue],
dsp_client: DspClient,
) -> Ap | None:
proj_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
payload = {
"forGroup": forGroup.val.replace("knora-admin:", KNORA_ADMIN_ONTO_NAMESPACE),
"forProject": proj_iri,
"hasPermissions": [
{"additionalInformation": None, "name": ap_val.value, "permissionCode": None} for ap_val in hasPermissions
],
}
try:
response = dsp_client.post("/admin/permissions/ap", data=payload)
logger.info(f"Successfully created new AP for group {forGroup.val}")
return create_ap_from_admin_route_object(response["administrative_permission"])
except ApiError:
logger.error(f"Could not create new AP for group {forGroup}")
return None
2 changes: 1 addition & 1 deletion dsp_permissions_scripts/doap/doap_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def create_doap_from_admin_route_response(permission: dict[str, Any]) -> Doap:
relative_group_iri = None
doap = Doap(
target=DoapTarget(
project=permission["forProject"],
project_iri=permission["forProject"],
group=Group(val=relative_group_iri) if relative_group_iri else None,
resource_class=permission.get("forResourceClass"),
property=permission.get("forProperty"),
Expand Down
23 changes: 22 additions & 1 deletion dsp_permissions_scripts/doap/doap_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,28 @@ class Doap(BaseModel):


class DoapTarget(BaseModel):
project: str
project_iri: str
group: Group | None = None
resource_class: str | None = None
property: str | None = None

@model_validator(mode="after")
def assert_correct_combination(self) -> Self:
# asserts that DOAP is only defined for Group or ResourceClass or Property
# or a combination of ResourceClass and Property
match (self.group, self.resource_class, self.property):
case (None, None, None):
raise ValueError("At least one of group, resource_class or property must be set")
case (_, None, None) | (None, _, _):
pass
case _:
raise ValueError("Invalid combination of group, resource_class and property")
return self


class NewDoapTarget(BaseModel):
"""Represents the target of a DOAP that is yet to be created."""

group: Group | None = None
resource_class: str | None = None
property: str | None = None
Expand Down
32 changes: 29 additions & 3 deletions dsp_permissions_scripts/doap/doap_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from dsp_permissions_scripts.doap.doap_get import create_doap_from_admin_route_response
from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import NewDoapTarget
from dsp_permissions_scripts.models.errors import ApiError
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.utils.dsp_client import DspClient
from dsp_permissions_scripts.utils.get_logger import get_logger
from dsp_permissions_scripts.utils.helpers import KNORA_ADMIN_ONTO_NAMESPACE
from dsp_permissions_scripts.utils.project import get_project_iri_and_onto_iris_by_shortcode
from dsp_permissions_scripts.utils.scope_serialization import create_admin_route_object_from_scope

logger = get_logger(__name__)
Expand All @@ -23,15 +26,38 @@ def _update_doap_scope_on_server(doap_iri: str, scope: PermissionScope, dsp_clie
return new_doap


def apply_updated_doaps_on_server(doaps: list[Doap], host: str, dsp_client: DspClient) -> None:
def apply_updated_scopes_of_doaps_on_server(doaps: list[Doap], host: str, dsp_client: DspClient) -> None:
if not doaps:
logger.warning(f"There are no DOAPs to update on {host}")
return
logger.info(f"****** Updating {len(doaps)} DOAPs on {host}... ******")
logger.info(f"****** Updating scopes of {len(doaps)} DOAPs on {host}... ******")
for d in doaps:
try:
_ = _update_doap_scope_on_server(d.doap_iri, d.scope, dsp_client)
logger.info(f"Successfully updated DOAP {d.doap_iri}")
except ApiError as err:
logger.error(err)
logger.info(f"Finished updating {len(doaps)} DOAPs on {host}")
logger.info(f"Finished updating scopes of {len(doaps)} DOAPs on {host}")


def create_new_doap_on_server(
target: NewDoapTarget,
shortcode: str,
scope: PermissionScope,
dsp_client: DspClient,
) -> Doap | None:
proj_iri, _ = get_project_iri_and_onto_iris_by_shortcode(shortcode, dsp_client)
payload = {
"forGroup": target.group.val.replace("knora-admin:", KNORA_ADMIN_ONTO_NAMESPACE) if target.group else None,
"forProject": proj_iri,
"forProperty": target.property,
"forResourceClass": target.resource_class,
"hasPermissions": create_admin_route_object_from_scope(scope),
}
try:
response = dsp_client.post("/admin/permissions/doap", data=payload)
logger.info(f"Successfully created new DOAP for target {target}")
return create_doap_from_admin_route_response(response["default_object_access_permission"])
except ApiError:
logger.error(f"Could not create new DOAP for target {target}")
return None
40 changes: 28 additions & 12 deletions dsp_permissions_scripts/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
from dsp_permissions_scripts.ap.ap_model import Ap
from dsp_permissions_scripts.ap.ap_model import ApValue
from dsp_permissions_scripts.ap.ap_serialize import serialize_aps_of_project
from dsp_permissions_scripts.ap.ap_set import apply_updated_aps_on_server
from dsp_permissions_scripts.ap.ap_set import apply_updated_scopes_of_aps_on_server
from dsp_permissions_scripts.ap.ap_set import create_new_ap_on_server
from dsp_permissions_scripts.doap.doap_get import get_doaps_of_project
from dsp_permissions_scripts.doap.doap_model import Doap
from dsp_permissions_scripts.doap.doap_model import NewDoapTarget
from dsp_permissions_scripts.doap.doap_serialize import serialize_doaps_of_project
from dsp_permissions_scripts.doap.doap_set import apply_updated_doaps_on_server
from dsp_permissions_scripts.doap.doap_set import apply_updated_scopes_of_doaps_on_server
from dsp_permissions_scripts.doap.doap_set import create_new_doap_on_server
from dsp_permissions_scripts.models import group
from dsp_permissions_scripts.models.host import Hosts
from dsp_permissions_scripts.models.scope import PUBLIC
from dsp_permissions_scripts.models.scope import PermissionScope
from dsp_permissions_scripts.oap.oap_get import get_all_oaps_of_project
from dsp_permissions_scripts.oap.oap_model import Oap
from dsp_permissions_scripts.oap.oap_model import OapRetrieveConfig
Expand All @@ -32,9 +36,9 @@ def modify_aps(aps: list[Ap]) -> list[Ap]:
"""Adapt this sample to your needs."""
modified_aps = []
for ap in copy.deepcopy(aps):
if ap.forGroup == group.UNKNOWN_USER:
if ApValue.ProjectAdminGroupAllPermission not in ap.hasPermissions:
ap.add_permission(ApValue.ProjectAdminGroupAllPermission)
if ap.forGroup == group.PROJECT_ADMIN:
if ApValue.ProjectAdminRightsAllPermission not in ap.hasPermissions:
ap.add_permission(ApValue.ProjectAdminRightsAllPermission)
modified_aps.append(ap)
return modified_aps

Expand All @@ -43,7 +47,7 @@ def modify_doaps(doaps: list[Doap]) -> list[Doap]:
"""Adapt this sample to your needs."""
modified_doaps = []
for doap in copy.deepcopy(doaps):
if doap.target.group == group.UNKNOWN_USER:
if doap.target.group == group.PROJECT_ADMIN:
doap.scope = PUBLIC
modified_doaps.append(doap)
return modified_doaps
Expand Down Expand Up @@ -75,14 +79,20 @@ def update_aps(host: str, shortcode: str, dsp_client: DspClient) -> None:
remaining_aps = delete_ap_of_group_on_server(
host=host,
existing_aps=project_aps,
forGroup=group.UNKNOWN_USER,
forGroup=group.PROJECT_MEMBER,
dsp_client=dsp_client,
)
_ = create_new_ap_on_server(
forGroup=group.CREATOR,
shortcode=shortcode,
hasPermissions=[ApValue.ProjectResourceCreateAllPermission],
dsp_client=dsp_client,
)
modified_aps = modify_aps(remaining_aps)
if not modified_aps:
logger.info("There are no APs to update.")
return
apply_updated_aps_on_server(modified_aps, host, dsp_client)
apply_updated_scopes_of_aps_on_server(modified_aps, host, dsp_client)
project_aps_updated = get_aps_of_project(shortcode, dsp_client)
serialize_aps_of_project(
project_aps=project_aps_updated,
Expand All @@ -101,11 +111,17 @@ def update_doaps(host: str, shortcode: str, dsp_client: DspClient) -> None:
mode="original",
host=host,
)
_ = create_new_doap_on_server(
target=NewDoapTarget(group=group.CREATOR),
shortcode=shortcode,
scope=PermissionScope.create(CR=[group.SYSTEM_ADMIN]),
dsp_client=dsp_client,
)
project_doaps_modified = modify_doaps(doaps=project_doaps)
if not project_doaps_modified:
logger.info("There are no DOAPs to update.")
return
apply_updated_doaps_on_server(project_doaps_modified, host, dsp_client)
apply_updated_scopes_of_doaps_on_server(project_doaps_modified, host, dsp_client)
project_doaps_updated = get_doaps_of_project(shortcode, dsp_client)
serialize_doaps_of_project(
project_doaps=project_doaps_updated,
Expand Down Expand Up @@ -142,14 +158,14 @@ def main() -> None:
and one to update the Object Access Permissions of a project.
All must first be adapted to your needs.
"""
host = Hosts.get_host("test")
shortcode = "F18E"
host = Hosts.get_host("localhost")
shortcode = "4123"
log_start_of_script(host, shortcode)
dsp_client = login(host)

oap_config = OapRetrieveConfig(
retrieve_resources="specified_res_classes",
specified_res_classes=["my-data-model:ImageThing"],
specified_res_classes=["testonto:ImageThing"],
retrieve_values="specified_props",
specified_props=["knora-api:hasStillImageFileValue"],
)
Expand Down
Loading

0 comments on commit b9c0881

Please sign in to comment.