diff --git a/README.rst b/README.rst index e871ac1b6..8d147c500 100644 --- a/README.rst +++ b/README.rst @@ -138,8 +138,8 @@ Call the generator with the appropriate target: .. code-block:: usage: aas-core-codegen [-h] --model_path MODEL_PATH --snippets_dir - SNIPPETS_DIR --output_dir OUTPUT_DIR --target - {csharp,cpp,golang,java,jsonschema,python,typescript,rdf_shacl,xsd,jsonld_context} + SNIPPETS_DIR --output_dir OUTPUT_DIR --target + {csharp,cpp,golang,java,jsonschema,python,typescript,rdf_shacl,xsd,jsonld_context,protobuf} [--version] Generate implementations and schemas based on an AAS meta-model. @@ -153,7 +153,7 @@ Call the generator with the appropriate target: specific code snippets --output_dir OUTPUT_DIR path to the generated code - --target {csharp,cpp,golang,java,jsonschema,python,typescript,rdf_shacl,xsd,jsonld_context} + --target {csharp,cpp,golang,java,jsonschema,python,typescript,rdf_shacl,xsd,jsonld_context,protobuf} target language or schema --version show the current version and exit diff --git a/aas_core_codegen/cpp/aas_common/__init__.py b/aas_core_codegen/cpp/aas_common/__init__.py index d7b34a8a9..24b395040 100644 --- a/aas_core_codegen/cpp/aas_common/__init__.py +++ b/aas_core_codegen/cpp/aas_common/__init__.py @@ -1,4 +1,5 @@ """Generate C++ code for common functions.""" + from aas_core_codegen.cpp.aas_common import _generate generate_header = _generate.generate_header diff --git a/aas_core_codegen/intermediate/_translate.py b/aas_core_codegen/intermediate/_translate.py index 52e596d1b..d94e1030d 100644 --- a/aas_core_codegen/intermediate/_translate.py +++ b/aas_core_codegen/intermediate/_translate.py @@ -1044,9 +1044,11 @@ def _to_arguments(parsed: Sequence[parse.Argument]) -> List[Argument]: Argument( name=parsed_arg.name, type_annotation=_to_type_annotation(parsed_arg.type_annotation), - default=_DefaultPlaceholder(parsed=parsed_arg.default) # type: ignore - if parsed_arg.default is not None - else None, + default=( + _DefaultPlaceholder(parsed=parsed_arg.default) # type: ignore + if parsed_arg.default is not None + else None + ), parsed=parsed_arg, ) for parsed_arg in parsed @@ -3446,9 +3448,11 @@ def _second_pass_to_stack_constructors_in_place( if ancestor is None: errors.append( Error( - cls.constructor.parsed.node - if cls.constructor.parsed is not None - else cls.parsed.node, + ( + cls.constructor.parsed.node + if cls.constructor.parsed is not None + else cls.parsed.node + ), f"In the constructor of the class {cls.name!r} " f"the super-constructor for " f"the class {statement.super_name!r} is invoked, " @@ -3461,9 +3465,11 @@ def _second_pass_to_stack_constructors_in_place( if id(ancestor) not in cls.inheritance_id_set: errors.append( Error( - cls.constructor.parsed.node - if cls.constructor.parsed is not None - else cls.parsed.node, + ( + cls.constructor.parsed.node + if cls.constructor.parsed is not None + else cls.parsed.node + ), f"In the constructor of the class {cls.name!r} " f"the super-constructor for " f"the class {statement.super_name!r} is invoked, " diff --git a/aas_core_codegen/java/xmlization/_generate.py b/aas_core_codegen/java/xmlization/_generate.py index 02f34dc1a..cd5309235 100644 --- a/aas_core_codegen/java/xmlization/_generate.py +++ b/aas_core_codegen/java/xmlization/_generate.py @@ -2031,7 +2031,7 @@ def _generate_serialize( public static void to( {I}IClass that, {I}XMLStreamWriter writer) throws SerializeException {{ -{I}VisitorWithWriter visitor = new VisitorWithWriter(); +{I}VisitorWithWriter visitor = new VisitorWithWriter(); {I}visitor.visit( {II}that, writer); }}""" diff --git a/aas_core_codegen/main.py b/aas_core_codegen/main.py index 3ffde3951..731171426 100644 --- a/aas_core_codegen/main.py +++ b/aas_core_codegen/main.py @@ -17,6 +17,7 @@ import aas_core_codegen.typescript.main as typescript_main import aas_core_codegen.xsd.main as xsd_main import aas_core_codegen.jsonld.main as jsonld_main +import aas_core_codegen.protobuf.main as protobuf_main from aas_core_codegen import run, specific_implementations from aas_core_codegen.common import LinenoColumner, assert_never @@ -36,6 +37,7 @@ class Target(enum.Enum): RDF_SHACL = "rdf_shacl" XSD = "xsd" JSONLD_CONTEXT = "jsonld_context" + PROTOBUF = "protobuf" class Parameters: @@ -164,6 +166,9 @@ def execute(params: Parameters, stdout: TextIO, stderr: TextIO) -> int: elif params.target is Target.JSONLD_CONTEXT: return jsonld_main.execute(context=run_context, stdout=stdout, stderr=stderr) + elif params.target is Target.PROTOBUF: + return protobuf_main.execute(run_context, stdout=stdout, stderr=stderr) + else: assert_never(params.target) diff --git a/aas_core_codegen/protobuf/__init__.py b/aas_core_codegen/protobuf/__init__.py new file mode 100644 index 000000000..08982afdf --- /dev/null +++ b/aas_core_codegen/protobuf/__init__.py @@ -0,0 +1 @@ +"""Generate ProtoBuf files based on the intermediate meta-model.""" diff --git a/aas_core_codegen/protobuf/common.py b/aas_core_codegen/protobuf/common.py new file mode 100644 index 000000000..9f68232e1 --- /dev/null +++ b/aas_core_codegen/protobuf/common.py @@ -0,0 +1,232 @@ +"""Provide common functions shared among different ProtoBuf code generation modules.""" + +import re +from typing import List, cast, Optional + +from icontract import ensure, require + +from aas_core_codegen import intermediate +from aas_core_codegen.common import Stripped, assert_never +from aas_core_codegen.protobuf import naming as proto_naming + + +@ensure(lambda result: result.startswith('"')) +@ensure(lambda result: result.endswith('"')) +def string_literal(text: str) -> Stripped: + """Generate a ProtoBuf string literal from the ``text``.""" + escaped = [] # type: List[str] + + for character in text: + code_point = ord(character) + + if character == "\a": + escaped.append("\\a") + elif character == "\b": + escaped.append("\\b") + elif character == "\f": + escaped.append("\\f") + elif character == "\n": + escaped.append("\\n") + elif character == "\r": + escaped.append("\\r") + elif character == "\t": + escaped.append("\\t") + elif character == "\v": + escaped.append("\\v") + elif character == '"': + escaped.append('\\"') + elif character == "\\": + escaped.append("\\\\") + elif code_point < 32: + # Non-printable ASCII characters + escaped.append(f"\\x{ord(character):x}") + elif 255 < code_point < 65536: + # Above ASCII + escaped.append(f"\\u{ord(character):04x}") + elif code_point >= 65536: + # Above Unicode Binary Multilingual Pane + escaped.append(f"\\U{ord(character):08x}") + else: + escaped.append(character) + + return Stripped('"{}"'.format("".join(escaped))) + + +def needs_escaping(text: str) -> bool: + """Check whether the ``text`` contains a character that needs escaping.""" + for character in text: + if character == "\a": + return True + elif character == "\b": + return True + elif character == "\f": + return True + elif character == "\n": + return True + elif character == "\r": + return True + elif character == "\t": + return True + elif character == "\v": + return True + elif character == '"': + return True + elif character == "\\": + return True + else: + pass + + return False + + +PRIMITIVE_TYPE_MAP = { + intermediate.PrimitiveType.BOOL: Stripped("bool"), + intermediate.PrimitiveType.INT: Stripped("int64"), + intermediate.PrimitiveType.FLOAT: Stripped("double"), + intermediate.PrimitiveType.STR: Stripped("string"), + intermediate.PrimitiveType.BYTEARRAY: Stripped("bytes"), +} + + +def _assert_all_primitive_types_are_mapped() -> None: + """Assert that we have explicitly mapped all the primitive types to ProtoBuf.""" + all_primitive_literals = set(literal.value for literal in PRIMITIVE_TYPE_MAP) + + mapped_primitive_literals = set( + literal.value for literal in intermediate.PrimitiveType + ) + + all_diff = all_primitive_literals.difference(mapped_primitive_literals) + mapped_diff = mapped_primitive_literals.difference(all_primitive_literals) + + messages = [] # type: List[str] + if len(mapped_diff) > 0: + messages.append( + f"More primitive maps are mapped than there were defined " + f"in the ``intermediate._types``: {sorted(mapped_diff)}" + ) + + if len(all_diff) > 0: + messages.append( + f"One or more primitive types in the ``intermediate._types`` were not " + f"mapped in PRIMITIVE_TYPE_MAP: {sorted(all_diff)}" + ) + + if len(messages) > 0: + raise AssertionError("\n\n".join(messages)) + + +_assert_all_primitive_types_are_mapped() + + +# fmt: off +@require( + lambda our_type_qualifier: + not (our_type_qualifier is not None) + or not our_type_qualifier.endswith('.') +) +# fmt: on +def generate_type( + type_annotation: intermediate.TypeAnnotationUnion, + our_type_qualifier: Optional[Stripped] = None, +) -> Stripped: + """ + Generate the ProtoBuf type for the given type annotation. + + ``our_type_prefix`` is appended to all our types, if specified. + """ + our_type_prefix = "" if our_type_qualifier is None else f"{our_type_qualifier}." + if isinstance(type_annotation, intermediate.PrimitiveTypeAnnotation): + return PRIMITIVE_TYPE_MAP[type_annotation.a_type] + + elif isinstance(type_annotation, intermediate.OurTypeAnnotation): + our_type = type_annotation.our_type + + if isinstance(our_type, intermediate.Enumeration): + return Stripped( + our_type_prefix + proto_naming.enum_name(type_annotation.our_type.name) + ) + + elif isinstance(our_type, intermediate.ConstrainedPrimitive): + return PRIMITIVE_TYPE_MAP[our_type.constrainee] + + elif isinstance(our_type, intermediate.Class): + return Stripped(our_type_prefix + proto_naming.class_name(our_type.name)) + + elif isinstance(type_annotation, intermediate.ListTypeAnnotation): + item_type = generate_type( + type_annotation=type_annotation.items, our_type_qualifier=our_type_qualifier + ) + + return Stripped(f"repeated {item_type}") + + elif isinstance(type_annotation, intermediate.OptionalTypeAnnotation): + value = generate_type( + type_annotation=type_annotation.value, our_type_qualifier=our_type_qualifier + ) + + # careful: do not generate "optional" keyword for list-type elements since otherwise we get invalid + # constructs like "optional repeated " + if isinstance(type_annotation.value, intermediate.ListTypeAnnotation): + return Stripped(f"{value}") + else: + return Stripped(f"optional {value}") + + else: + assert_never(type_annotation) + + raise AssertionError("Should not have gotten here") + + +INDENT = " " +INDENT2 = INDENT * 2 +INDENT3 = INDENT * 3 +INDENT4 = INDENT * 4 +INDENT5 = INDENT * 5 +INDENT6 = INDENT * 6 + +# noinspection RegExpSimplifiable +NAMESPACE_IDENTIFIER_RE = re.compile( + r"[a-zA-Z_][a-zA-Z_0-9]*(\.[a-zA-Z_][a-zA-Z_0-9]*)*" +) + + +class NamespaceIdentifier(str): + """Capture a namespace identifier.""" + + @require(lambda identifier: NAMESPACE_IDENTIFIER_RE.fullmatch(identifier)) + def __new__(cls, identifier: str) -> "NamespaceIdentifier": + return cast(NamespaceIdentifier, identifier) + + +WARNING = Stripped( + """\ +/* + * This code has been automatically generated by aas-core-codegen. + * Do NOT edit or append. + */""" +) + + +# fmt: off +@ensure( + lambda namespace, result: + not (namespace != "Aas") or len(result) == 1, + "Exactly one block of stripped text to be appended to the list of using directives " + "if this using directive is necessary" +) +@ensure( + lambda namespace, result: + not (namespace == "Aas") or len(result) == 0, + "Empty list if no directive is necessary" +) +# fmt: on +def generate_using_aas_directive_if_necessary( + namespace: NamespaceIdentifier, +) -> List[Stripped]: + """ + Generates the import directive for the AAS namespace. + + This method is not to be used because proto3 does not need namespaces. + """ + raise NotImplementedError("Not using the Aas namespace.") diff --git a/aas_core_codegen/protobuf/description.py b/aas_core_codegen/protobuf/description.py new file mode 100644 index 000000000..1f2e039f8 --- /dev/null +++ b/aas_core_codegen/protobuf/description.py @@ -0,0 +1,1102 @@ +"""Render descriptions to ProtoBuf documentation comments.""" + +import abc +import collections +import io +import itertools +import textwrap +import xml.sax.saxutils +from typing import ( + Tuple, + Optional, + List, + OrderedDict, + Union, + Sequence, + TypeVar, + Iterator, + Iterable, +) + +import docutils.nodes +import docutils.parsers.rst.roles +import docutils.utils +from icontract import require, ensure, DBC + +from aas_core_codegen import intermediate +from aas_core_codegen.common import ( + Stripped, + Error, + assert_never, + Identifier, + assert_union_of_descendants_exhaustive, + assert_union_without_excluded, +) +from aas_core_codegen.protobuf import ( + naming as proto_naming, +) +from aas_core_codegen.intermediate import ( + doc as intermediate_doc, + _translate as intermediate_translate, +) + + +class _Node(DBC): + """Represent a node in an AST of a documentation comment.""" + + @abc.abstractmethod + def accept(self, visitor: "_NodeVisitor") -> None: + """Accept the ``visitor`` and dispatch.""" + raise NotImplementedError() + + +class _Text(_Node): + """Represent a text node in a documentation comment.""" + + def __init__(self, content: str) -> None: + """Initialize with the given values.""" + self.content = content + + def accept(self, visitor: "_NodeVisitor") -> None: + """Accept the ``visitor`` and dispatch.""" + visitor.visit_text(self) + + def __repr__(self) -> str: + """Generate a string representation for easier debugging.""" + return f"{self.__class__.__name__}({self.content!r})" + + +class _List(_Node): + """ + Represent a sequence of nodes of a ProtoBuf documentation comment. + + This is necessary so that we can render a concatenation where there is no + enclosing element. + """ + + def __init__(self, items: List["_NodeUnion"]) -> None: + self.items = items + + def accept(self, visitor: "_NodeVisitor") -> None: + """Accept the ``visitor`` and dispatch.""" + visitor.visit_list(self) + + def __repr__(self) -> str: + """Generate a string representation for easier debugging.""" + if len(self.items) == 0: + return f"{self.__class__.__name__}([])" + + writer = io.StringIO() + writer.write(f"{self.__class__.__name__}(\n") + writer.write(" [\n") + + for i, item in enumerate(self.items): + if i > 0: + writer.write(",\n") + writer.write(textwrap.indent(repr(item), " ")) + + writer.write("\n ]\n)") + return writer.getvalue() + + +class _Element(_Node): + """Represent an element of a ProtoBuf documentation comment.""" + + def __init__( + self, + name: str, + attrs: Optional[OrderedDict[str, str]] = None, + children: Optional[_List] = None, + ) -> None: + self.name = name + self.attrs = collections.OrderedDict() if attrs is None else attrs + self.children = _List(items=[]) if children is None else children + + def accept(self, visitor: "_NodeVisitor") -> None: + """Accept the ``visitor`` and dispatch.""" + visitor.visit_element(self) + + def __repr__(self) -> str: + """Generate a string representation for easier debugging.""" + indented_name = textwrap.indent(repr(self.name), " ") + indented_attrs = textwrap.indent(repr(self.attrs), " ") + indented_children = textwrap.indent(repr(self.children), " ") + + return f"""\ +{self.__class__.__name__}( +{indented_name}, +{indented_attrs}, +{indented_children} +)""" + + +_NodeUnion = Union[_Text, _Element, _List] +assert_union_of_descendants_exhaustive(base_class=_Node, union=_NodeUnion) + + +class _NodeVisitor: + def visit(self, node: _Node) -> None: + """Visit *via* double-dispatch.""" + node.accept(self) + + def visit_text(self, node: _Text) -> None: + """Visit the text node.""" + pass + + def visit_list(self, node: _List) -> None: + """Visit the node list and its items recursively.""" + for item in node.items: + self.visit(item) + + def visit_element(self, node: _Element) -> None: + """Visit the element node and its children.""" + self.visit(node.children) + + +class _ElementRenderer(intermediate_doc.DocutilsElementTransformer[_NodeUnion]): + """Render descriptions as ProtoBuf docstring XML.""" + + def transform_text( + self, element: docutils.nodes.Text + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return _Text(element.astext()), None + + def transform_reference_to_our_type_in_doc( + self, element: intermediate_doc.ReferenceToOurType + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + name: str + + if isinstance(element.our_type, intermediate.Enumeration): + name = proto_naming.enum_name(element.our_type.name) + + elif isinstance(element.our_type, intermediate.ConstrainedPrimitive): + # NOTE (mristin, 2021-12-17): + # We do not generate a class for constrained primitives, but we + # leave it as class name, as that is what we used for ``Verify*`` function. + name = proto_naming.class_name(element.our_type.name) + + elif isinstance(element.our_type, intermediate.Class): + if isinstance(element.our_type, intermediate.AbstractClass): + # NOTE (mristin, 2021-12-25): + # We do not generate ProtoBuf code for abstract classes, so we have to refer + # to the interface. + name = proto_naming.class_name(element.our_type.name) + + elif isinstance(element.our_type, intermediate.ConcreteClass): + # NOTE (mristin, 2021-12-25): + # Though a concrete class can have multiple descendants and the writer + # might actually want to refer to the *interface* instead of + # the concrete class, we do the best effort here and resolve it to the + # name of the concrete class. + name = proto_naming.class_name(element.our_type.name) + + else: + assert_never(element.our_type) + + else: + # NOTE (mristin, 2022-03-30): + # This is a very special case where we had problems with an interface. + # We leave this check here, just in case the bug resurfaces. + if isinstance(element.our_type, intermediate_translate._PlaceholderOurType): + return None, [ + f"Unexpected placeholder for our type: {element.our_type}; " + f"this is a bug" + ] + + assert_never(element.our_type) + + # NOTE (mristin, 2022-06-19): + # We need to prefix the cref in case there are naming conflicts. + prefixed_name = f"Aas.{name}" + + return ( + _Element( + name="see", attrs=collections.OrderedDict([("cref", prefixed_name)]) + ), + None, + ) + + def transform_reference_to_attribute_in_doc( + self, element: intermediate_doc.ReferenceToAttribute + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + cref: str + + if isinstance(element.reference, intermediate_doc.ReferenceToProperty): + name_of_our_type: str + + if isinstance(element.reference.cls, intermediate.AbstractClass): + # We do not generate ProtoBuf code for abstract classes, so we have to refer + # to the interface. + name_of_our_type = proto_naming.class_name(element.reference.cls.name) + elif isinstance(element.reference.cls, intermediate.ConcreteClass): + # NOTE (mristin, 2021-12-25): + # Though a concrete class can have multiple descendants and the writer + # might actually want to refer to the *interface* instead of + # the concrete class, we do the best effort here and resolve it to the + # name of the concrete class. + + name_of_our_type = proto_naming.class_name(element.reference.cls.name) + else: + assert_never(element.reference.cls) + + prop_name = proto_naming.property_name(element.reference.prop.name) + + cref = f"{name_of_our_type}.{prop_name}" + elif isinstance( + element.reference, intermediate_doc.ReferenceToEnumerationLiteral + ): + name_of_our_type = proto_naming.enum_name( + element.reference.enumeration.name + ) + literal_name = proto_naming.enum_literal_name( + element.reference.literal.name + ) + + cref = f"{name_of_our_type}.{literal_name}" + else: + # NOTE (mristin, 2022-03-30): + # This is a very special case where we had problems with an interface. + # We leave this check here, just in case the bug resurfaces. + if isinstance( + element.reference, + intermediate_translate._PlaceholderReferenceToAttribute, + ): + return None, [ + f"Unexpected placeholder " + f"for the attribute reference: {element.reference}; " + f"this is a bug" + ] + + assert_never(element.reference) + + # NOTE (mristin, 2022-06-19): + # We need to prefix the cref in case there are naming conflicts. + prefixed_cref = f"Aas.{cref}" + + return ( + _Element( + name="see", attrs=collections.OrderedDict([("cref", prefixed_cref)]) + ), + None, + ) + + def transform_reference_to_argument_in_doc( + self, element: intermediate_doc.ReferenceToArgument + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + arg_name = proto_naming.argument_name(Identifier(element.reference)) + + return ( + _Element( + name="paramref", attrs=collections.OrderedDict([("name", arg_name)]) + ), + None, + ) + + def transform_reference_to_constraint_in_doc( + self, element: intermediate_doc.ReferenceToConstraint + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return _Text(content=f"Constraint {element.reference}"), None + + def transform_reference_to_constant_in_doc( + self, element: intermediate_doc.ReferenceToConstant + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + constant_as_prop_name = proto_naming.property_name(element.constant.name) + cref = f"Aas.Constants.{constant_as_prop_name}" + + return ( + _Element(name="see", attrs=collections.OrderedDict([("cref", cref)])), + None, + ) + + def transform_literal( + self, element: docutils.nodes.literal + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return ( + _Element(name="c", children=_List(items=[_Text(content=element.astext())])), + None, + ) + + def _transform_children_of( + self, + element: docutils.nodes.Element, + ) -> Tuple[Optional[_List], Optional[List[str]]]: + """Transform the children to a Python list.""" + children = [] # type: List[_NodeUnion] + + errors = [] # type: List[str] + for child in element.children: + rendered_child, child_errors = self.transform(child) + if child_errors is not None: + errors.extend(child_errors) + else: + assert rendered_child is not None + children.append(rendered_child) + + if len(errors) > 0: + return None, errors + + return _List(items=children), None + + def transform_paragraph( + self, element: docutils.nodes.paragraph + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + children, errors = self._transform_children_of(element) + if errors is not None: + return None, errors + + assert children is not None + + return _Element(name="para", children=children), None + + def transform_emphasis( + self, element: docutils.nodes.emphasis + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + children, errors = self._transform_children_of(element) + if errors is not None: + return None, errors + + assert children is not None + + return _Element(name="em", children=children), None + + def transform_list_item( + self, element: docutils.nodes.list_item + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + children, errors = self._transform_children_of(element) + if errors is not None: + return None, errors + + assert children is not None + + return _Element(name="li", children=children), None + + def transform_bullet_list( + self, element: docutils.nodes.bullet_list + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + children, errors = self._transform_children_of(element) + if errors is not None: + return None, errors + + assert children is not None + + return _Element(name="ul", children=children), None + + def transform_note( + self, element: docutils.nodes.note + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + children, errors = self._transform_children_of(element) + + if errors is not None: + return None, errors + + assert children is not None + + return _Element(name="para", children=children), None + + def transform_reference( + self, element: docutils.nodes.reference + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return self._transform_children_of(element) + + def transform_field_body( + self, element: docutils.nodes.field_body + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return self._transform_children_of(element) + + def transform_document( + self, element: docutils.nodes.document + ) -> Tuple[Optional[_NodeUnion], Optional[List[str]]]: + return self._transform_children_of(element) + + +class _FlattenListVisitor(_NodeVisitor): + """Flatten all the node lists recursively and in-place.""" + + def visit_list(self, node: _List) -> None: + """Visit the node list and its items recursively.""" + new_items = [] # type: List[_NodeUnion] + + for item in node.items: + self.visit(item) + + if isinstance(item, _List): + new_items.extend(item.items) + else: + new_items.append(item) + + node.items = new_items + + +class _ConcatenateTextVisitor(_NodeVisitor): + """Concatenate the consecutive text elements in lists recursively and in-place.""" + + def visit_list(self, node: _List) -> None: + """Visit the node list and its items recursively.""" + new_items = [] # type: List[_NodeUnion] + + accumulator = [] # type: List[_Text] + for item in node.items: + if isinstance(item, _Text): + accumulator.append(item) + else: + self.visit(item) + + if len(accumulator) > 0: + new_items.append( + _Text( + content="".join( + text_element.content for text_element in accumulator + ) + ) + ) + accumulator = [] + + new_items.append(item) + + if len(accumulator) > 0: + new_items.append( + _Text( + content="".join( + text_element.content for text_element in accumulator + ) + ) + ) + + node.items = new_items + + +class _RemoveRedundantParaVisitor(_NodeVisitor): + """Remove the redundant ```` elements in-place.""" + + def visit_element(self, node: _Element) -> None: + self.visit(node.children) + + # noinspection PyUnresolvedReferences + if ( + node.name in ("summary", "remarks", "li", "param", "returns", "para") + and len(node.children.items) == 1 + and isinstance(node.children.items[0], _Element) + and node.children.items[0].name == "para" + ): + # noinspection PyUnresolvedReferences + node.children = node.children.items[0].children + + +def _compress_node_in_place(node: _NodeUnion) -> None: + """Remove redundant nodes for more readability in the rendered text.""" + flatten_list_visitor = _FlattenListVisitor() + flatten_list_visitor.visit(node) + + concatenate_text_visitor = _ConcatenateTextVisitor() + concatenate_text_visitor.visit(node) + + remove_redundant_para_visitor = _RemoveRedundantParaVisitor() + remove_redundant_para_visitor.visit(node) + + +def _render_summary_remarks( + description: intermediate.SummaryRemarksDescription, +) -> Tuple[Optional[_List], Optional[List[Error]]]: + """Render a description to our description node.""" + result_items = [] # type: List[_NodeUnion] + errors = [] # type: List[Error] + + element_renderer = _ElementRenderer() + + summary_node, summary_errors = element_renderer.transform(description.summary) + if summary_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in summary_errors + ) + else: + assert summary_node is not None + + result_items.append( + _Element(name="summary", children=_List(items=[summary_node])) + ) + + remark_nodes = [] # type: List[_NodeUnion] + for remark in description.remarks: + remark_node, remark_errors = element_renderer.transform(remark) + if remark_errors: + errors.extend( + Error(description.parsed.node, message) for message in remark_errors + ) + else: + assert remark_node is not None + remark_nodes.append(remark_node) + + if len(errors) > 0: + return None, errors + + if len(remark_nodes) > 0: + result_items.append( + _Element(name="remarks", children=_List(items=remark_nodes)) + ) + + return _List(items=result_items), None + + +def _render_summary_remarks_constraints( + description: intermediate.SummaryRemarksConstraintsDescription, +) -> Tuple[Optional[_List], Optional[List[Error]]]: + """Render a description where constraints are put in remarks.""" + result_items = [] # type: List[_NodeUnion] + errors = [] # type: List[Error] + + element_renderer = _ElementRenderer() + + summary_node, summary_errors = element_renderer.transform(description.summary) + if summary_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in summary_errors + ) + else: + assert summary_node is not None + result_items.append( + _Element(name="summary", children=_List(items=[summary_node])) + ) + + remark_nodes = [] # type: List[_NodeUnion] + for remark in description.remarks: + remark_node, remark_errors = element_renderer.transform(remark) + if remark_errors: + errors.extend( + Error(description.parsed.node, message) for message in remark_errors + ) + else: + assert remark_node is not None + remark_nodes.append(remark_node) + + constraint_nodes = [] # type: List[_NodeUnion] + for identifier, docutils_element in description.constraints_by_identifier.items(): + body, body_errors = element_renderer.transform(docutils_element) + if body_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in body_errors + ) + else: + assert body is not None + + # NOTE (mristin, 2022-07-21): + # We in-line the constraint prefix for better readability. + + # noinspection PyUnresolvedReferences + if ( + isinstance(body, _List) + and len(body.items) > 0 + and isinstance(body.items[0], _Element) + and body.items[0].name == "para" + ): + # noinspection PyUnresolvedReferences + body.items[0].children.items.insert( + 0, _Text(content=f"Constraint {identifier}:\n") + ) + + constraint_node = _Element(name="li", children=body) + else: + constraint_node = _Element( + name="li", + children=_List( + items=[ + _Element( + name="para", + children=_List( + items=[_Text(content=f"Constraint {identifier}:\n")] + ), + ), + body, + ] + ), + ) + + constraint_nodes.append(constraint_node) + + if len(errors) > 0: + return None, errors + + if len(constraint_nodes) > 0: + remark_nodes.append( + _Element(name="para", children=_List(items=[_Text(content="Constraints:")])) + ) + + ul_node = _Element(name="ul", children=_List(items=constraint_nodes)) + + remark_nodes.append(ul_node) + + if len(remark_nodes) > 0: + result_items.append( + _Element(name="remarks", children=_List(items=remark_nodes)) + ) + + return _List(items=result_items), None + + +@require(lambda line: "\n" not in line) +def _slash_slash_slash_line(line: str) -> str: + """Prepend ``///`` to the ``line``.""" + if len(line) == 0: + return "///" + + return f"/// {line}" + + +class _RelativeIndention: + """ + Represent the relative indention. + + Since the indention is *relative*, it can be either positive, neutral or negative. + """ + + @require(lambda direction: direction in (-1, 0, 1)) + def __init__(self, direction: int) -> None: + self.direction = direction + + def __repr__(self) -> str: + """Generate text representation for easier debugging.""" + return f"{self.__class__.__name__}({self.direction!r})" + + +class _TextBlock(DBC): + """ + Represent a block of text. + + This data structure is expected to be append-only mutable, where you keep adding + new parts to the block. The parts are later expected to be joined by an empty + string. + + All the text blocks are expected to be joined by empty strings. + """ + + def __init__(self, parts: List[str]) -> None: + """Initialize with the given values.""" + self.parts = parts + + def __repr__(self) -> str: + """Generate text representation for easier debugging.""" + return f"{self.__class__.__name__}({self.parts!r})" + + +class _EnforceNewLine(DBC): + """ + Enforce that the following text starts on a new line. + + If there is already a new line output before, this text directive has no influence. + """ + + def __repr__(self) -> str: + """Generate text representation for easier debugging.""" + return f"{self.__class__.__name__}()" + + +_TextDirective = Union[_RelativeIndention, _TextBlock, _EnforceNewLine] + + +class _ToTextDirectivesVisitor(_NodeVisitor): + """ + Convert the nodes to a text as represented by text control directives. + + The text is expected to be valid XML and properly escaped. + """ + + #: The resulting text control directives + directives: List[_TextDirective] + + def __init__(self) -> None: + self.directives = [] + + def _last_or_new_block(self) -> _TextBlock: + """Retrieve the last block, or initialize a new block, if no last block.""" + if len(self.directives) == 0 or not isinstance(self.directives[-1], _TextBlock): + self.directives.append(_TextBlock(parts=[])) + + assert isinstance(self.directives[-1], _TextBlock) + return self.directives[-1] + + def visit_text(self, node: _Text) -> None: + self._last_or_new_block().parts.append(xml.sax.saxutils.escape(node.content)) + + def visit_element(self, node: _Element) -> None: + """Visit the element node and its children.""" + if node.name in ("summary", "remarks", "para", "param", "returns"): + # NOTE (mristin, 2022-07-18): + # We render these tags without indention for better readability. + + start_element_writer = io.StringIO() + start_element_writer.write(f"<{node.name}") + if len(node.attrs) > 0: + for attr_name, attr_value in node.attrs.items(): + start_element_writer.write( + f" {attr_name}={xml.sax.saxutils.quoteattr(attr_value)}" + ) + start_element_writer.write(">") + + self.directives.append(_EnforceNewLine()) + + self.directives.append(_TextBlock(parts=[start_element_writer.getvalue()])) + + self.directives.append(_EnforceNewLine()) + + for item in node.children.items: + self.visit(item) + + self.directives.append(_EnforceNewLine()) + + self.directives.append(_TextBlock(parts=[f""])) + + elif node.name in ("ul", "li"): + # NOTE (mristin, 2022-07-18): + # We put the list elements on new lines and indent them. + assert ( + len(node.attrs) == 0 + ), f"Unexpected attributes in a node {node.name!r}" + + self.directives.append(_EnforceNewLine()) + + self.directives.append(_TextBlock(parts=[f"<{node.name}>"])) + + self.directives.append(_EnforceNewLine()) + + self.directives.append(_RelativeIndention(direction=1)) + + for item in node.children.items: + self.visit(item) + + self.directives.append(_EnforceNewLine()) + + self.directives.append(_RelativeIndention(direction=-1)) + + self.directives.append(_TextBlock(parts=[f""])) + + else: + # NOTE (mristin, 2022-07-18): + # We inline all the other elements. + + start_element_writer = io.StringIO() + start_element_writer.write(f"<{node.name}") + if len(node.attrs) > 0: + for attr_name, attr_value in node.attrs.items(): + start_element_writer.write( + f" {attr_name}={xml.sax.saxutils.quoteattr(attr_value)}" + ) + + if len(node.children.items) == 0: + start_element_writer.write(" />") + self._last_or_new_block().parts.append(start_element_writer.getvalue()) + + return + + start_element_writer.write(">") + self._last_or_new_block().parts.append(start_element_writer.getvalue()) + + for item in node.children.items: + self.visit(item) + + self._last_or_new_block().parts.append(f"") + + +_TextDirectiveExceptEnforceNewLine = Union[_RelativeIndention, _TextBlock] +assert_union_without_excluded( + original_union=_TextDirective, + subset_union=_TextDirectiveExceptEnforceNewLine, + excluded=[_EnforceNewLine], +) + +T = TypeVar("T") + + +def pairwise(iterable: Iterable[T]) -> Iterator[Tuple[T, T]]: + """Iterate pair-wise over the iterator.""" + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b) + + +# fmt: off +@ensure( + lambda result: + all( + len(directive.parts) > 0 + for directive in result + if isinstance(directive, _TextBlock) + ), + "No empty text blocks" +) +@ensure( + lambda result: + all( + not ( + isinstance(prev, _TextBlock) + and isinstance(current, _TextBlock) + ) + for prev, current in pairwise(result) + ), + "All text blocks are merged and there are no consecutive text blocks" +) +# fmt: on +def _compress_text_directives( + directives: Sequence[_TextDirective], +) -> List[_TextDirectiveExceptEnforceNewLine]: + """Merge consecutive text blocks and enforce the new lines.""" + # region Remove empty blocks + + directives_wo_empty_blocks = [ + directive + for directive in directives + if not (isinstance(directive, _TextBlock) and len(directive.parts) == 0) + ] + + # endregion + + # region Fulfill new-line enforcement + + directives_wo_enforce_new_line = ( + [] + ) # type: List[_TextDirectiveExceptEnforceNewLine] + + previous_text_block = None # type: Optional[_TextBlock] + + for directive in directives_wo_empty_blocks: + if isinstance(directive, _EnforceNewLine): + if previous_text_block is not None: + assert len(previous_text_block.parts) > 0 + + if not previous_text_block.parts[-1].endswith("\n"): + previous_text_block.parts.append("\n") + elif isinstance(directive, _TextBlock): + assert len(directive.parts) > 0 + + previous_text_block = directive + directives_wo_enforce_new_line.append(directive) + + elif isinstance(directive, _RelativeIndention): + directives_wo_enforce_new_line.append(directive) + else: + assert_never(directive) + + # endregion + + # region Merge consecutive text blocks + + directives_w_merged_blocks = [] # type: List[_TextDirectiveExceptEnforceNewLine] + + for directive in directives_wo_enforce_new_line: + if isinstance(directive, _TextBlock): + assert len(directive.parts) > 0 + + if len(directives_w_merged_blocks) > 0 and isinstance( + directives_w_merged_blocks[-1], _TextBlock + ): + directives_w_merged_blocks[-1].parts.extend(directive.parts) + else: + directives_w_merged_blocks.append(directive) + else: + directives_w_merged_blocks.append(directive) + + # endregion + + return directives_w_merged_blocks + + +def _to_text(node: _NodeUnion) -> str: + """ + Convert the node to a text representation. + + For readability and no phantom elements, the ``node`` is expected to be compressed + before. + """ + to_text_directives_visitor = _ToTextDirectivesVisitor() + to_text_directives_visitor.visit(node) + + # NOTE (mristin, 2022-07-18): + # We compress to do away with the new-line enforcement and consecutive and empty + # blocks, so that the operations below become much easier to write. + directives = _compress_text_directives(to_text_directives_visitor.directives) + + writer = io.StringIO() + level = 0 # indention level + + for directive in directives: + if isinstance(directive, _TextBlock): + writer.write(textwrap.indent("".join(directive.parts), level * " ")) + + elif isinstance(directive, _RelativeIndention): + assert level + directive.direction >= 0, ( + f"Negative absolute indention not possible: " + f"{level=}, {directive.direction=}" + ) + level += directive.direction + + else: + assert_never(directive) + + return writer.getvalue() + + +def _generate_summary_remarks( + description: intermediate.SummaryRemarksDescription, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for a summary-remarks description.""" + node, errors = _render_summary_remarks(description=description) + if errors is not None: + return None, errors + + assert node is not None + + _compress_node_in_place(node=node) + text = _to_text(node) + + commented_lines = [_slash_slash_slash_line(line) for line in text.splitlines()] + + return Stripped("\n".join(commented_lines)), None + + +def _generate_summary_remarks_constraints( + description: intermediate.SummaryRemarksConstraintsDescription, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for a summary-remarks-constraints.""" + node, errors = _render_summary_remarks_constraints(description=description) + if errors is not None: + return None, errors + + assert node is not None + + _compress_node_in_place(node=node) + text = _to_text(node) + + commented_lines = [_slash_slash_slash_line(line) for line in text.splitlines()] + + return Stripped("\n".join(commented_lines)), None + + +def generate_comment_for_meta_model( + description: intermediate.DescriptionOfMetaModel, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for the given meta-model.""" + return _generate_summary_remarks_constraints(description) + + +def generate_comment_for_our_type( + description: intermediate.DescriptionOfOurType, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for our type.""" + return _generate_summary_remarks_constraints(description) + + +def generate_comment_for_property( + description: intermediate.DescriptionOfProperty, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for the given property.""" + return _generate_summary_remarks_constraints(description) + + +def generate_comment_for_enumeration_literal( + description: intermediate.DescriptionOfEnumerationLiteral, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """Generate the documentation comment for the given enumeration literal.""" + return _generate_summary_remarks(description) + + +def _render_description_of_signature( + description: intermediate.DescriptionOfSignature, +) -> Tuple[Optional[_List], Optional[List[Error]]]: + """Render a description where constraints are put in remarks.""" + result_items = [] # type: List[_NodeUnion] + errors = [] # type: List[Error] + + renderer = _ElementRenderer() + + summary_node, summary_errors = renderer.transform(description.summary) + if summary_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in summary_errors + ) + else: + assert summary_node is not None + result_items.append( + _Element(name="summary", children=_List(items=[summary_node])) + ) + + remark_nodes = [] # type: List[_NodeUnion] + for remark in description.remarks: + remark_node, remark_errors = renderer.transform(remark) + if remark_errors: + errors.extend( + Error(description.parsed.node, message) for message in remark_errors + ) + else: + assert remark_node is not None + remark_nodes.append(remark_node) + + param_nodes = [] # type: List[_NodeUnion] + + for name, docutils_element in description.arguments_by_name.items(): + returns, body_errors = renderer.transform(docutils_element) + if body_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in body_errors + ) + else: + assert returns is not None + + param_nodes.append( + _Element( + name="param", + attrs=collections.OrderedDict([("name", name)]), + children=_List(items=[returns]), + ) + ) + + returns_node = None # type: Optional[_NodeUnion] + + if description.returns is not None: + # NOTE (mristin, 2022-07-18): + # We need to help the type checker in PyCharm a bit. + assert isinstance(description.returns, docutils.nodes.field_body) + + returns, returns_errors = renderer.transform(description.returns) + if returns_errors is not None: + errors.extend( + Error(description.parsed.node, message) for message in returns_errors + ) + else: + assert returns is not None + + returns_node = _Element(name="returns", children=_List(items=[returns])) + + if len(errors) > 0: + return None, errors + + if len(remark_nodes) > 0: + result_items.append( + _Element(name="remarks", children=_List(items=remark_nodes)) + ) + + result_items.extend(param_nodes) + + if returns_node is not None: + result_items.append(returns_node) + + return _List(items=result_items), None + + +def generate_comment_for_signature( + description: intermediate.DescriptionOfSignature, +) -> Tuple[Optional[Stripped], Optional[List[Error]]]: + """ + Generate the documentation comment for the given signature. + + A signature, in this context, means a function or a method signature. + """ + node, errors = _render_description_of_signature(description=description) + if errors is not None: + return None, errors + + assert node is not None + + _compress_node_in_place(node=node) + text = _to_text(node) + + commented_lines = [_slash_slash_slash_line(line) for line in text.splitlines()] + + return Stripped("\n".join(commented_lines)), None diff --git a/aas_core_codegen/protobuf/main.py b/aas_core_codegen/protobuf/main.py new file mode 100644 index 000000000..b2e8b74db --- /dev/null +++ b/aas_core_codegen/protobuf/main.py @@ -0,0 +1,117 @@ +"""Generate ProtoBuf code to handle asset administration shells based on the meta-model.""" + +from typing import TextIO + +from aas_core_codegen import specific_implementations, run, intermediate +from aas_core_codegen.protobuf import ( + common as proto_common, + structure as proto_structure, +) + + +def execute(context: run.Context, stdout: TextIO, stderr: TextIO) -> int: + """Generate the code.""" + verified_ir_table, errors = proto_structure.verify( + symbol_table=context.symbol_table + ) + + if errors is not None: + run.write_error_report( + message=f"Failed to verify the intermediate symbol table " + f"for generation of ProtoBuf code" + f"based on {context.model_path}", + errors=[context.lineno_columner.error_message(error) for error in errors], + stderr=stderr, + ) + return 1 + + assert verified_ir_table is not None + + unsupported_contracts_errors = ( + intermediate.errors_if_contracts_for_functions_or_methods_defined( + verified_ir_table + ) + ) + if unsupported_contracts_errors is not None: + run.write_error_report( + message=f"We do not support pre and post-conditions and snapshots " + f"at the moment. Please notify the developers if you need this " + f"feature (based on meta-model {context.model_path})", + errors=[ + context.lineno_columner.error_message(error) + for error in unsupported_contracts_errors + ], + stderr=stderr, + ) + return 1 + + unsupported_methods_errors = ( + intermediate.errors_if_non_implementation_specific_methods(verified_ir_table) + ) + if unsupported_methods_errors is not None: + run.write_error_report( + message=f"We added some support for understood methods already and keep " + f"maintaining it as it is only a matter of time when we will " + f"introduce their transpilation. Introducing them after the fact " + f"would have been much more difficult.\n" + f"\n" + f"At the given moment, however, we deliberately focus only on " + f"implementation-specific methods. " + f"(based on meta-model {context.model_path})", + errors=[ + context.lineno_columner.error_message(error) + for error in unsupported_methods_errors + ], + stderr=stderr, + ) + return 1 + + namespace_key = specific_implementations.ImplementationKey("namespace.txt") + namespace_text = context.spec_impls.get(namespace_key, None) + if namespace_text is None: + stderr.write(f"The namespace snippet is missing: {namespace_key}\n") + return 1 + + if not proto_common.NAMESPACE_IDENTIFIER_RE.fullmatch(namespace_text): + stderr.write( + f"The text from the snippet {namespace_key} " + f"is not a valid namespace identifier: {namespace_text!r}\n" + ) + return 1 + + namespace = proto_common.NamespaceIdentifier(namespace_text) + + # region Structure + + code, errors = proto_structure.generate( + symbol_table=verified_ir_table, + namespace=namespace, + spec_impls=context.spec_impls, + ) + + if errors is not None: + run.write_error_report( + message=f"Failed to generate the structures in the ProtoBuf code " + f"based on {context.model_path}", + errors=[context.lineno_columner.error_message(error) for error in errors], + stderr=stderr, + ) + return 1 + + assert code is not None + + pth = context.output_dir / "types.proto" + try: + pth.write_text(code, encoding="utf-8") + except Exception as exception: + run.write_error_report( + message=f"Failed to write the ProtoBuf structures to {pth}", + errors=[str(exception)], + stderr=stderr, + ) + return 1 + + # endregion + + stdout.write(f"Code generated to: {context.output_dir}\n") + return 0 diff --git a/aas_core_codegen/protobuf/naming.py b/aas_core_codegen/protobuf/naming.py new file mode 100644 index 000000000..4e1b5627b --- /dev/null +++ b/aas_core_codegen/protobuf/naming.py @@ -0,0 +1,137 @@ +"""Generate ProtoBuf identifiers based on the identifiers from the meta-model.""" + +from typing import Union + +import aas_core_codegen.naming +from aas_core_codegen import intermediate +from aas_core_codegen.common import Identifier, assert_never + + +def interface_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for an interface based on its meta-model ``identifier``. + + This method is not to be used because proto3 does not support interfaces. + """ + raise NotImplementedError("Interfaces are not supported by proto3.") + + +def enum_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for an enum based on its meta-model ``identifier``. + + >>> enum_name(Identifier("something")) + 'Something' + + >>> enum_name(Identifier("URL_to_something")) + 'UrlToSomething' + """ + return aas_core_codegen.naming.capitalized_camel_case(identifier) + + +def enum_literal_name(identifier: Identifier) -> Identifier: + """ + Generate a Protobuf name for an enum literal based on its meta-model ``identifier``. + + >>> enum_literal_name(Identifier("something")) + 'SOMETHING' + + >>> enum_literal_name(Identifier("URL_to_something")) + 'URL_TO_SOMETHING' + """ + return aas_core_codegen.naming.upper_snake_case(identifier) + + +def class_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a class based on its meta-model ``identifier``. + + >>> class_name(Identifier("something")) + 'Something' + + >>> class_name(Identifier("URL_to_something")) + 'UrlToSomething' + """ + return aas_core_codegen.naming.capitalized_camel_case(identifier) + + +def name_of( + something: Union[ + intermediate.Enumeration, intermediate.ConcreteClass, intermediate.Interface + ] +) -> Identifier: + """Dispatch to the appropriate naming function.""" + if isinstance(something, intermediate.Enumeration): + return enum_name(something.name) + + elif isinstance(something, intermediate.ConcreteClass): + return class_name(something.name) + + elif isinstance(something, intermediate.Interface): + return interface_name(something.name) + + else: + assert_never(something) + + raise AssertionError("Should not have gotten here") + + +def property_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a public property based on its meta-model ``identifier``. + + >>> property_name(Identifier("something")) + 'something' + + >>> property_name(Identifier("something_to_URL")) + 'something_to_url' + """ + return aas_core_codegen.naming.lower_snake_case(identifier) + + +def private_property_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a private property. + + This method is not to be used because proto3 does not support private properties. + """ + raise NotImplementedError( + "Private properties are not supported by proto3 Messages." + ) + + +def private_method_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a private method. + + This method is not to be used because proto3 does not support private methods. + """ + raise NotImplementedError("Methods are not supported by proto3 Messages.") + + +def method_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a method. + + This method is not to be used because proto3 does not support methods. + """ + raise NotImplementedError("Methods are not supported by proto3 Messages.") + + +def argument_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for an argument. + + This method is not to be used because proto3 does not support methods and + thus no arguments. + """ + raise NotImplementedError("Arguments are not supported by proto3 Messages.") + + +def variable_name(identifier: Identifier) -> Identifier: + """ + Generate a ProtoBuf name for a variable. + + This method is not to be used because proto3 does not support variables. + """ + raise NotImplementedError("Variables are not supported by proto3 Messages.") diff --git a/aas_core_codegen/protobuf/structure/__init__.py b/aas_core_codegen/protobuf/structure/__init__.py new file mode 100644 index 000000000..b99d88583 --- /dev/null +++ b/aas_core_codegen/protobuf/structure/__init__.py @@ -0,0 +1,6 @@ +"""Generate C# data structures to represent an AAS.""" + +from aas_core_codegen.protobuf.structure import _generate + +verify = _generate.verify +generate = _generate.generate diff --git a/aas_core_codegen/protobuf/structure/_generate.py b/aas_core_codegen/protobuf/structure/_generate.py new file mode 100644 index 000000000..4d153a0bb --- /dev/null +++ b/aas_core_codegen/protobuf/structure/_generate.py @@ -0,0 +1,581 @@ +"""Generate the ProtoBuf data structures from the intermediate representation.""" + +import io +import textwrap +from typing import ( + Optional, + Dict, + List, + Tuple, + cast, + Union, + Set, +) + +from icontract import ensure, require + +from aas_core_codegen import intermediate +from aas_core_codegen import specific_implementations +from aas_core_codegen.common import ( + Error, + Identifier, + assert_never, + Stripped, + indent_but_first_line, +) +from aas_core_codegen.protobuf import ( + common as proto_common, + naming as proto_naming, + description as proto_description, +) +from aas_core_codegen.protobuf.common import INDENT as I, INDENT2 as II + + +# region Checks + + +def _human_readable_identifier( + something: Union[ + intermediate.Enumeration, intermediate.AbstractClass, intermediate.ConcreteClass + ] +) -> str: + """ + Represent ``something`` in a human-readable text. + + The reader should be able to trace ``something`` back to the meta-model. + """ + result: str + + if isinstance(something, intermediate.Enumeration): + result = f"meta-model enumeration {something.name!r}" + elif isinstance(something, intermediate.AbstractClass): + result = f"meta-model abstract class {something.name!r}" + elif isinstance(something, intermediate.ConcreteClass): + result = f"meta-model concrete class {something.name!r}" + else: + assert_never(something) + + return result + + +def _verify_intra_structure_collisions( + our_type: intermediate.OurType, +) -> Optional[Error]: + """Verify that no member names collide in the ProtoBuf structure of our type.""" + errors = [] # type: List[Error] + + if isinstance(our_type, intermediate.Enumeration): + pass + + elif isinstance(our_type, intermediate.ConstrainedPrimitive): + pass + + elif isinstance(our_type, intermediate.Class): + observed_member_names = {} # type: Dict[Identifier, str] + + for prop in our_type.properties: + prop_name = proto_naming.property_name(prop.name) + if prop_name in observed_member_names: + errors.append( + Error( + prop.parsed.node, + f"ProtoBuf property {prop_name!r} corresponding " + f"to the meta-model property {prop.name!r} collides with " + f"the {observed_member_names[prop_name]}", + ) + ) + else: + observed_member_names[prop_name] = ( + f"ProtoBuf property {prop_name!r} corresponding to " + f"the meta-model property {prop.name!r}" + ) + + else: + assert_never(our_type) + + if len(errors) > 0: + errors.append( + Error( + our_type.parsed.node, + f"Naming collision(s) in ProtoBuf code for our type {our_type.name!r}", + underlying=errors, + ) + ) + + return None + + +def _verify_structure_name_collisions( + symbol_table: intermediate.SymbolTable, +) -> List[Error]: + """Verify that the ProtoBuf names of the structures do not collide.""" + observed_structure_names: Dict[ + Identifier, + Union[ + intermediate.Enumeration, + intermediate.AbstractClass, + intermediate.ConcreteClass, + ], + ] = dict() + + errors = [] # type: List[Error] + + # region Inter-structure collisions + + for our_type in symbol_table.our_types: + if not isinstance( + our_type, + ( + intermediate.Enumeration, + intermediate.Class, + ), + ): + continue + + if isinstance(our_type, intermediate.Enumeration): + name = proto_naming.enum_name(our_type.name) + other = observed_structure_names.get(name, None) + + if other is not None: + errors.append( + Error( + our_type.parsed.node, + f"The ProtoBuf name {name!r} for the enumeration" + f" {our_type.name!r} " + f"collides with the same ProtoBuf name " + f"coming from the {_human_readable_identifier(other)}", + ) + ) + else: + observed_structure_names[name] = our_type + + elif isinstance(our_type, intermediate.Class): + interface_name = proto_naming.class_name(our_type.name) + + other = observed_structure_names.get(interface_name, None) + + if other is not None: + errors.append( + Error( + our_type.parsed.node, + f"The ProtoBuf name {interface_name!r} of the interface " + f"for the class {our_type.name!r} " + f"collides with the same ProtoBuf name " + f"coming from the {_human_readable_identifier(other)}", + ) + ) + else: + observed_structure_names[interface_name] = our_type + + else: + assert_never(our_type) + + # endregion + + # region Intra-structure collisions + + for our_type in symbol_table.our_types: + collision_error = _verify_intra_structure_collisions(our_type=our_type) + + if collision_error is not None: + errors.append(collision_error) + + # endregion + + return errors + + +class VerifiedIntermediateSymbolTable(intermediate.SymbolTable): + """Represent a verified symbol table which can be used for code generation.""" + + # noinspection PyInitNewSignature + def __new__( + cls, symbol_table: intermediate.SymbolTable + ) -> "VerifiedIntermediateSymbolTable": + raise AssertionError("Only for type annotation") + + +@ensure(lambda result: (result[0] is None) ^ (result[1] is None)) +def verify( + symbol_table: intermediate.SymbolTable, +) -> Tuple[Optional[VerifiedIntermediateSymbolTable], Optional[List[Error]]]: + """Verify that ProtoBuf code can be generated from the ``symbol_table``.""" + errors = [] # type: List[Error] + + structure_name_collisions = _verify_structure_name_collisions( + symbol_table=symbol_table + ) + + errors.extend(structure_name_collisions) + + if len(errors) > 0: + return None, errors + + return cast(VerifiedIntermediateSymbolTable, symbol_table), None + + +# endregion + +# region Generation + + +@ensure(lambda result: (result[0] is None) ^ (result[1] is None)) +def _generate_enum( + enum: intermediate.Enumeration, +) -> Tuple[Optional[Stripped], Optional[Error]]: + """Generate the ProtoBuf code for the enum.""" + writer = io.StringIO() + + if enum.description is not None: + comment, comment_errors = proto_description.generate_comment_for_our_type( + enum.description + ) + if comment_errors: + return None, Error( + enum.description.parsed.node, + "Failed to generate the documentation comment", + comment_errors, + ) + + assert comment is not None + + writer.write(comment) + writer.write("\n") + + name = proto_naming.enum_name(enum.name) + + # write enum and its name + writer.write(f"enum {name} {{\n") + # write at least the unspecified enum entry + writer.write(textwrap.indent(f"{proto_naming.enum_name(name)}_UNSPECIFIED = 0;", I)) + + if len(enum.literals) == 0: + writer.write("\n}") + return Stripped(writer.getvalue()), None + + for i, literal in enumerate(enum.literals): + writer.write("\n\n") + + if literal.description: + ( + literal_comment, + literal_comment_errors, + ) = proto_description.generate_comment_for_enumeration_literal( + literal.description + ) + + if literal_comment_errors: + return None, Error( + literal.description.parsed.node, + f"Failed to generate the comment " + f"for the enumeration literal {literal.name!r}", + literal_comment_errors, + ) + + assert literal_comment is not None + + writer.write(textwrap.indent(literal_comment, I)) + writer.write("\n") + + # Enums cannot have string-values assigned to them in proto3. Instead, they each get assigned + # an ID that is used for (de-)serialization. + # If that ID is re-assigned to another literal in the same enum in a later version, a system using the + # old version will (de-)serialize that literal differently. Hence, hope that the order of writing the literals + # stays the same in each build so that one literal always gets the same ID. Otherwise, don't mix versions. + # With each version, compare to the previous one and assign same ID. + # With each version, add a `reserved`-statement for deleted literals and their IDs. + writer.write( + textwrap.indent( + f"""\ +{proto_naming.enum_name(name)}_{proto_naming.enum_literal_name(literal.name)}\ + = {i + 1};""", + I, + ) + ) + + writer.write("\n}") + + return Stripped(writer.getvalue()), None + + +@require(lambda cls: not cls.is_implementation_specific) +@ensure(lambda result: (result[0] is None) ^ (result[1] is None)) +def _generate_class( + cls: intermediate.ConcreteClass, +) -> Tuple[Optional[Stripped], Optional[Error], List[intermediate.AbstractClass],]: + """Generate ProtoBuf code for the given concrete class ``cls``.""" + # Code blocks to be later joined by double newlines and indented once + blocks = [] # type: List[Stripped] + + required_choice_object = [] # type: List[intermediate.AbstractClass] + + # region Getters and setters + for i, prop in enumerate( + set(cls.properties).union( + set(cls.interface.properties if cls.interface is not None else []) + ) + ): + prop_type = proto_common.generate_type(type_annotation=prop.type_annotation) + + prop_name = proto_naming.property_name(prop.name) + + prop_blocks = [] # type: List[Stripped] + + if prop.description is not None: + ( + prop_comment, + prop_comment_errors, + ) = proto_description.generate_comment_for_property(prop.description) + if prop_comment_errors: + return ( + None, + Error( + prop.description.parsed.node, + f"Failed to generate the documentation comment " + f"for the property {prop.name!r}", + prop_comment_errors, + ), + [], + ) + + assert prop_comment is not None + + prop_blocks.append(prop_comment) + + # lists of our types where our types are abstract/interfaces + if ( + isinstance(prop.type_annotation, intermediate.ListTypeAnnotation) + and isinstance(prop.type_annotation.items, intermediate.OurTypeAnnotation) + and isinstance( + prop.type_annotation.items.our_type, + (intermediate.Interface, intermediate.AbstractClass), + ) + ): + # -> must create a new message (choice object) since "oneof" + # and "repeated" do not go together + prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 2};")) + required_choice_object.append(prop.type_annotation.items.our_type) + + # optional lists of our types where our types are abstract/interfaces + elif ( + isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation) + and isinstance(prop.type_annotation.value, intermediate.ListTypeAnnotation) + and isinstance( + prop.type_annotation.value.items, intermediate.OurTypeAnnotation + ) + and isinstance( + prop.type_annotation.value.items.our_type, + (intermediate.Interface, intermediate.AbstractClass), + ) + ): + # -> same as the case before + prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 2};")) + required_choice_object.append(prop.type_annotation.value.items.our_type) + + # our types where our types are abstract/interfaces + elif isinstance( + prop.type_annotation, intermediate.OurTypeAnnotation + ) and isinstance( + prop.type_annotation.our_type, + (intermediate.Interface, intermediate.AbstractClass), + ): + # -> must use "oneof" + prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 2};")) + required_choice_object.append(prop.type_annotation.our_type) + + else: + # just a normal property with type + prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 2};")) + + blocks.append(Stripped("\n".join(prop_blocks))) + + # one additional property indicating the concrete class type (in case multiple + # inherit from the same interface) when instantiating a class of this proto, + # the field must be set (ideally in the constructor) + blocks.append(Stripped("MessageType message_type = 1;")) + + # endregion + + name = proto_naming.class_name(cls.name) + + writer = io.StringIO() + + if cls.description is not None: + comment, comment_errors = proto_description.generate_comment_for_our_type( + cls.description + ) + if comment_errors is not None: + return ( + None, + Error( + cls.description.parsed.node, + "Failed to generate the comment description", + comment_errors, + ), + [], + ) + + assert comment is not None + + writer.write(comment) + writer.write("\n") + + writer.write(f"message {name} {{\n") + + for i, block in enumerate(blocks): + if i > 0: + writer.write("\n\n") + + writer.write(textwrap.indent(block, I)) + + writer.write("\n}") + + return Stripped(writer.getvalue()), None, required_choice_object + + +def _generate_message_type_enum( + symbol_table: VerifiedIntermediateSymbolTable, +) -> Tuple[Stripped, Optional[Error]]: + writer = io.StringIO() + + name = "MessageType" + + # write enum and its name + writer.write(f"enum {name} {{\n") + # write at least the unspecified enum entry + writer.write(textwrap.indent(f"{name}_UNSPECIFIED = 0;", I)) + + # generate one enum entry for each concrete class + for i, cls in enumerate(symbol_table.concrete_classes): + writer.write("\n\n") + + writer.write( + textwrap.indent( + f"{name}_{proto_naming.enum_literal_name(cls.name)} = {i + 1};", + I, + ) + ) + + writer.write("\n}") + + return Stripped(writer.getvalue()), None + + +def _generate_choice_class(cls: intermediate.AbstractClass) -> str: + msg_header = f"message {proto_naming.class_name(cls.name)} {{\n" + msg_body = f"{I}oneof value {{\n" + + for j, subtype in enumerate(cls.concrete_descendants): + subtype_type = proto_naming.class_name(subtype.name) + subtype_name = proto_naming.property_name(subtype.name) + msg_body += f"{II}{subtype_type} {subtype_name} = {j + 1};\n" + + return msg_header + msg_body + f"{I}}}\n}}" + + +@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None)) +@ensure( + lambda result: not (result[0] is not None) or result[0].endswith("\n"), + "Trailing newline mandatory for valid end-of-files", +) +# fmt: on +def generate( + symbol_table: VerifiedIntermediateSymbolTable, + namespace: proto_common.NamespaceIdentifier, + spec_impls: specific_implementations.SpecificImplementations, +) -> Tuple[Optional[str], Optional[List[Error]]]: + """ + Generate the ProtoBuf code of the structures based on the symbol table. + + The ``namespace`` defines the AAS ProtoBuf package. + """ + code_blocks = [] # type: List[Stripped] + + errors = [] # type: List[Error] + + required_choice_objects = set([]) # type: Set[intermediate.AbstractClass] + + for our_type in symbol_table.our_types: + if not isinstance( + our_type, + ( + intermediate.Enumeration, + intermediate.ConcreteClass, + ), + ): + continue + + if isinstance(our_type, intermediate.ConcreteClass): + code, error, choice_obj = _generate_class(cls=our_type) + if error is not None: + errors.append( + Error( + our_type.parsed.node, + f"Failed to generate the class code for " + f"the class {our_type.name!r}", + [error], + ) + ) + continue + + assert code is not None + code_blocks.append(code) + required_choice_objects = required_choice_objects.union(set(choice_obj)) + + elif isinstance(our_type, intermediate.Enumeration): + code, error = _generate_enum(enum=our_type) + if error is not None: + errors.append( + Error( + our_type.parsed.node, + f"Failed to generate the code for " + f"the enumeration {our_type.name!r}", + [error], + ) + ) + continue + + assert code is not None + code_blocks.append(code) + + else: + assert_never(our_type) + + # generate the necessary classes for choice (i.e. a class for every property that + # was like "repeated ") + for cls in required_choice_objects: + code_blocks.append(Stripped(_generate_choice_class(cls))) + + code, error = _generate_message_type_enum(symbol_table) + if error is None: + code_blocks.append(code) + + if len(errors) > 0: + return None, errors + + code_blocks_joined = "\n\n".join(code_blocks) + + blocks = [ + proto_common.WARNING, + Stripped( + f"""\ +syntax = "proto3"; + +package {namespace}; + + +{I}{indent_but_first_line(code_blocks_joined, I)}""" + ), + proto_common.WARNING, + ] # type: List[Stripped] + + out = io.StringIO() + for i, block in enumerate(blocks): + if i > 0: + out.write("\n\n") + + out.write(block) + + out.write("\n") + + return out.getvalue(), None