Skip to content

Commit

Permalink
Add choices for concrete classes in Protobuf (#531)
Browse files Browse the repository at this point in the history
We add additional choice classes corresponding to concrete classes with
concrete descendants in Protobuf.

So far, we omitted this case as the concrete classes with concrete
descendants do not appear in the AAS meta-models. However, we decide to
make the implementation future-proof in that regard as it is very possible
that in the near future we do face concrete classes with descendants in
properties.
  • Loading branch information
mristin authored Oct 27, 2024
1 parent cc5acd8 commit dc2d21c
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 97 deletions.
43 changes: 19 additions & 24 deletions aas_core_codegen/protobuf/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,54 +119,49 @@ def _assert_all_primitive_types_are_mapped() -> None:
_assert_all_primitive_types_are_mapped()


# fmt: off
@require(
lambda our_type_qualifier:
not (our_type_qualifier is not None)
or not our_type_qualifier.endswith('.')
)
# fmt: on
def generate_type(
type_annotation: intermediate.TypeAnnotationUnion,
our_type_qualifier: Optional[Stripped] = None,
) -> Stripped:
def generate_type(type_annotation: intermediate.TypeAnnotationUnion) -> Stripped:
"""
Generate the ProtoBuf type for the given type annotation.
``our_type_prefix`` is appended to all our types, if specified.
"""
our_type_prefix = "" if our_type_qualifier is None else f"{our_type_qualifier}."
if isinstance(type_annotation, intermediate.PrimitiveTypeAnnotation):
return PRIMITIVE_TYPE_MAP[type_annotation.a_type]

elif isinstance(type_annotation, intermediate.OurTypeAnnotation):
our_type = type_annotation.our_type

if isinstance(our_type, intermediate.Enumeration):
return Stripped(
our_type_prefix + proto_naming.enum_name(type_annotation.our_type.name)
)
return Stripped(proto_naming.enum_name(type_annotation.our_type.name))

elif isinstance(our_type, intermediate.ConstrainedPrimitive):
return PRIMITIVE_TYPE_MAP[our_type.constrainee]

elif isinstance(our_type, intermediate.Class):
return Stripped(our_type_prefix + proto_naming.class_name(our_type.name))
message_name = proto_naming.class_name(our_type.name)
if (
isinstance(our_type, intermediate.ConcreteClass)
and len(our_type.concrete_descendants) > 0
):
# NOTE (mristin):
# We have to add the suffix ``_choice`` since this field points
# to one of the concrete descendants of the class as well as
# the concrete class itself.
message_name += "_choice"

return Stripped(message_name)

elif isinstance(type_annotation, intermediate.ListTypeAnnotation):
item_type = generate_type(
type_annotation=type_annotation.items, our_type_qualifier=our_type_qualifier
)
item_type = generate_type(type_annotation=type_annotation.items)

return Stripped(f"repeated {item_type}")

elif isinstance(type_annotation, intermediate.OptionalTypeAnnotation):
value = generate_type(
type_annotation=type_annotation.value, our_type_qualifier=our_type_qualifier
)
value = generate_type(type_annotation=type_annotation.value)

# careful: do not generate "optional" keyword for list-type elements since otherwise we get invalid
# constructs like "optional repeated <type> <name>"
# NOTE (TomGneuss):
# Careful: do not generate "optional" keyword for list-type elements since
# otherwise we get invalid constructs like "optional repeated <type> <name>".
if isinstance(type_annotation.value, intermediate.ListTypeAnnotation):
return Stripped(f"{value}")
else:
Expand Down
4 changes: 1 addition & 3 deletions aas_core_codegen/protobuf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ def execute(context: run.Context, stdout: TextIO, stderr: TextIO) -> int:
# region Structure

code, errors = proto_structure.generate(
symbol_table=verified_ir_table,
namespace=namespace,
spec_impls=context.spec_impls,
symbol_table=verified_ir_table, namespace=namespace
)

if errors is not None:
Expand Down
6 changes: 6 additions & 0 deletions aas_core_codegen/protobuf/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import Union

from icontract import ensure

import aas_core_codegen.naming
from aas_core_codegen import intermediate
from aas_core_codegen.common import Identifier, assert_never
Expand Down Expand Up @@ -42,6 +44,10 @@ def enum_literal_name(identifier: Identifier) -> Identifier:
return aas_core_codegen.naming.upper_snake_case(identifier)


@ensure(
lambda result: "_" not in result,
"No underscode allowed so that we can attached our own suffixes such as ``_choice``",
)
def class_name(identifier: Identifier) -> Identifier:
"""
Generate a ProtoBuf name for a class based on its meta-model ``identifier``.
Expand Down
116 changes: 48 additions & 68 deletions aas_core_codegen/protobuf/structure/_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Identifier,
assert_never,
Stripped,
indent_but_first_line,
)
from aas_core_codegen.protobuf import (
common as proto_common,
Expand Down Expand Up @@ -301,17 +302,14 @@ def _generate_enum(
@ensure(lambda result: (result[0] is None) ^ (result[1] is None))
def _generate_class(
cls: intermediate.ConcreteClass,
) -> Tuple[Optional[Stripped], Optional[Error], List[intermediate.AbstractClass],]:
) -> Tuple[Optional[Stripped], Optional[Error]]:
"""Generate ProtoBuf code for the given concrete class ``cls``."""
# Code blocks to be later joined by double newlines and indented once
blocks = [] # type: List[Stripped]

required_choice_object = [] # type: List[intermediate.AbstractClass]

# region Getters and setters
for i, prop in enumerate(cls.properties):
prop_type = proto_common.generate_type(type_annotation=prop.type_annotation)

prop_name = proto_naming.property_name(prop.name)

prop_blocks = [] # type: List[Stripped]
Expand All @@ -330,57 +328,13 @@ def _generate_class(
f"for the property {prop.name!r}",
prop_comment_errors,
),
[],
)

assert prop_comment is not None

prop_blocks.append(prop_comment)

# lists of our types where our types are abstract/interfaces
if (
isinstance(prop.type_annotation, intermediate.ListTypeAnnotation)
and isinstance(prop.type_annotation.items, intermediate.OurTypeAnnotation)
and isinstance(
prop.type_annotation.items.our_type,
(intermediate.Interface, intermediate.AbstractClass),
)
):
# -> must create a new message (choice object) since "oneof"
# and "repeated" do not go together
prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 1};"))
required_choice_object.append(prop.type_annotation.items.our_type)

# optional lists of our types where our types are abstract/interfaces
elif (
isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation)
and isinstance(prop.type_annotation.value, intermediate.ListTypeAnnotation)
and isinstance(
prop.type_annotation.value.items, intermediate.OurTypeAnnotation
)
and isinstance(
prop.type_annotation.value.items.our_type,
(intermediate.Interface, intermediate.AbstractClass),
)
):
# -> same as the case before
prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 1};"))
required_choice_object.append(prop.type_annotation.value.items.our_type)

# our types where our types are abstract/interfaces
elif isinstance(
prop.type_annotation, intermediate.OurTypeAnnotation
) and isinstance(
prop.type_annotation.our_type,
(intermediate.Interface, intermediate.AbstractClass),
):
# -> must use "oneof"
prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 1};"))
required_choice_object.append(prop.type_annotation.our_type)

else:
# just a normal property with type
prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 1};"))
prop_blocks.append(Stripped(f"{prop_type} {prop_name} = {i + 1};"))

blocks.append(Stripped("\n".join(prop_blocks)))

Expand All @@ -402,7 +356,6 @@ def _generate_class(
"Failed to generate the comment description",
comment_errors,
),
[],
)

assert comment is not None
Expand All @@ -420,19 +373,52 @@ def _generate_class(

writer.write("\n}")

return Stripped(writer.getvalue()), None, required_choice_object
return Stripped(writer.getvalue()), None


@require(
lambda cls: len(cls.concrete_descendants) > 0,
"No choice possible if no concrete descendants",
)
def _generate_choice_class(cls: intermediate.ClassUnion) -> Stripped:
fields = [] # type: List[Stripped]

concrete_classes = []
if isinstance(cls, intermediate.ConcreteClass):
concrete_classes.append(cls)

def _generate_choice_class(cls: intermediate.AbstractClass) -> str:
msg_header = f"message {proto_naming.class_name(cls.name)} {{\n"
msg_body = f"{I}oneof value {{\n"
concrete_classes.extend(cls.concrete_descendants)

for j, subtype in enumerate(cls.concrete_descendants):
for j, subtype in enumerate(concrete_classes):
subtype_type = proto_naming.class_name(subtype.name)
subtype_name = proto_naming.property_name(subtype.name)
msg_body += f"{II}{subtype_type} {subtype_name} = {j + 1};\n"
fields.append(Stripped(f"{subtype_type} {subtype_name} = {j + 1};"))

return msg_header + msg_body + f"{I}}}\n}}"
if isinstance(cls, intermediate.AbstractClass):
message_name = proto_naming.class_name(cls.name)
elif isinstance(cls, intermediate.ConcreteClass):
# NOTE (mristin):
# We have to append the ``_choice`` suffix for concrete classes with descendants
# to distinguish between the concrete classes and one-of choice classes.
# Protocol Buffers do not support inheritance, so we have to work around that
# circumstance.

message_name = proto_naming.class_name(cls.name) + "_choice"

else:
assert_never(cls)
raise AssertionError("Unexpected execution path")

fields_joined = "\n".join(fields)

return Stripped(
f"""\
message {message_name} {{
{I}oneof value {{
{II}{indent_but_first_line(fields_joined, II)}
{I}}}
}}"""
)


@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
Expand All @@ -444,7 +430,6 @@ def _generate_choice_class(cls: intermediate.AbstractClass) -> str:
def generate(
symbol_table: VerifiedIntermediateSymbolTable,
namespace: proto_common.NamespaceIdentifier,
spec_impls: specific_implementations.SpecificImplementations,
) -> Tuple[Optional[str], Optional[List[Error]]]:
"""
Generate the ProtoBuf code of the structures based on the symbol table.
Expand All @@ -460,15 +445,12 @@ def generate(
for our_type in symbol_table.our_types:
if not isinstance(
our_type,
(
intermediate.Enumeration,
intermediate.ConcreteClass,
),
(intermediate.Enumeration, intermediate.ConcreteClass),
):
continue

if isinstance(our_type, intermediate.ConcreteClass):
code, error, choice_obj = _generate_class(cls=our_type)
code, error = _generate_class(cls=our_type)
if error is not None:
errors.append(
Error(
Expand All @@ -482,7 +464,6 @@ def generate(

assert code is not None
code_blocks.append(code)
required_choice_objects = required_choice_objects.union(set(choice_obj))

elif isinstance(our_type, intermediate.Enumeration):
code, error = _generate_enum(enum=our_type)
Expand All @@ -503,10 +484,9 @@ def generate(
else:
assert_never(our_type)

# generate the necessary classes for choice (i.e. a class for every property that
# was like "repeated <interface>")
for cls in required_choice_objects:
code_blocks.append(Stripped(_generate_choice_class(cls)))
for cls in symbol_table.classes:
if len(cls.concrete_descendants) > 0:
code_blocks.append(_generate_choice_class(cls))

if len(errors) > 0:
return None, errors
Expand Down
Loading

0 comments on commit dc2d21c

Please sign in to comment.