From 211daee5cb77f3950e895e2d74714081210c5d4b Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:00:47 +0100 Subject: [PATCH 01/22] Identify classes and atrributes namespaces Signed-off-by: HUG0-D --- cimgen/cimgen.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cimgen/cimgen.py b/cimgen/cimgen.py index dfea746a..7de641a7 100644 --- a/cimgen/cimgen.py +++ b/cimgen/cimgen.py @@ -20,6 +20,8 @@ def asJson(self): jsonObject = {} if self.about() is not None: jsonObject["about"] = self.about() + if self.namespace() is not None: + jsonObject["namespace"] = self.namespace() if self.comment() is not None: jsonObject["comment"] = self.comment() if self.dataType() is not None: @@ -51,6 +53,12 @@ def about(self): else: return None + def namespace(self): + if "$rdf:about" in self.jsonDefinition: + return self.jsonDefinition["$rdf:about"][: -len(self.about())] + else: + return None + # Capitalized True/False is valid in python but not in json. # Do not use this function in combination with json.load() def is_used(self) -> bool: @@ -211,6 +219,7 @@ def __init__(self, rdfsEntry): self.super = rdfsEntry.subClassOf() self.subclasses = [] self.stereotype = rdfsEntry.stereotype() + self.namespace = rdfsEntry.namespace() def attributes(self): return self.attribute_list @@ -429,6 +438,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version): "class_location": lang_pack.get_class_location(class_name, elem_dict, version), "class_name": class_name, "class_origin": elem_dict[class_name].origins(), + "class_namespace": _get_namespace(elem_dict[class_name].namespace), "enum_instances": elem_dict[class_name].enum_instances(), "is_an_enum_class": elem_dict[class_name].is_an_enum_class(), "is_a_primitive_class": elem_dict[class_name].is_a_primitive_class(), @@ -467,6 +477,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version): attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive") attribute["is_datatype_attribute"] = _get_bool_string(attribute_type == "datatype") attribute["attribute_class"] = attribute_class + attribute["attribute_namespace"] = _get_namespace(attribute["namespace"]) class_details["attributes"].sort(key=lambda d: d["label"]) _write_files(class_details, output_path, version) @@ -749,6 +760,14 @@ def _get_attribute_type(attribute: dict, class_infos: CIMComponentDefinition) -> return attribute_type +def _get_namespace(parsed_namespace: str) -> str: + if parsed_namespace == "#": + namespace = cim_namespace + else: + namespace = parsed_namespace + return namespace + + def _get_bool_string(bool_value: bool) -> str: """Convert boolean value into a string which is usable in both Python and Json. From 35d013eec2aca2f0aaa9cf75cba70090338bf347 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:02:30 +0100 Subject: [PATCH 02/22] add attribute_namespace to json_extra to handle custom attributes Signed-off-by: HUG0-D --- cimgen/languages/modernpython/templates/class_template.mustache | 1 + 1 file changed, 1 insertion(+) diff --git a/cimgen/languages/modernpython/templates/class_template.mustache b/cimgen/languages/modernpython/templates/class_template.mustache index 59561703..18dd1141 100644 --- a/cimgen/languages/modernpython/templates/class_template.mustache +++ b/cimgen/languages/modernpython/templates/class_template.mustache @@ -35,6 +35,7 @@ class {{class_name}}({{sub_class_of}}): {{/attr_origin}} ], "is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}}, + "attribute_namespace": "{{attribute_namespace}}", # NOSONAR "is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}}, "is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}}, "is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}}, From d68cad4a4eed728e4cd22dce54020c111acc1bd2 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:09:48 +0100 Subject: [PATCH 03/22] Extended function cgmes_attributes_in_profile to get json_extras of the attributes Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 39 ++++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index f6f75703..def496a1 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -133,6 +133,7 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, for f in fields(parent): shortname = f.name qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore + infos = dict() if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs: # Wrong profile or already found from a parent. continue @@ -144,19 +145,31 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, # The attribute does not have extra metadata. It might be a custom atttribute # without it, or a base type (int...). # Use the class namespace. - namespace = self.namespace - elif (attr_ns := extra.get("namespace", None)) is None: - # The attribute has some extras, but not namespace. - # Use the class namespace. - namespace = self.namespace - else: - # The attribute has an explicit namesapce - namespace = attr_ns - - qual_attrs[qualname] = CgmesAttribute( - value=getattr(self, shortname), - namespace=namespace, - ) + infos["namespace"] = self.namespace + elif extra.get("is_used"): + if (extra.get("attribute_namespace", None)) is None: + # The attribute has some extras, but not namespace. + # Use the class namespace. + infos["namespace"] = self.namespace + + else: + # The attribute has an explicit namesapce + infos["namespace"] = extra.get("attribute_namespace", self.namespace) + # adding the extras, used for xml generation + extra_info = { + "attr_name": qualname, + "is_class_attribute": extra.get("is_class_attribute"), + "is_enum_attribute": extra.get("is_enum_attribute"), + "is_list_attribute": extra.get("is_list_attribute"), + "is_primitive_attribute": extra.get("is_primitive_attribute"), + "is_datatype_attribute": extra.get("is_datatype_attribute"), + "attribute_class": extra.get("attribute_class"), + } + infos.update(extra_info) + + infos["value"] = getattr(self, shortname) + + qual_attrs[qualname] = CgmesAttribute(infos) seen_attrs.add(shortname) return qual_attrs From aa73c07c7d63fca9e63d36726fab0370b3f313b1 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:18:56 +0100 Subject: [PATCH 04/22] Added function to export the python object to an xml fragment Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 77 ++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index def496a1..0000e45f 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -1,8 +1,9 @@ # Drop in dataclass replacement, allowing easier json dump and validation in the future. import importlib +from lxml import etree from dataclasses import Field, fields from functools import cached_property -from typing import Any, TypeAlias, TypedDict +from typing import Any, TypeAlias, TypedDict, Optional from pydantic.dataclasses import dataclass @@ -174,6 +175,80 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, return qual_attrs + def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Optional[etree.Element]: + """Creates an etree element of self with all non-empty attributes of the profile_to_export + that are not already defined in the recommanded profile + This can then be used to generate the xml file of the profile_to_export + Args: + profile_to_export (Profile): Profile for which we want to obtain the xml tree (eg. Profile.EQ) + id (Optional[str], optional): "_mRID", optional: some objects don't have mRID attribute. Defaults to None. + Returns: + Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export + """ + profile_attributes = self.cgmes_attributes_in_profile(profile_to_export) + is_recommanded_profile = self.recommended_profile.value == profile_to_export.value + + if not is_recommanded_profile: + # remove attributes that are also present in "recommended_profile" + attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile) + for key in attributes_main.keys(): + if key in profile_attributes: + del profile_attributes[key] + profile_attributes = self._remove_empty_attributes(profile_attributes) + + if "mRID" in self.to_dict(): + obj_id = "_" + self.mRID + else: + obj_id = id + + # if no attribute to export or no mRID, return None + if profile_attributes == {} or obj_id is None: + root = None + else: + nsmap = NAMESPACES + # Create root element + root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap) + + # Add the ID ass attribute to the root + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + if is_recommanded_profile: + root.set(rdf_namespace + "ID", obj_id) + else: + root.set(rdf_namespace + "about", "#" + obj_id) + + for field_name, attribute in profile_attributes.items(): + # add all attributes relevant to the profile as SubElements + attr_namespace = attribute["namespace"] + element_name = f"{{{attr_namespace}}}{field_name}" + + if attribute["is_class_attribute"]: + # class_attributes are exported as rdf: resource #_mRID_of_target + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", "#" + attribute["value"]) + elif attribute["is_enum_attribute"]: + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"]) + else: + element = etree.SubElement(root, element_name) + element.text = str(attribute["value"]) + return root + + @staticmethod + def _remove_empty_attributes(attributes: dict) -> dict: + for key, attribute in list(attributes.items()): + # Remove empty attributes + if attribute["value"] is None or attribute["value"] == "": + del attributes[key] + else: + # Make bool lower str for XML + if ( + "is_datatype_attribute" in attribute + and attribute["attribute_class"] + and attribute["attribute_class"].name == "Boolean" + ): + attribute["value"] = str(attribute["value"]).lower() + return attributes + def __str__(self) -> str: """Returns the string representation of this resource.""" return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())]) From 8bc3eaf8e39044b390b3d512fef0349450f3ea89 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:45:36 +0100 Subject: [PATCH 05/22] Added function in base to create a class instance from an etree element. Useful to create a profile reader Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 79 +++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 0000e45f..99552bb1 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -253,6 +253,85 @@ def __str__(self) -> str: """Returns the string representation of this resource.""" return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())]) + def _parse_xml_fragment(self, xml_fragment: str) -> dict: + """parses an xml fragment into a dict defining the class attributes + + Args: + xml_fragment (str): xml string defining an instance of the current class + + Returns: + attribute_dict (dict): a dictionnary of attributes to create/update the class instance + """ + attribute_dict = {} + xml_tree = etree.fromstring(xml_fragment) + + # raise an error if the xml does not describe the expected class + if not xml_tree.tag.endswith(self.resource_name): + raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}")) + + # parsing the mRID + for key, value in xml_tree.attrib.items(): + if key.endswith("ID") or key.endswith("about"): + if value.startswith("#"): + value = value[1:] + if value.startswith("_"): + value = value[1:] + if hasattr(self, "mRID") and value is not None: + attribute_dict["mRID"] = value + + # parsing attributes defined in class + class_attributes = self.cgmes_attributes_in_profile(None) + for key, class_attribute in class_attributes.items(): + xml_attribute = xml_tree.findall(".//{*}" + key) + if len(xml_attribute) != 1: + continue + xml_attribute = xml_attribute[0] + attr = key.rsplit(".")[-1] + attr_value = None + + # class attributes are pointing to another class/instance defined in .attrib + if class_attribute["is_class_attribute"]: + if len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if attr_value.startswith("#"): + attr_value = attr_value[1:] + + # enum attributes are defined in .attrib and has a prefix ending in "#" + elif class_attribute["is_enum_attribute"]: + if len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if "#" in attr_value: + attr_value = attr_value.rsplit("#")[-1] + + elif class_attribute["is_list_attribute"]: + # other attributes types are defined in .text + attr_value = xml_attribute + attr_value = self.key.append(attr_value) + else: + attr_value = xml_attribute.text + # primitive classes are described in "cim_data_type" allowing to retrieve the data type + if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + if class_attribute["attribute_class"].type == bool: + attr_value = {"true": True, "false": False}.get(attr_value, None) + else: + attr_value = class_attribute["attribute_class"].type(attr_value) + if hasattr(self, attr) and attr_value is not None: + attribute_dict[attr] = attr_value + + return attribute_dict + + @classmethod + def from_xml(cls, xml_fragment: str): + """ + Returns an instance of the class from an xml fragment defining the attributes written in the form: + ... + example: creating an instance by parsing a fragment from the EQ profile + """ + attribute_dict = cls()._parse_xml_fragment(xml_fragment) + + # Instantiate the class with the dictionary + return cls(**attribute_dict) + @staticmethod def get_extra_prop(field: Field, prop: str) -> Any: # The field is defined as a pydantic field, not a dataclass field, From ef3dd60c1374c2ea695109d21349916239d4f9ad Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 18 Nov 2024 17:46:45 +0100 Subject: [PATCH 06/22] update_from_xml allows to update a class instance by parsing additionnal profiles Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 99552bb1..58782953 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -320,6 +320,17 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: return attribute_dict + def update_from_xml(self, xml_fragment: str): + """ + Updates the instance by parsing an xml fragment defining the attributes of this instance + example: updating the instance by parsing the corresponding fragment from the SSH profile + """ + attribute_dict = self._parse_xml_fragment(xml_fragment) + + if attribute_dict["mRID"] == self.mRID: + for key, value in attribute_dict.items(): + setattr(self, key, value) + @classmethod def from_xml(cls, xml_fragment: str): """ From 657b73800d18683dc0a9faa58abf8ceb6eca12f6 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Tue, 19 Nov 2024 11:56:57 +0100 Subject: [PATCH 07/22] Added reader to parse profiles and create python object using the base.py functions Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/reader.py | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 cimgen/languages/modernpython/utils/reader.py diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py new file mode 100644 index 00000000..45115ecf --- /dev/null +++ b/cimgen/languages/modernpython/utils/reader.py @@ -0,0 +1,143 @@ +from lxml import etree +import importlib +import logging +from .profile import Profile +from pydantic import BaseModel, Field +from typing import Dict, Optional, Literal + + +logger = logging.getLogger(__name__) + + +class Reader(BaseModel): + cgmes_version_path: str + custom_namespaces: Optional[Dict[str, str]] = None + custom_folder: Optional[Dict[str, str]] = None + start_dict: Optional[Dict] = None + logger_grouped: Dict[str, Dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}}) + import_result: Dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}}) + + def cim_import(self, xml_files): + + self.import_result["meta_info"] = dict(namespaces=self._get_namespaces(xml_files[0]), urls={}) + namespace_rdf = self._get_rdf_namespace() + + bases = ["{" + self.import_result["meta_info"]["namespaces"]["cim"] + "}"] + if self.custom_namespaces: + for custom_namespace in self.custom_namespaces.values(): + bases.append("{" + custom_namespace + "}") + bases = tuple(bases) + + for xml_file in xml_files: + self._instantiate_classes(xml_file=xml_file, bases=bases, namespace_rdf=namespace_rdf) + return self.import_result + + def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): + + context = etree.iterparse(xml_file, ("start", "end")) + _, root = next(context) + level = 0 + + for event, elem in context: + if event == "end": + level -= 1 + if event == "start": + level += 1 + + prefix = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None) + if event == "start" and prefix is not None and level == 1: + tag, uuid = self._extract_tag_uuid(elem, prefix, namespace_rdf) + if uuid is not None: + self._process_element(tag, uuid, prefix, elem) + # Check which package is read + elif event == "end": + self._check_metadata(elem) + + def _process_element(self, tag, uuid, prefix, elem): + topology = self.import_result["topology"] + elem_str = etree.tostring(elem, encoding="utf8") + try: + # Import the module for the CGMES object. + module_name = self._get_path_to_module(prefix) + "." + tag + module = importlib.import_module(module_name) + + klass = getattr(module, tag) + if uuid in topology: + obj = topology[uuid] + obj.update_from_xml(elem_str) + info_msg = "CIM object {} updated".format(module_name.split(".")[-1]) + else: + topology[uuid] = klass().from_xml(elem_str) + info_msg = "CIM object {} created".format(module_name.split(".")[-1]) + self._log_message("info", info_msg) + except ModuleNotFoundError: + error_msg = "Module {} not implemented".format(tag) + self._log_message("errors", error_msg) + except Exception as e: + error_msg = "Could not create/update {}, {}".format(uuid, e) + self._log_message("errors", error_msg) + + def _check_metadata(self, elem): + if "Model.profile" in elem.tag: + for package_key in [e.value for e in Profile]: + if package_key in elem.text: + break + # the author of all imported files should be the same, avoid multiple entries + elif "author" in self.import_result["meta_info"].keys(): + pass + # extract author + elif "Model.createdBy" in elem.tag: + self.import_result["meta_info"]["author"] = elem.text + elif "Model.modelingAuthoritySet" in elem.tag: + self.import_result["meta_info"]["author"] = elem.text + + # Returns a map of prefix to namespace for the given XML file. + @staticmethod + def _get_namespaces(source) -> str: + namespaces = {} + events = ("end", "start-ns", "end-ns") + for event, elem in etree.iterparse(source, events): + if event == "start-ns": + prefix, ns = elem + namespaces[prefix] = ns + elif event == "end": + break + + # Reset stream + if hasattr(source, "seek"): + source.seek(0) + + return namespaces + + def _extract_tag_uuid(self, elem, prefix: str, namespace_rdf: str) -> tuple: + tag = elem.tag[len(prefix) :] + uuid = elem.get("{%s}ID" % namespace_rdf) + if uuid is None: + uuid = elem.get("{%s}about" % namespace_rdf) + if uuid is not None: + uuid = uuid[1:] + if uuid is not None: + if not uuid.startswith("_"): + uuid = "_" + uuid + return tag, uuid + + # Returns the RDF Namespace from the namespaces dictionary + def _get_rdf_namespace(self) -> str: + try: + namespace = self.import_result["meta_info"]["namespaces"]["rdf"] + except KeyError: + namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" + logger.warning("No rdf namespace found. Using %s" % namespace) + return namespace + + def _get_path_to_module(self, prefix: str) -> str: + namespace = prefix[1:-1] + if self.custom_folder and namespace in self.custom_folder: + path_to_module = self.custom_folder[namespace] + else: + path_to_module = self.cgmes_version_path + return path_to_module + + def _log_message(self, log_type: Literal["errors", "info"], message: str): + self.logger_grouped[log_type].setdefault(message, 0) + self.logger_grouped[log_type][message] += 1 From 502e3279275295df3a21b479d29b8372911767a6 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Tue, 19 Nov 2024 11:58:36 +0100 Subject: [PATCH 08/22] Added writer to export profiles using the .to_xml base function Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/writer.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 cimgen/languages/modernpython/utils/writer.py diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py new file mode 100644 index 00000000..147cc9d2 --- /dev/null +++ b/cimgen/languages/modernpython/utils/writer.py @@ -0,0 +1,87 @@ +from lxml import etree +from .base import Base +from .constants import NAMESPACES +from .profile import BaseProfile, Profile +from typing import Optional + + +class Writer: + """Class for writing CIM RDF/XML files.""" + + def __init__(self, objects: dict[str, Base]): + """Constructor. + + :param objects: Mapping of rdfid to CIM object. + """ + self.objects = objects + + def write( + self, outputfile: str, model_id: str, class_profile_map: dict[str, BaseProfile], custom_namespaces: dict + ) -> dict[BaseProfile, str]: + """Write CIM RDF/XML files. + + This function writes CIM objects into one or more RDF/XML files separated by profiles. + + Each CIM object will be written to its corresponding profile file depending on class_profile_map. + But some objects to more than one file if some attribute profiles are not the same as the class profile. + + :param outputfile: Stem of the output file, resulting files: _.xml. + :param model_id: Stem of the model IDs, resulting IDs: _. + :param class_profile_map: Mapping of CIM type to profile. + :return: Mapping of profile to outputfile. + """ + profile_list: list[BaseProfile] = list(Profile) + profile_list += {p for p in class_profile_map.values() if p not in profile_list} + profile_file_map: dict[BaseProfile, str] = {} + for profile in profile_list: + profile_name = profile.long_name + full_file_name = outputfile + "_" + profile.long_name + ".xml" + output = self._generate(profile, model_id + "_" + profile_name, custom_namespaces) + if output: + output.write(full_file_name, pretty_print=True, xml_declaration=True, encoding="UTF-8") + profile_file_map[profile] = full_file_name + return profile_file_map + + def _generate(self, profile: BaseProfile, model_id: str, custom_namespaces) -> Optional[etree.ElementTree]: + """Write CIM objects as RDF/XML data to a string. + + This function creates RDF/XML tree corresponding to one profile. + + :param profile: Only data for this profile should be written. + :param model_id: Stem of the model IDs, resulting IDs: _. + :return: etree of the profile + """ + FullModel = { + "id": model_id, + "Model": {"modelingAuthoritySet": "www.sogno.energy"}, + } + for uri in profile.uris: + FullModel["Model"].update({"profile": uri}) + + nsmap = NAMESPACES + nsmap.update(custom_namespaces) + + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + md_namespace = f"""{{{nsmap["md"]}}}""" + + root = etree.Element(rdf_namespace + "RDF", nsmap=nsmap) + + # FullModel header + model = etree.Element(md_namespace + "FullModel", nsmap=nsmap) + model.set(rdf_namespace + "about", "#" + FullModel["id"]) + for key, value in FullModel["Model"].items(): + element = etree.SubElement(model, md_namespace + "Model." + key) + element.text = value + root.append(model) + + count = 0 + for id, obj in self.objects.items(): + obj_etree = obj.to_xml(profile_to_export=profile, id=id) + if obj_etree is not None: + root.append(obj_etree) + count += 1 + if count > 0: + output = etree.ElementTree(root) + else: + output = None + return output From f668f2ab6223aca7d73f7cb1980a673b2c6f037e Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Tue, 19 Nov 2024 14:28:34 +0100 Subject: [PATCH 09/22] Improved readability of Reader Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/reader.py | 108 +++++++++++++----- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py index 45115ecf..332f31fd 100644 --- a/cimgen/languages/modernpython/utils/reader.py +++ b/cimgen/languages/modernpython/utils/reader.py @@ -10,15 +10,34 @@ class Reader(BaseModel): + """Parses profiles to create the corresponding python objects + + Args: + cgmes_version_path (str): Path to the cgmes resources folder containing the class definition + custom_namespaces (Optional[Dict[str, str]]): {"namespace_prefix": "namespace_uri"} + custom_folder (Optional[Dict[str, str]]): {"namespace_uri": "path_to_custom_resources_folder"} + """ + cgmes_version_path: str custom_namespaces: Optional[Dict[str, str]] = None custom_folder: Optional[Dict[str, str]] = None - start_dict: Optional[Dict] = None logger_grouped: Dict[str, Dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}}) import_result: Dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}}) - def cim_import(self, xml_files): + def parse_profiles(self, xml_files: list[str], start_dict: Optional[Dict] = None): + """Parses all profiles contained in xml_files and returns a list containing + all the objects defined in the profiles "_mRID": Object\n + Errors encounterd in the parsing can be recovered in Reader.logger_grouped + Args: + xml_files (list): list with the path to all the profiles to parse + start_dict (Optional[Dict]): To parse profiles on top of an existing list dict(meta_info, topology) + + Returns: + list: ["topology": dict of all the objects defined in the profiles {"_mRID": Object}, "meta_info"] + """ + if start_dict is not None: + self.import_result = start_dict self.import_result["meta_info"] = dict(namespaces=self._get_namespaces(xml_files[0]), urls={}) namespace_rdf = self._get_rdf_namespace() @@ -33,7 +52,13 @@ def cim_import(self, xml_files): return self.import_result def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): + """creates/updates the python objects with the information of xml_file + Args: + xml_file (str): Path to the profile + bases (tuple): contains the possible namespaces uris defining the classes, can be custom + namespace_rdf (str): rdf namespace uri + """ context = etree.iterparse(xml_file, ("start", "end")) _, root = next(context) level = 0 @@ -44,34 +69,67 @@ def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): if event == "start": level += 1 - prefix = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None) - if event == "start" and prefix is not None and level == 1: - tag, uuid = self._extract_tag_uuid(elem, prefix, namespace_rdf) + class_namespace = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None) + if event == "start" and class_namespace is not None and level == 1: + class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf) if uuid is not None: - self._process_element(tag, uuid, prefix, elem) + self._process_element(class_name, uuid, class_namespace, elem) # Check which package is read elif event == "end": self._check_metadata(elem) - def _process_element(self, tag, uuid, prefix, elem): + @staticmethod + def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> tuple: + """Extracts class name and instance uuid ("_mRID") + + Args: + elem (etree.Element): description of the instance for the given profile + class_namespace (str): namespace uri defining the class + namespace_rdf (str): rdf namespace uri + + Returns: + tuple: (class_name: example "ACLineSgement", instance_uuid: "_mRID") + """ + class_name = elem.tag[len(class_namespace) :] + uuid = elem.get("{%s}ID" % namespace_rdf) + if uuid is None: + uuid = elem.get("{%s}about" % namespace_rdf) + if uuid is not None: + uuid = uuid[1:] + if uuid is not None: + if not uuid.startswith("_"): + uuid = "_" + uuid + return class_name, uuid + + def _process_element(self, class_name: str, uuid: str, class_namespace: str, elem): + """Creates or updates (if an object with the same uuid exists) + an instance of the class based on the fragment of the profile + + Args: + class_name (str): Name of the class of the instance to create/update (example: ACLineSegment) + uuid (str): _mRID + class_namespace (str): namespace defining the class + elem (etree.Element): description of the instance for the given profile + """ topology = self.import_result["topology"] elem_str = etree.tostring(elem, encoding="utf8") try: # Import the module for the CGMES object. - module_name = self._get_path_to_module(prefix) + "." + tag + module_name = self._get_path_to_module(class_namespace) + "." + class_name module = importlib.import_module(module_name) - klass = getattr(module, tag) - if uuid in topology: + klass = getattr(module, class_name) + if uuid not in topology: + topology[uuid] = klass().from_xml(elem_str) + info_msg = "CIM object {} created".format(module_name.split(".")[-1]) + else: obj = topology[uuid] obj.update_from_xml(elem_str) info_msg = "CIM object {} updated".format(module_name.split(".")[-1]) - else: - topology[uuid] = klass().from_xml(elem_str) - info_msg = "CIM object {} created".format(module_name.split(".")[-1]) self._log_message("info", info_msg) + except ModuleNotFoundError: - error_msg = "Module {} not implemented".format(tag) + error_msg = "Module {} not implemented".format(class_name) self._log_message("errors", error_msg) except Exception as e: error_msg = "Could not create/update {}, {}".format(uuid, e) @@ -91,15 +149,15 @@ def _check_metadata(self, elem): elif "Model.modelingAuthoritySet" in elem.tag: self.import_result["meta_info"]["author"] = elem.text - # Returns a map of prefix to namespace for the given XML file. + # Returns a map of class_namespace to namespace for the given XML file. @staticmethod def _get_namespaces(source) -> str: namespaces = {} events = ("end", "start-ns", "end-ns") for event, elem in etree.iterparse(source, events): if event == "start-ns": - prefix, ns = elem - namespaces[prefix] = ns + class_namespace, ns = elem + namespaces[class_namespace] = ns elif event == "end": break @@ -109,18 +167,6 @@ def _get_namespaces(source) -> str: return namespaces - def _extract_tag_uuid(self, elem, prefix: str, namespace_rdf: str) -> tuple: - tag = elem.tag[len(prefix) :] - uuid = elem.get("{%s}ID" % namespace_rdf) - if uuid is None: - uuid = elem.get("{%s}about" % namespace_rdf) - if uuid is not None: - uuid = uuid[1:] - if uuid is not None: - if not uuid.startswith("_"): - uuid = "_" + uuid - return tag, uuid - # Returns the RDF Namespace from the namespaces dictionary def _get_rdf_namespace(self) -> str: try: @@ -130,8 +176,8 @@ def _get_rdf_namespace(self) -> str: logger.warning("No rdf namespace found. Using %s" % namespace) return namespace - def _get_path_to_module(self, prefix: str) -> str: - namespace = prefix[1:-1] + def _get_path_to_module(self, class_namespace: str) -> str: + namespace = class_namespace[1:-1] if self.custom_folder and namespace in self.custom_folder: path_to_module = self.custom_folder[namespace] else: From bae046cb80eed01dcf7da2d66ab57bf7e150bda7 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Tue, 19 Nov 2024 15:06:49 +0100 Subject: [PATCH 10/22] Improved readability of Writer Added ability to add attributes to profile Model header Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/writer.py | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py index 147cc9d2..0388d88d 100644 --- a/cimgen/languages/modernpython/utils/writer.py +++ b/cimgen/languages/modernpython/utils/writer.py @@ -1,22 +1,28 @@ from lxml import etree -from .base import Base +from pydantic import BaseModel from .constants import NAMESPACES from .profile import BaseProfile, Profile -from typing import Optional +from typing import Dict, Optional -class Writer: - """Class for writing CIM RDF/XML files.""" +class Writer(BaseModel): + """Class for writing CIM RDF/XML files - def __init__(self, objects: dict[str, Base]): - """Constructor. + Args: + objects (dict): Mapping of rdfid to CIM object + Model_metadata (Optional[Dict[str, str]]): any additional data to add in header + default = {"modelingAuthoritySet": "www.sogno.energy" } + """ - :param objects: Mapping of rdfid to CIM object. - """ - self.objects = objects + objects: Dict + Model_metadata: Dict[str, str] = {} def write( - self, outputfile: str, model_id: str, class_profile_map: dict[str, BaseProfile], custom_namespaces: dict + self, + outputfile: str, + model_id: str, + class_profile_map: Dict[str, BaseProfile], + custom_namespaces: Dict = {}, ) -> dict[BaseProfile, str]: """Write CIM RDF/XML files. @@ -28,6 +34,7 @@ def write( :param outputfile: Stem of the output file, resulting files: _.xml. :param model_id: Stem of the model IDs, resulting IDs: _. :param class_profile_map: Mapping of CIM type to profile. + :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} :return: Mapping of profile to outputfile. """ profile_list: list[BaseProfile] = list(Profile) @@ -42,18 +49,23 @@ def write( profile_file_map[profile] = full_file_name return profile_file_map - def _generate(self, profile: BaseProfile, model_id: str, custom_namespaces) -> Optional[etree.ElementTree]: + def _generate( + self, profile: BaseProfile, model_id: str, custom_namespaces: Dict = {} + ) -> Optional[etree.ElementTree]: """Write CIM objects as RDF/XML data to a string. This function creates RDF/XML tree corresponding to one profile. :param profile: Only data for this profile should be written. :param model_id: Stem of the model IDs, resulting IDs: _. + :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} :return: etree of the profile """ + Model = {"modelingAuthoritySet": "www.sogno.energy"} + Model.update(self.Model_metadata) FullModel = { "id": model_id, - "Model": {"modelingAuthoritySet": "www.sogno.energy"}, + "Model": Model, } for uri in profile.uris: FullModel["Model"].update({"profile": uri}) From a8e078f1f7d8981e698dc9af179c5f8a484ba728 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Tue, 19 Nov 2024 15:49:53 +0100 Subject: [PATCH 11/22] Fix sonar issues Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py index 332f31fd..6324af17 100644 --- a/cimgen/languages/modernpython/utils/reader.py +++ b/cimgen/languages/modernpython/utils/reader.py @@ -172,7 +172,7 @@ def _get_rdf_namespace(self) -> str: try: namespace = self.import_result["meta_info"]["namespaces"]["rdf"] except KeyError: - namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" + namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" # NOSONAR logger.warning("No rdf namespace found. Using %s" % namespace) return namespace From 778658a883076151e018a4d31d7239262e8bc41c Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 20 Nov 2024 10:32:07 +0100 Subject: [PATCH 12/22] Fix sonar issues Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/reader.py | 22 ++++++++----------- cimgen/languages/modernpython/utils/writer.py | 16 +++++++------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py index 6324af17..05ef833d 100644 --- a/cimgen/languages/modernpython/utils/reader.py +++ b/cimgen/languages/modernpython/utils/reader.py @@ -60,7 +60,6 @@ def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): namespace_rdf (str): rdf namespace uri """ context = etree.iterparse(xml_file, ("start", "end")) - _, root = next(context) level = 0 for event, elem in context: @@ -70,7 +69,7 @@ def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): level += 1 class_namespace = next((namespace for namespace in bases if elem.tag.startswith(namespace)), None) - if event == "start" and class_namespace is not None and level == 1: + if event == "start" and class_namespace is not None and level == 2: class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf) if uuid is not None: self._process_element(class_name, uuid, class_namespace, elem) @@ -96,9 +95,8 @@ def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> t uuid = elem.get("{%s}about" % namespace_rdf) if uuid is not None: uuid = uuid[1:] - if uuid is not None: - if not uuid.startswith("_"): - uuid = "_" + uuid + if uuid is not None and not uuid.startswith("_"): + uuid = "_" + uuid return class_name, uuid def _process_element(self, class_name: str, uuid: str, class_namespace: str, elem): @@ -141,17 +139,15 @@ def _check_metadata(self, elem): if package_key in elem.text: break # the author of all imported files should be the same, avoid multiple entries - elif "author" in self.import_result["meta_info"].keys(): - pass - # extract author - elif "Model.createdBy" in elem.tag: - self.import_result["meta_info"]["author"] = elem.text - elif "Model.modelingAuthoritySet" in elem.tag: - self.import_result["meta_info"]["author"] = elem.text + elif "author" not in self.import_result["meta_info"].keys(): + if "Model.createdBy" in elem.tag: + self.import_result["meta_info"]["author"] = elem.text + elif "Model.modelingAuthoritySet" in elem.tag: + self.import_result["meta_info"]["author"] = elem.text # Returns a map of class_namespace to namespace for the given XML file. @staticmethod - def _get_namespaces(source) -> str: + def _get_namespaces(source) -> Dict: namespaces = {} events = ("end", "start-ns", "end-ns") for event, elem in etree.iterparse(source, events): diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py index 0388d88d..8af935b0 100644 --- a/cimgen/languages/modernpython/utils/writer.py +++ b/cimgen/languages/modernpython/utils/writer.py @@ -15,7 +15,7 @@ class Writer(BaseModel): """ objects: Dict - Model_metadata: Dict[str, str] = {} + model_metadata: Dict[str, str] = {} def write( self, @@ -61,14 +61,14 @@ def _generate( :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} :return: etree of the profile """ - Model = {"modelingAuthoritySet": "www.sogno.energy"} - Model.update(self.Model_metadata) - FullModel = { + model = {"modelingAuthoritySet": "www.sogno.energy"} + model.update(self.model_metadata) + fullmodel = { "id": model_id, - "Model": Model, + "Model": model, } for uri in profile.uris: - FullModel["Model"].update({"profile": uri}) + fullmodel["Model"].update({"profile": uri}) nsmap = NAMESPACES nsmap.update(custom_namespaces) @@ -80,8 +80,8 @@ def _generate( # FullModel header model = etree.Element(md_namespace + "FullModel", nsmap=nsmap) - model.set(rdf_namespace + "about", "#" + FullModel["id"]) - for key, value in FullModel["Model"].items(): + model.set(rdf_namespace + "about", "#" + fullmodel["id"]) + for key, value in fullmodel["Model"].items(): element = etree.SubElement(model, md_namespace + "Model." + key) element.text = value root.append(model) From 362080e02d370cedd17e070f1f677e0faa5e4b95 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 20 Nov 2024 10:39:03 +0100 Subject: [PATCH 13/22] Fix sonar issues Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/reader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py index 05ef833d..d1583150 100644 --- a/cimgen/languages/modernpython/utils/reader.py +++ b/cimgen/languages/modernpython/utils/reader.py @@ -140,9 +140,7 @@ def _check_metadata(self, elem): break # the author of all imported files should be the same, avoid multiple entries elif "author" not in self.import_result["meta_info"].keys(): - if "Model.createdBy" in elem.tag: - self.import_result["meta_info"]["author"] = elem.text - elif "Model.modelingAuthoritySet" in elem.tag: + if any(author_field in elem.tag for author_field in ("Model.createdBy", "Model.modelingAuthoritySet")): self.import_result["meta_info"]["author"] = elem.text # Returns a map of class_namespace to namespace for the given XML file. From 4f00acda7e20e26a88110c479e20b7a4ef4c4c31 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 20 Nov 2024 12:15:31 +0100 Subject: [PATCH 14/22] Refactor functions to_xml and _parse_xml_fragment to split in different subfunction and improve readability Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 182 ++++++++++---------- 1 file changed, 95 insertions(+), 87 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 58782953..3c0a3d1d 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -135,27 +135,14 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, shortname = f.name qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore infos = dict() - if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs: - # Wrong profile or already found from a parent. - continue - else: + if f in self.cgmes_attribute_names_in_profile(profile) and shortname not in seen_attrs: # Namespace finding # "class namespace" means the first namespace defined in the inheritance tree. # This can go up to Base, which will give the default cim NS. - if (extra := getattr(f.default, "json_schema_extra", None)) is None: - # The attribute does not have extra metadata. It might be a custom atttribute - # without it, or a base type (int...). - # Use the class namespace. - infos["namespace"] = self.namespace - elif extra.get("is_used"): - if (extra.get("attribute_namespace", None)) is None: - # The attribute has some extras, but not namespace. - # Use the class namespace. - infos["namespace"] = self.namespace - - else: - # The attribute has an explicit namesapce - infos["namespace"] = extra.get("attribute_namespace", self.namespace) + + infos["namespace"] = self.namespace + extra = getattr(f.default, "json_schema_extra", None) + if extra is not None and extra.get("is_used"): # adding the extras, used for xml generation extra_info = { "attr_name": qualname, @@ -166,6 +153,9 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, "is_datatype_attribute": extra.get("is_datatype_attribute"), "attribute_class": extra.get("attribute_class"), } + if (extra.get("attribute_namespace", None)) is not None: + # The attribute has an explicit namesapce + extra_info["namespace"] = extra.get("attribute_namespace", self.namespace) infos.update(extra_info) infos["value"] = getattr(self, shortname) @@ -185,16 +175,7 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op Returns: Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export """ - profile_attributes = self.cgmes_attributes_in_profile(profile_to_export) - is_recommanded_profile = self.recommended_profile.value == profile_to_export.value - - if not is_recommanded_profile: - # remove attributes that are also present in "recommended_profile" - attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile) - for key in attributes_main.keys(): - if key in profile_attributes: - del profile_attributes[key] - profile_attributes = self._remove_empty_attributes(profile_attributes) + profile_attributes = self._get_attributes_to_export(profile_to_export) if "mRID" in self.to_dict(): obj_id = "_" + self.mRID @@ -205,34 +186,31 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op if profile_attributes == {} or obj_id is None: root = None else: - nsmap = NAMESPACES # Create root element + nsmap = NAMESPACES root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap) - # Add the ID ass attribute to the root - rdf_namespace = f"""{{{nsmap["rdf"]}}}""" - if is_recommanded_profile: - root.set(rdf_namespace + "ID", obj_id) + # Add the ID as attribute to the root + if self.recommended_profile.value == profile_to_export.value: + root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id) else: - root.set(rdf_namespace + "about", "#" + obj_id) - - for field_name, attribute in profile_attributes.items(): - # add all attributes relevant to the profile as SubElements - attr_namespace = attribute["namespace"] - element_name = f"{{{attr_namespace}}}{field_name}" - - if attribute["is_class_attribute"]: - # class_attributes are exported as rdf: resource #_mRID_of_target - element = etree.SubElement(root, element_name) - element.set(rdf_namespace + "resource", "#" + attribute["value"]) - elif attribute["is_enum_attribute"]: - element = etree.SubElement(root, element_name) - element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"]) - else: - element = etree.SubElement(root, element_name) - element.text = str(attribute["value"]) + root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id) + + root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap) return root + def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict: + attributes_to_export = self.cgmes_attributes_in_profile(profile_to_export) + is_recommanded_profile = self.recommended_profile.value == profile_to_export.value + if not is_recommanded_profile: + # remove attributes that are also present in "recommended_profile" + attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile) + for key in attributes_main.keys(): + if key in attributes_to_export: + del attributes_to_export[key] + attributes_to_export = self._remove_empty_attributes(attributes_to_export) + return attributes_to_export + @staticmethod def _remove_empty_attributes(attributes: dict) -> dict: for key, attribute in list(attributes.items()): @@ -249,6 +227,26 @@ def _remove_empty_attributes(attributes: dict) -> dict: attribute["value"] = str(attribute["value"]).lower() return attributes + @staticmethod + def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) -> etree.Element: + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + for field_name, attribute in attributes.items(): + # add all attributes relevant to the profile as SubElements + attr_namespace = attribute["namespace"] + element_name = f"{{{attr_namespace}}}{field_name}" + + if attribute["is_class_attribute"]: + # class_attributes are exported as rdf: resource #_mRID_of_target + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", "#" + attribute["value"]) + elif attribute["is_enum_attribute"]: + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"]) + else: + element = etree.SubElement(root, element_name) + element.text = str(attribute["value"]) + return root + def __str__(self) -> str: """Returns the string representation of this resource.""" return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())]) @@ -269,15 +267,7 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: if not xml_tree.tag.endswith(self.resource_name): raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}")) - # parsing the mRID - for key, value in xml_tree.attrib.items(): - if key.endswith("ID") or key.endswith("about"): - if value.startswith("#"): - value = value[1:] - if value.startswith("_"): - value = value[1:] - if hasattr(self, "mRID") and value is not None: - attribute_dict["mRID"] = value + attribute_dict.update(self._extract_mRID_from_etree(xml_tree=xml_tree)) # parsing attributes defined in class class_attributes = self.cgmes_attributes_in_profile(None) @@ -287,39 +277,57 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: continue xml_attribute = xml_attribute[0] attr = key.rsplit(".")[-1] - attr_value = None - - # class attributes are pointing to another class/instance defined in .attrib - if class_attribute["is_class_attribute"]: - if len(xml_attribute.keys()) == 1: - attr_value = xml_attribute.values()[0] - if attr_value.startswith("#"): - attr_value = attr_value[1:] - - # enum attributes are defined in .attrib and has a prefix ending in "#" - elif class_attribute["is_enum_attribute"]: - if len(xml_attribute.keys()) == 1: - attr_value = xml_attribute.values()[0] - if "#" in attr_value: - attr_value = attr_value.rsplit("#")[-1] - - elif class_attribute["is_list_attribute"]: - # other attributes types are defined in .text - attr_value = xml_attribute - attr_value = self.key.append(attr_value) - else: - attr_value = xml_attribute.text - # primitive classes are described in "cim_data_type" allowing to retrieve the data type - if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: - if class_attribute["attribute_class"].type == bool: - attr_value = {"true": True, "false": False}.get(attr_value, None) - else: - attr_value = class_attribute["attribute_class"].type(attr_value) + + attr_value = self._extract_attr_value_from_etree(class_attribute, xml_attribute) if hasattr(self, attr) and attr_value is not None: attribute_dict[attr] = attr_value return attribute_dict + def _extract_mRID_from_etree(self, xml_tree: etree.Element) -> dict: + """Parsing the mRID from etree attributes""" + mRID_dict = {} + for key, value in xml_tree.attrib.items(): + if key.endswith("ID") or key.endswith("about"): + if value.startswith("#"): + value = value[1:] + if value.startswith("_"): + value = value[1:] + if hasattr(self, "mRID") and value is not None: + mRID_dict = {"mRID": value} + return mRID_dict + + def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: dict): + """Parsing the attribute value from etree attributes""" + attr_value = None + # class attributes are pointing to another class/instance defined in .attrib + if class_attribute["is_class_attribute"]: + if len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if attr_value.startswith("#"): + attr_value = attr_value[1:] + + # enum attributes are defined in .attrib and has a prefix ending in "#" + elif class_attribute["is_enum_attribute"]: + if len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if "#" in attr_value: + attr_value = attr_value.rsplit("#")[-1] + + elif class_attribute["is_list_attribute"]: + # other attributes types are defined in .text + attr_value = xml_attribute + attr_value = self.key.append(attr_value) + else: + attr_value = xml_attribute.text + # primitive classes are described in "cim_data_type" allowing to retrieve the data type + if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + if class_attribute["attribute_class"].type == bool: + attr_value = {"true": True, "false": False}.get(attr_value, None) + else: + attr_value = class_attribute["attribute_class"].type(attr_value) + return attr_value + def update_from_xml(self, xml_fragment: str): """ Updates the instance by parsing an xml fragment defining the attributes of this instance From 339e5d7079380adb3bfdaf1588e2bb8bd23ff55f Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 20 Nov 2024 13:36:20 +0100 Subject: [PATCH 15/22] Fix sonar issues Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 86 ++++++++++----------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 3c0a3d1d..9fbfce50 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -135,33 +135,35 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, shortname = f.name qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore infos = dict() - if f in self.cgmes_attribute_names_in_profile(profile) and shortname not in seen_attrs: - # Namespace finding - # "class namespace" means the first namespace defined in the inheritance tree. - # This can go up to Base, which will give the default cim NS. - - infos["namespace"] = self.namespace - extra = getattr(f.default, "json_schema_extra", None) - if extra is not None and extra.get("is_used"): - # adding the extras, used for xml generation - extra_info = { - "attr_name": qualname, - "is_class_attribute": extra.get("is_class_attribute"), - "is_enum_attribute": extra.get("is_enum_attribute"), - "is_list_attribute": extra.get("is_list_attribute"), - "is_primitive_attribute": extra.get("is_primitive_attribute"), - "is_datatype_attribute": extra.get("is_datatype_attribute"), - "attribute_class": extra.get("attribute_class"), - } - if (extra.get("attribute_namespace", None)) is not None: - # The attribute has an explicit namesapce - extra_info["namespace"] = extra.get("attribute_namespace", self.namespace) - infos.update(extra_info) - - infos["value"] = getattr(self, shortname) - - qual_attrs[qualname] = CgmesAttribute(infos) - seen_attrs.add(shortname) + + if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs: + continue + + # Namespace finding + # "class namespace" means the first namespace defined in the inheritance tree. + # This can go up to Base, which will give the default cim NS. + infos["namespace"] = self.namespace + extra = getattr(f.default, "json_schema_extra", None) + if extra is not None and extra.get("is_used"): + # adding the extras, used for xml generation + extra_info = { + "attr_name": qualname, + "is_class_attribute": extra.get("is_class_attribute"), + "is_enum_attribute": extra.get("is_enum_attribute"), + "is_list_attribute": extra.get("is_list_attribute"), + "is_primitive_attribute": extra.get("is_primitive_attribute"), + "is_datatype_attribute": extra.get("is_datatype_attribute"), + "attribute_class": extra.get("attribute_class"), + } + if (extra.get("attribute_namespace", None)) is not None: + # The attribute has an explicit namesapce + extra_info["namespace"] = extra.get("attribute_namespace", self.namespace) + infos.update(extra_info) + + infos["value"] = getattr(self, shortname) + + qual_attrs[qualname] = CgmesAttribute(infos) + seen_attrs.add(shortname) return qual_attrs @@ -267,7 +269,7 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: if not xml_tree.tag.endswith(self.resource_name): raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}")) - attribute_dict.update(self._extract_mRID_from_etree(xml_tree=xml_tree)) + attribute_dict.update(self._extract_mrid_from_etree(xml_tree=xml_tree)) # parsing attributes defined in class class_attributes = self.cgmes_attributes_in_profile(None) @@ -284,9 +286,9 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: return attribute_dict - def _extract_mRID_from_etree(self, xml_tree: etree.Element) -> dict: + def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict: """Parsing the mRID from etree attributes""" - mRID_dict = {} + mrid_dict = {} for key, value in xml_tree.attrib.items(): if key.endswith("ID") or key.endswith("about"): if value.startswith("#"): @@ -294,25 +296,23 @@ def _extract_mRID_from_etree(self, xml_tree: etree.Element) -> dict: if value.startswith("_"): value = value[1:] if hasattr(self, "mRID") and value is not None: - mRID_dict = {"mRID": value} - return mRID_dict + mrid_dict = {"mRID": value} + return mrid_dict - def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: dict): + def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: etree.Element): """Parsing the attribute value from etree attributes""" attr_value = None # class attributes are pointing to another class/instance defined in .attrib - if class_attribute["is_class_attribute"]: - if len(xml_attribute.keys()) == 1: - attr_value = xml_attribute.values()[0] - if attr_value.startswith("#"): - attr_value = attr_value[1:] + if class_attribute["is_class_attribute"] and len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if attr_value.startswith("#"): + attr_value = attr_value[1:] # enum attributes are defined in .attrib and has a prefix ending in "#" - elif class_attribute["is_enum_attribute"]: - if len(xml_attribute.keys()) == 1: - attr_value = xml_attribute.values()[0] - if "#" in attr_value: - attr_value = attr_value.rsplit("#")[-1] + elif class_attribute["is_enum_attribute"] and len(xml_attribute.keys()) == 1: + attr_value = xml_attribute.values()[0] + if "#" in attr_value: + attr_value = attr_value.rsplit("#")[-1] elif class_attribute["is_list_attribute"]: # other attributes types are defined in .text From dabbcf182e15908f66b8c14a27e793718c48aa02 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 20 Nov 2024 13:50:34 +0100 Subject: [PATCH 16/22] Fix comments Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 9fbfce50..272f85cb 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -315,13 +315,14 @@ def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: e attr_value = attr_value.rsplit("#")[-1] elif class_attribute["is_list_attribute"]: - # other attributes types are defined in .text attr_value = xml_attribute attr_value = self.key.append(attr_value) else: + # other attributes types are defined in .text attr_value = xml_attribute.text - # primitive classes are described in "cim_data_type" allowing to retrieve the data type + if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + # primitive classes are described in "attribute_class" allowing to retrieve the data type if class_attribute["attribute_class"].type == bool: attr_value = {"true": True, "false": False}.get(attr_value, None) else: From da4ec07bd25b2f326afdc2fb8c4293661e04a31e Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 25 Nov 2024 09:12:05 +0100 Subject: [PATCH 17/22] Removed extra indentation levels Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 29 +++++++++------------ 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 272f85cb..29f500ab 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -219,14 +219,12 @@ def _remove_empty_attributes(attributes: dict) -> dict: # Remove empty attributes if attribute["value"] is None or attribute["value"] == "": del attributes[key] - else: - # Make bool lower str for XML - if ( - "is_datatype_attribute" in attribute - and attribute["attribute_class"] - and attribute["attribute_class"].name == "Boolean" - ): - attribute["value"] = str(attribute["value"]).lower() + elif ( + "is_datatype_attribute" in attribute + and attribute["attribute_class"] + and attribute["attribute_class"].name == "Boolean" + ): + attribute["value"] = str(attribute["value"]).lower() return attributes @staticmethod @@ -312,21 +310,20 @@ def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: e elif class_attribute["is_enum_attribute"] and len(xml_attribute.keys()) == 1: attr_value = xml_attribute.values()[0] if "#" in attr_value: - attr_value = attr_value.rsplit("#")[-1] + attr_value = attr_value.split("#")[-1] elif class_attribute["is_list_attribute"]: attr_value = xml_attribute attr_value = self.key.append(attr_value) + elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + attr_value = xml_attribute.text + if class_attribute["attribute_class"].type == bool: + attr_value = {"true": True, "false": False}.get(attr_value, None) + else: + attr_value = class_attribute["attribute_class"].type(attr_value) else: # other attributes types are defined in .text attr_value = xml_attribute.text - - if class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: - # primitive classes are described in "attribute_class" allowing to retrieve the data type - if class_attribute["attribute_class"].type == bool: - attr_value = {"true": True, "false": False}.get(attr_value, None) - else: - attr_value = class_attribute["attribute_class"].type(attr_value) return attr_value def update_from_xml(self, xml_fragment: str): From 55550f604ead3ce9cb83c0c7419982656bd4db8e Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 27 Nov 2024 09:45:23 +0100 Subject: [PATCH 18/22] Add enum wrapped_comments Signed-off-by: HUG0-D --- cimgen/cimgen.py | 7 +++++++ .../modernpython/templates/enum_template.mustache | 5 +++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cimgen/cimgen.py b/cimgen/cimgen.py index 7de641a7..3f8cd797 100644 --- a/cimgen/cimgen.py +++ b/cimgen/cimgen.py @@ -414,6 +414,13 @@ def _parse_rdf(input_dic, version): # NOSONAR for instance in enum_instances: clarse = _get_rid_of_hash(instance["type"]) if clarse and clarse in classes_map: + if instance.get("comment"): + instance["wrapped_comment"] = wrap_and_clean( + instance["comment"], + width=100, + initial_indent="# ", + subsequent_indent=(" # "), + ) classes_map[clarse].add_enum_instance(instance) else: logger.info("Class {} for enum instance {} not found.".format(clarse, instance)) diff --git a/cimgen/languages/modernpython/templates/enum_template.mustache b/cimgen/languages/modernpython/templates/enum_template.mustache index d0b3826c..1c51ff62 100644 --- a/cimgen/languages/modernpython/templates/enum_template.mustache +++ b/cimgen/languages/modernpython/templates/enum_template.mustache @@ -7,9 +7,10 @@ from enum import Enum class {{class_name}}(str, Enum): """ - {{{class_comment}}} # noqa: E501 + {{{wrapped_class_comment}}} """ {{#enum_instances}} - {{label}} = "{{label}}"{{#comment}} # {{comment}}{{/comment}} # noqa: E501 + {{label}} = "{{label}}"{{#comment}} + {{wrapped_comment}}{{/comment}} {{/enum_instances}} From 91269754655dcd9bdc908b23eec14b5bd45dace9 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 27 Nov 2024 10:10:49 +0100 Subject: [PATCH 19/22] Fix json_dump error by setting attribute_class as str Fix attribute_namespace to namespace to be coherant with pycgmes Signed-off-by: HUG0-D --- cimgen/languages/modernpython/lang_pack.py | 9 -------- .../templates/class_template.mustache | 9 +++----- cimgen/languages/modernpython/utils/base.py | 21 ++++++++----------- 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/cimgen/languages/modernpython/lang_pack.py b/cimgen/languages/modernpython/lang_pack.py index 64cb4104..ce564050 100644 --- a/cimgen/languages/modernpython/lang_pack.py +++ b/cimgen/languages/modernpython/lang_pack.py @@ -98,14 +98,6 @@ def _get_python_type(datatype): return "float" -def _set_imports(attributes): - import_set = set() - for attribute in attributes: - if attribute["is_datatype_attribute"] or attribute["is_primitive_attribute"]: - import_set.add(attribute["attribute_class"]) - return sorted(import_set) - - def _set_datatype_attributes(attributes) -> dict: datatype_attributes = {} datatype_attributes["python_type"] = "None" @@ -136,7 +128,6 @@ def run_template(output_path, class_details): template = class_template_file class_details["setDefault"] = _set_default class_details["setType"] = _set_type - class_details["imports"] = _set_imports(class_details["attributes"]) resource_file = _create_file(output_path, class_details, template) _write_templated_file(resource_file, class_details, template["filename"]) diff --git a/cimgen/languages/modernpython/templates/class_template.mustache b/cimgen/languages/modernpython/templates/class_template.mustache index 18dd1141..da6ee721 100644 --- a/cimgen/languages/modernpython/templates/class_template.mustache +++ b/cimgen/languages/modernpython/templates/class_template.mustache @@ -10,9 +10,6 @@ from pydantic.dataclasses import dataclass from ..utils.profile import BaseProfile, Profile from {{class_location}} import {{sub_class_of}} -{{#imports}} -from .{{.}} import {{.}} -{{/imports}} @dataclass @@ -35,17 +32,17 @@ class {{class_name}}({{sub_class_of}}): {{/attr_origin}} ], "is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}}, - "attribute_namespace": "{{attribute_namespace}}", # NOSONAR + "namespace": "{{attribute_namespace}}", # NOSONAR "is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}}, "is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}}, "is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}}, "is_list_attribute": {{#is_list_attribute}}True{{/is_list_attribute}}{{^is_list_attribute}}False{{/is_list_attribute}}, "is_primitive_attribute": {{#is_primitive_attribute}}True{{/is_primitive_attribute}}{{^is_primitive_attribute}}False{{/is_primitive_attribute}}, {{#is_datatype_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_datatype_attribute}} {{#is_primitive_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_primitive_attribute}} }, ) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 29f500ab..deaf4712 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -144,7 +144,7 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, # This can go up to Base, which will give the default cim NS. infos["namespace"] = self.namespace extra = getattr(f.default, "json_schema_extra", None) - if extra is not None and extra.get("is_used"): + if extra and extra.get("is_used"): # adding the extras, used for xml generation extra_info = { "attr_name": qualname, @@ -155,9 +155,9 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, "is_datatype_attribute": extra.get("is_datatype_attribute"), "attribute_class": extra.get("attribute_class"), } - if (extra.get("attribute_namespace", None)) is not None: + if extra.get("namespace"): # The attribute has an explicit namesapce - extra_info["namespace"] = extra.get("attribute_namespace", self.namespace) + extra_info["namespace"] = extra.get("namespace", self.namespace) infos.update(extra_info) infos["value"] = getattr(self, shortname) @@ -219,11 +219,7 @@ def _remove_empty_attributes(attributes: dict) -> dict: # Remove empty attributes if attribute["value"] is None or attribute["value"] == "": del attributes[key] - elif ( - "is_datatype_attribute" in attribute - and attribute["attribute_class"] - and attribute["attribute_class"].name == "Boolean" - ): + elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean": attribute["value"] = str(attribute["value"]).lower() return attributes @@ -278,7 +274,7 @@ def _parse_xml_fragment(self, xml_fragment: str) -> dict: xml_attribute = xml_attribute[0] attr = key.rsplit(".")[-1] - attr_value = self._extract_attr_value_from_etree(class_attribute, xml_attribute) + attr_value = self._extract_attr_value_from_etree(attr, class_attribute, xml_attribute) if hasattr(self, attr) and attr_value is not None: attribute_dict[attr] = attr_value @@ -297,7 +293,7 @@ def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict: mrid_dict = {"mRID": value} return mrid_dict - def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: etree.Element): + def _extract_attr_value_from_etree(self, attr_name: str, class_attribute: dict, xml_attribute: etree.Element): """Parsing the attribute value from etree attributes""" attr_value = None # class attributes are pointing to another class/instance defined in .attrib @@ -317,10 +313,11 @@ def _extract_attr_value_from_etree(self, class_attribute: dict, xml_attribute: e attr_value = self.key.append(attr_value) elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: attr_value = xml_attribute.text - if class_attribute["attribute_class"].type == bool: + if self.__dataclass_fields__[attr_name].type == bool: attr_value = {"true": True, "false": False}.get(attr_value, None) else: - attr_value = class_attribute["attribute_class"].type(attr_value) + # types are int, float or str (date, time and datetime treated as str) + attr_value = self.__dataclass_fields__[attr_name].type(attr_value) else: # other attributes types are defined in .text attr_value = xml_attribute.text From 7a095f3d0bbf0275ae7fe7843871e5c73a52caf8 Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Wed, 27 Nov 2024 10:30:46 +0100 Subject: [PATCH 20/22] Changed var name for writer Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/writer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py index 8af935b0..19d4d1e9 100644 --- a/cimgen/languages/modernpython/utils/writer.py +++ b/cimgen/languages/modernpython/utils/writer.py @@ -15,7 +15,7 @@ class Writer(BaseModel): """ objects: Dict - model_metadata: Dict[str, str] = {} + writer_metadata: Dict[str, str] = {} def write( self, @@ -61,11 +61,11 @@ def _generate( :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} :return: etree of the profile """ - model = {"modelingAuthoritySet": "www.sogno.energy"} - model.update(self.model_metadata) + writer_info = {"modelingAuthoritySet": "www.sogno.energy"} + writer_info.update(self.writer_metadata) fullmodel = { "id": model_id, - "Model": model, + "Model": writer_info, } for uri in profile.uris: fullmodel["Model"].update({"profile": uri}) From f531c061f5a8573078b08817bbcd85b27f89bb9c Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 16 Dec 2024 17:24:34 +0100 Subject: [PATCH 21/22] Added subfolder within resources to store primitives, enum and datatypes Signed-off-by: HUG0-D --- cimgen/languages/modernpython/lang_pack.py | 10 +++++++++- .../modernpython/templates/datatype_template.mustache | 4 ++-- .../modernpython/templates/primitive_template.mustache | 4 ++-- cimgen/languages/modernpython/utils/datatypes.py | 6 +++--- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cimgen/languages/modernpython/lang_pack.py b/cimgen/languages/modernpython/lang_pack.py index ce564050..58c5c1fc 100644 --- a/cimgen/languages/modernpython/lang_pack.py +++ b/cimgen/languages/modernpython/lang_pack.py @@ -133,7 +133,15 @@ def run_template(output_path, class_details): def _create_file(output_path, class_details, template) -> str: - resource_file = Path(output_path) / "resources" / (class_details["class_name"] + template["ext"]) + if ( + class_details["is_a_primitive_class"] + or class_details["is_a_datatype_class"] + or class_details["is_an_enum_class"] + ): + class_category = "types" + else: + class_category = "" + resource_file = Path(output_path) / "resources" / class_category / (class_details["class_name"] + template["ext"]) resource_file.parent.mkdir(exist_ok=True) return str(resource_file) diff --git a/cimgen/languages/modernpython/templates/datatype_template.mustache b/cimgen/languages/modernpython/templates/datatype_template.mustache index 6d690217..a6d4d6a9 100644 --- a/cimgen/languages/modernpython/templates/datatype_template.mustache +++ b/cimgen/languages/modernpython/templates/datatype_template.mustache @@ -2,8 +2,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cimgen """ -from ..utils.datatypes import CIMDatatype -from ..utils.profile import Profile +from ...utils.datatypes import CIMDatatype +from ...utils.profile import Profile {{#isFixed_imports}} from .{{.}} import {{.}} {{/isFixed_imports}} diff --git a/cimgen/languages/modernpython/templates/primitive_template.mustache b/cimgen/languages/modernpython/templates/primitive_template.mustache index be44ab00..2a1a758f 100644 --- a/cimgen/languages/modernpython/templates/primitive_template.mustache +++ b/cimgen/languages/modernpython/templates/primitive_template.mustache @@ -3,8 +3,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cim """ from datetime import date, datetime, time -from ..utils.datatypes import Primitive -from ..utils.profile import Profile +from ...utils.datatypes import Primitive +from ...utils.profile import Profile {{class_name}} = Primitive( name="{{class_name}}", diff --git a/cimgen/languages/modernpython/utils/datatypes.py b/cimgen/languages/modernpython/utils/datatypes.py index db3d6ba6..a249c93f 100644 --- a/cimgen/languages/modernpython/utils/datatypes.py +++ b/cimgen/languages/modernpython/utils/datatypes.py @@ -6,9 +6,9 @@ from .config import cgmes_resource_config from .profile import BaseProfile -from ..resources.UnitMultiplier import UnitMultiplier -from ..resources.UnitSymbol import UnitSymbol -from ..resources.Currency import Currency +from ..resources.types.UnitMultiplier import UnitMultiplier +from ..resources.types.UnitSymbol import UnitSymbol +from ..resources.types.Currency import Currency @dataclass(config=cgmes_resource_config) From 275da4c57632190b40e9dfe014058c818279123c Mon Sep 17 00:00:00 2001 From: HUG0-D Date: Mon, 16 Dec 2024 17:38:11 +0100 Subject: [PATCH 22/22] Added function to get main profile of an attribute (useful in writer) Removed addition of "_" in uuid Signed-off-by: HUG0-D --- cimgen/languages/modernpython/utils/base.py | 51 ++++++++++++------- cimgen/languages/modernpython/utils/reader.py | 34 ++++++------- cimgen/languages/modernpython/utils/writer.py | 35 +++++++------ 3 files changed, 69 insertions(+), 51 deletions(-) diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index deaf4712..1e5dc8c8 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -90,6 +90,24 @@ def apparent_name(cls) -> str: """ return cls.__name__ + def get_attribute_main_profile(self, attr: str) -> BaseProfile | None: + """Get the profile for this attribute of the CIM object. + + This function searches for the profile of an attribute for the CIM type of an object. + If the main profile of the type is a possible profile of the attribute it should be choosen. + Otherwise, the first profile in the list of possible profiles ordered by profile number. + + :param attr: Attribute to check + :return: Attribute profile. + """ + attr_profiles_map = self.possible_attribute_profiles + profiles = attr_profiles_map.get(attr, []) + if self.recommended_profile in profiles: + return self.recommended_profile + if profiles: + return sorted(profiles)[0] + return None + def cgmes_attribute_names_in_profile(self, profile: BaseProfile | None) -> set[Field]: """ Returns all fields accross the parent tree which are in the profile in parameter. @@ -154,6 +172,7 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, "is_primitive_attribute": extra.get("is_primitive_attribute"), "is_datatype_attribute": extra.get("is_datatype_attribute"), "attribute_class": extra.get("attribute_class"), + "attribute_main_profile": self.get_attribute_main_profile(shortname), } if extra.get("namespace"): # The attribute has an explicit namesapce @@ -173,14 +192,14 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op This can then be used to generate the xml file of the profile_to_export Args: profile_to_export (Profile): Profile for which we want to obtain the xml tree (eg. Profile.EQ) - id (Optional[str], optional): "_mRID", optional: some objects don't have mRID attribute. Defaults to None. + id (Optional[str], optional): "mRID", optional: some objects don't have mRID attribute. Defaults to None. Returns: Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export """ profile_attributes = self._get_attributes_to_export(profile_to_export) if "mRID" in self.to_dict(): - obj_id = "_" + self.mRID + obj_id = self.mRID else: obj_id = id @@ -202,14 +221,11 @@ def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Op return root def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict: - attributes_to_export = self.cgmes_attributes_in_profile(profile_to_export) - is_recommanded_profile = self.recommended_profile.value == profile_to_export.value - if not is_recommanded_profile: - # remove attributes that are also present in "recommended_profile" - attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile) - for key in attributes_main.keys(): - if key in attributes_to_export: - del attributes_to_export[key] + attributes_to_export = {} + attributes_in_profile = self.cgmes_attributes_in_profile(profile_to_export) + for key, attribute in attributes_in_profile.items(): + if attribute["attribute_main_profile"] == profile_to_export: + attributes_to_export[key] = attribute attributes_to_export = self._remove_empty_attributes(attributes_to_export) return attributes_to_export @@ -217,7 +233,7 @@ def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict: def _remove_empty_attributes(attributes: dict) -> dict: for key, attribute in list(attributes.items()): # Remove empty attributes - if attribute["value"] is None or attribute["value"] == "": + if attribute["value"] in [None, "", []]: del attributes[key] elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean": attribute["value"] = str(attribute["value"]).lower() @@ -232,7 +248,7 @@ def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) element_name = f"{{{attr_namespace}}}{field_name}" if attribute["is_class_attribute"]: - # class_attributes are exported as rdf: resource #_mRID_of_target + # class_attributes are exported as rdf: resource #mRID_of_target element = etree.SubElement(root, element_name) element.set(rdf_namespace + "resource", "#" + attribute["value"]) elif attribute["is_enum_attribute"]: @@ -287,8 +303,6 @@ def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict: if key.endswith("ID") or key.endswith("about"): if value.startswith("#"): value = value[1:] - if value.startswith("_"): - value = value[1:] if hasattr(self, "mRID") and value is not None: mrid_dict = {"mRID": value} return mrid_dict @@ -309,8 +323,7 @@ def _extract_attr_value_from_etree(self, attr_name: str, class_attribute: dict, attr_value = attr_value.split("#")[-1] elif class_attribute["is_list_attribute"]: - attr_value = xml_attribute - attr_value = self.key.append(attr_value) + attr_value = eval(xml_attribute.text) elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: attr_value = xml_attribute.text if self.__dataclass_fields__[attr_name].type == bool: @@ -332,7 +345,11 @@ def update_from_xml(self, xml_fragment: str): if attribute_dict["mRID"] == self.mRID: for key, value in attribute_dict.items(): - setattr(self, key, value) + attr = getattr(self, key) + if isinstance(attr, list): + getattr(self, key).extend(value) + else: + setattr(self, key, value) @classmethod def from_xml(cls, xml_fragment: str): diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py index d1583150..386ce1cd 100644 --- a/cimgen/languages/modernpython/utils/reader.py +++ b/cimgen/languages/modernpython/utils/reader.py @@ -14,19 +14,19 @@ class Reader(BaseModel): Args: cgmes_version_path (str): Path to the cgmes resources folder containing the class definition - custom_namespaces (Optional[Dict[str, str]]): {"namespace_prefix": "namespace_uri"} - custom_folder (Optional[Dict[str, str]]): {"namespace_uri": "path_to_custom_resources_folder"} + custom_namespaces (Optional[[str, str]]): {"namespace_prefix": "namespace_uri"} + custom_folder (Optional[str]): "path_to_custom_resources_folder" """ cgmes_version_path: str custom_namespaces: Optional[Dict[str, str]] = None - custom_folder: Optional[Dict[str, str]] = None + custom_folder: Optional[str] = None logger_grouped: Dict[str, Dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}}) import_result: Dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}}) def parse_profiles(self, xml_files: list[str], start_dict: Optional[Dict] = None): """Parses all profiles contained in xml_files and returns a list containing - all the objects defined in the profiles "_mRID": Object\n + all the objects defined in the profiles "mRID": Object\n Errors encounterd in the parsing can be recovered in Reader.logger_grouped Args: @@ -34,7 +34,7 @@ def parse_profiles(self, xml_files: list[str], start_dict: Optional[Dict] = None start_dict (Optional[Dict]): To parse profiles on top of an existing list dict(meta_info, topology) Returns: - list: ["topology": dict of all the objects defined in the profiles {"_mRID": Object}, "meta_info"] + list: ["topology": dict of all the objects defined in the profiles {"mRID": Object}, "meta_info"] """ if start_dict is not None: self.import_result = start_dict @@ -72,14 +72,14 @@ def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): if event == "start" and class_namespace is not None and level == 2: class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf) if uuid is not None: - self._process_element(class_name, uuid, class_namespace, elem) + self._process_element(class_name, uuid, elem) # Check which package is read elif event == "end": self._check_metadata(elem) @staticmethod def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> tuple: - """Extracts class name and instance uuid ("_mRID") + """Extracts class name and instance uuid ("mRID") Args: elem (etree.Element): description of the instance for the given profile @@ -87,7 +87,7 @@ def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> t namespace_rdf (str): rdf namespace uri Returns: - tuple: (class_name: example "ACLineSgement", instance_uuid: "_mRID") + tuple: (class_name: example "ACLineSgement", instance_uuid: "mRID") """ class_name = elem.tag[len(class_namespace) :] uuid = elem.get("{%s}ID" % namespace_rdf) @@ -95,25 +95,22 @@ def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> t uuid = elem.get("{%s}about" % namespace_rdf) if uuid is not None: uuid = uuid[1:] - if uuid is not None and not uuid.startswith("_"): - uuid = "_" + uuid return class_name, uuid - def _process_element(self, class_name: str, uuid: str, class_namespace: str, elem): + def _process_element(self, class_name: str, uuid: str, elem): """Creates or updates (if an object with the same uuid exists) an instance of the class based on the fragment of the profile Args: class_name (str): Name of the class of the instance to create/update (example: ACLineSegment) - uuid (str): _mRID - class_namespace (str): namespace defining the class + uuid (str): mRID elem (etree.Element): description of the instance for the given profile """ topology = self.import_result["topology"] elem_str = etree.tostring(elem, encoding="utf8") try: # Import the module for the CGMES object. - module_name = self._get_path_to_module(class_namespace) + "." + class_name + module_name = self._get_path_to_module(class_name) module = importlib.import_module(module_name) klass = getattr(module, class_name) @@ -170,12 +167,11 @@ def _get_rdf_namespace(self) -> str: logger.warning("No rdf namespace found. Using %s" % namespace) return namespace - def _get_path_to_module(self, class_namespace: str) -> str: - namespace = class_namespace[1:-1] - if self.custom_folder and namespace in self.custom_folder: - path_to_module = self.custom_folder[namespace] + def _get_path_to_module(self, class_name: str) -> str: + if self.custom_folder and importlib.find_loader(self.custom_folder + "." + class_name): + path_to_module = self.custom_folder + "." + class_name else: - path_to_module = self.cgmes_version_path + path_to_module = self.cgmes_version_path + "." + class_name return path_to_module def _log_message(self, log_type: Literal["errors", "info"], message: str): diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py index 19d4d1e9..036c47e1 100644 --- a/cimgen/languages/modernpython/utils/writer.py +++ b/cimgen/languages/modernpython/utils/writer.py @@ -21,24 +21,26 @@ def write( self, outputfile: str, model_id: str, - class_profile_map: Dict[str, BaseProfile], - custom_namespaces: Dict = {}, + class_profile_map: Dict[str, BaseProfile] = {}, + custom_namespaces: Dict[str, str] = {}, ) -> dict[BaseProfile, str]: """Write CIM RDF/XML files. - This function writes CIM objects into one or more RDF/XML files separated by profiles. - Each CIM object will be written to its corresponding profile file depending on class_profile_map. But some objects to more than one file if some attribute profiles are not the same as the class profile. - :param outputfile: Stem of the output file, resulting files: _.xml. - :param model_id: Stem of the model IDs, resulting IDs: _. - :param class_profile_map: Mapping of CIM type to profile. - :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} - :return: Mapping of profile to outputfile. + Args: + outputfile (str): Stem of the output file, resulting files: _.xml. + model_id (str): Stem of the model IDs, resulting IDs: _. + class_profile_map Optional[Dict[str, str]: Mapping of CIM type to profile. + custom_namespaces Optional[Dict[str, str]: {"namespace_prefix": "namespace_uri"} + + Returns: + Mapping of profile to outputfile. """ profile_list: list[BaseProfile] = list(Profile) - profile_list += {p for p in class_profile_map.values() if p not in profile_list} + if class_profile_map: + profile_list += {p for p in class_profile_map.values() if p not in profile_list} profile_file_map: dict[BaseProfile, str] = {} for profile in profile_list: profile_name = profile.long_name @@ -50,16 +52,19 @@ def write( return profile_file_map def _generate( - self, profile: BaseProfile, model_id: str, custom_namespaces: Dict = {} + self, profile: BaseProfile, model_id: str, custom_namespaces: Dict[str, str] = {} ) -> Optional[etree.ElementTree]: """Write CIM objects as RDF/XML data to a string. This function creates RDF/XML tree corresponding to one profile. - :param profile: Only data for this profile should be written. - :param model_id: Stem of the model IDs, resulting IDs: _. - :param custom_namespaces: Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} - :return: etree of the profile + Args: + profile (BaseProfile): Only data for this profile should be written. + model_id (str): Stem of the model IDs, resulting IDs: _. + custom_namespaces Optional[Dict[str, str]]: {"namespace_prefix": "namespace_uri"} + + Returns: + etree of the profile """ writer_info = {"modelingAuthoritySet": "www.sogno.energy"} writer_info.update(self.writer_metadata)