From 9614ee0c7ba0fa2af301a1a5975b33df606a2426 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 4 Dec 2024 01:34:06 +0000 Subject: [PATCH 1/2] Code generation for ThingClient subclasses This generates Python code for a module containing a client. The Python code generated for the test thing executes successfully, though the name of the class is not quite right yet. --- .../code_generation/__init__.py | 168 ++++++++++++++++++ tests/test_client_generation.py | 26 +++ 2 files changed, 194 insertions(+) create mode 100644 src/labthings_fastapi/code_generation/__init__.py create mode 100644 tests/test_client_generation.py diff --git a/src/labthings_fastapi/code_generation/__init__.py b/src/labthings_fastapi/code_generation/__init__.py new file mode 100644 index 00000000..0d634b43 --- /dev/null +++ b/src/labthings_fastapi/code_generation/__init__.py @@ -0,0 +1,168 @@ +from inspect import cleandoc +import re +from typing import Sequence + +from labthings_fastapi.thing_description.model import ( + DataSchema, + ThingDescription, + Type, +) + + +def title_to_snake_case(title: str) -> str: + """Convert text to snake_case""" + words = re.findall(r"[a-z0-9]+", title.lower()) + return "_".join(words) + + +def snake_to_camel_case(snake: str) -> str: + """Convert snake_case to CamelCase""" + words = snake.split("_") + return "".join(word.capitalize() for word in words) + + +def title_to_camel_case(title: str) -> str: + """Convert text to CamelCase""" + return snake_to_camel_case(title_to_snake_case(title)) + + +def clean_code(code: str, prefix: str = "") -> str: + """Clean up code by removing leading/trailing whitespace and empty lines""" + lines = cleandoc(code).split("\n") + return "\n".join([prefix + l for l in lines]) + + +def quoted_docstring(docstring: str, indent: int = 4) -> str: + """Wrap a docstring in triple quotes""" + prefix = " " * indent + lines = docstring.split("\n") + lines[0] = f'"""{lines[0]}' + lines.append('"""') + return "".join([f"{prefix}{line}\n" for line in lines]) + + +def dataschema_to_type(schema: DataSchema) -> str: + """Convert a DataSchema to a Python type""" + if isinstance(schema.oneOf, Sequence) and len(schema.oneOf) > 0: + types = [dataschema_to_type(s) for s in schema.oneOf] + return f"Union[{", ".join(types)}]" + if schema.type == Type.string: + return "str" + elif schema.type == Type.integer: + return "int" + elif schema.type == Type.number: + return "float" + elif schema.type == Type.boolean: + return "bool" + elif schema.type == Type.array: + if schema.items is None: + return "list" + return f"list[{dataschema_to_type(schema.items)}]" + elif schema.type == Type.object: + return "dict[str, Any]" + else: + return "Any" + +def property_to_argument( + name: str, + property: DataSchema, + ) -> str: + """Convert a property to a function argument""" + dtype = dataschema_to_type(property) + arg = f"{name}: {dtype}" + if "default" in property.model_fields_set: + if property.default is None: + arg += " = None" + elif isinstance(property.default, str): + arg += f' = "{property.default}"' + elif ( + isinstance(property.default, bool) + or isinstance(property.default, int) + or isinstance(property.default, float) + ): + arg += f" = {property.default}" + else: + raise NotImplementedError(f"Unsupported default value: {property.default}") + return arg + + +def input_model_to_arguments(model: DataSchema) -> list[str]: + """Convert an input model to a list of arguments""" + if model.type is None: + return [] + if model.type != Type.object: + print(f"model.type: {model.type}") + raise NotImplementedError("Only object models are supported") + if not model.properties: + return [] + args = [] + if model.required: + for name in model.required: + property = model.properties[name] + args.append( + property_to_argument(name, property) + ) + for name, property in model.properties.items(): + if model.required and name in model.required: + continue + args.append(property_to_argument(name, property)) + if "=" not in args[-1]: + args[-1] += " = ..." + return args + + +def generate_client(thing_description: ThingDescription) -> str: + """Generate a client from a Thing Description""" + code = ( + "from labthings_fastapi.client import ThingClient\n" + "from typing import Any, Union\n" + "\n" + ) + class_name = title_to_camel_case(thing_description.title) + code += f"class {class_name}Client(ThingClient):\n" + code += f' """A client for the {thing_description.title} Thing"""\n\n' + for name, property in thing_description.properties.items(): + pname = title_to_snake_case(name) + dtype = dataschema_to_type(property) + code += " @property\n" + code += f" def {pname}(self) -> {dtype}:\n" + code += quoted_docstring(property.description, indent=8) + code += f' return self.get_property("{name}")\n\n' + + if not property.readOnly: + code += clean_code( + f''' + @{pname}.setter + def {pname}(self, value: {dtype}): + self.set_property("{name}", value) + ''', + prefix = " ", + ) + "\n\n" + + for name, action in thing_description.actions.items(): + aname = title_to_snake_case(name) + args = input_model_to_arguments(action.input) + output_type = dataschema_to_type(action.output) + code += f" def {aname}(\n" + code += " self,\n" + for arg in args: + code += f" {arg},\n" + code += " **kwargs\n" + code += f" ) -> {output_type}:\n" + code += quoted_docstring(action.description, indent=8) + for arg in args: + k = arg.split(":")[0].strip() + if arg.endswith("..."): + code += clean_code( + f""" + if {k} is not ...: + kwargs[{k}] = {k} + """, + prefix = " ", + ) + "\n" + else: + code += f" kwargs[{k}] = {k}\n" + code += f' return self.invoke_action("{name}", **kwargs)\n\n' + + return code + diff --git a/tests/test_client_generation.py b/tests/test_client_generation.py new file mode 100644 index 00000000..fe1864f4 --- /dev/null +++ b/tests/test_client_generation.py @@ -0,0 +1,26 @@ +import os +import tempfile +import importlib.util + +from labthings_fastapi.code_generation import generate_client +from labthings_fastapi.example_things import MyThing + + +def test_client_generation(): + td = MyThing().thing_description() + code = generate_client(td) + with tempfile.TemporaryDirectory() as d: + fname = os.path.join(d,"client.py") + with open(fname, "w") as f: + f.write(code) + spec = importlib.util.spec_from_file_location("client", f.name) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + assert "MyThingClient" in dir(module) + +if __name__ == "__main__": + td = MyThing().thing_description() + print("Thing Description:") + print(td.model_dump_json(indent=2, exclude_unset=True)) + print("\nGenerated Client:") + print(generate_client(td)) From 2c68771ea17199ccfe25f909f443e8628d573fb5 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Tue, 21 Jan 2025 00:46:25 +0000 Subject: [PATCH 2/2] Convert objets to Models Dataschemas of type Object that have defined properties are now converted to Pydantic models. This should allow much better autocompletion. The inconsistency of using `dict[str, any]` as a fallback rankles, but I think it's the best option. --- .../code_generation/__init__.py | 88 +++++++++++++++---- tests/test_client_generation.py | 59 +++++++++++-- 2 files changed, 125 insertions(+), 22 deletions(-) diff --git a/src/labthings_fastapi/code_generation/__init__.py b/src/labthings_fastapi/code_generation/__init__.py index 0d634b43..0524da39 100644 --- a/src/labthings_fastapi/code_generation/__init__.py +++ b/src/labthings_fastapi/code_generation/__init__.py @@ -1,6 +1,6 @@ from inspect import cleandoc import re -from typing import Sequence +from typing import Optional, Sequence from labthings_fastapi.thing_description.model import ( DataSchema, @@ -11,8 +11,10 @@ def title_to_snake_case(title: str) -> str: """Convert text to snake_case""" - words = re.findall(r"[a-z0-9]+", title.lower()) - return "_".join(words) + # First, look for CamelCase so it doesn't get ignored: + uncameled = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", title) + words = re.findall(r"[a-zA-Z0-9]+", uncameled) + return "_".join(w.lower() for w in words) def snake_to_camel_case(snake: str) -> str: @@ -32,8 +34,10 @@ def clean_code(code: str, prefix: str = "") -> str: return "\n".join([prefix + l for l in lines]) -def quoted_docstring(docstring: str, indent: int = 4) -> str: +def quoted_docstring(docstring: Optional[str], indent: int = 4) -> str: """Wrap a docstring in triple quotes""" + if docstring is None: + return "" prefix = " " * indent lines = docstring.split("\n") lines[0] = f'"""{lines[0]}' @@ -41,10 +45,10 @@ def quoted_docstring(docstring: str, indent: int = 4) -> str: return "".join([f"{prefix}{line}\n" for line in lines]) -def dataschema_to_type(schema: DataSchema) -> str: +def dataschema_to_type(schema: DataSchema, models: dict[str, str], name: str = "anonymous") -> str: """Convert a DataSchema to a Python type""" if isinstance(schema.oneOf, Sequence) and len(schema.oneOf) > 0: - types = [dataschema_to_type(s) for s in schema.oneOf] + types = [dataschema_to_type(s, models) for s in schema.oneOf] return f"Union[{", ".join(types)}]" if schema.type == Type.string: return "str" @@ -57,18 +61,50 @@ def dataschema_to_type(schema: DataSchema) -> str: elif schema.type == Type.array: if schema.items is None: return "list" - return f"list[{dataschema_to_type(schema.items)}]" + if isinstance(schema.items, Sequence): + types = [dataschema_to_type(s, models) for s in schema.items] + return f"tuple[{', '.join(types)}]" + return f"list[{dataschema_to_type(schema.items, models)}]" elif schema.type == Type.object: - return "dict[str, Any]" + # If the object has no properties, return a generic dict + if not schema.properties: + return "dict[str, Any]" + # Objects with properties are converted to Pydantic models + if schema.title: + model_name = title_to_camel_case(schema.title + "_model") + else: + model_name = snake_to_camel_case(name + "_model") + if model_name in models: + i = 0 + while f"{model_name}_{i}" in models: + i += 1 + model_name = f"{model_name}_{i}" + models[model_name] = "# placeholder" + models[model_name] = dataschema_to_model(schema, models, model_name) + return model_name else: return "Any" + +def dataschema_to_model(schema: DataSchema, models: dict[str, str], name: str) -> str: + """Convert a DataSchema to a Pydantic model""" + code = f"class {name}(BaseModel):\n" + for pname, property in schema.properties.items(): + code += " " + property_to_argument(pname, property, models) + "\n" + code += ( + "\n" + " class Config:\n" + " extra = 'allow'\n" + ) + return code + def property_to_argument( name: str, property: DataSchema, + models: dict[str, str] = None, ) -> str: """Convert a property to a function argument""" - dtype = dataschema_to_type(property) + dtype = dataschema_to_type(property, models, name) arg = f"{name}: {dtype}" if "default" in property.model_fields_set: if property.default is None: @@ -86,7 +122,7 @@ def property_to_argument( return arg -def input_model_to_arguments(model: DataSchema) -> list[str]: +def input_model_to_arguments(model: DataSchema, models) -> list[str]: """Convert an input model to a list of arguments""" if model.type is None: return [] @@ -100,12 +136,12 @@ def input_model_to_arguments(model: DataSchema) -> list[str]: for name in model.required: property = model.properties[name] args.append( - property_to_argument(name, property) + property_to_argument(name, property, models) ) for name, property in model.properties.items(): if model.required and name in model.required: continue - args.append(property_to_argument(name, property)) + args.append(property_to_argument(name, property, models)) if "=" not in args[-1]: args[-1] += " = ..." return args @@ -116,18 +152,29 @@ def generate_client(thing_description: ThingDescription) -> str: code = ( "from labthings_fastapi.client import ThingClient\n" "from typing import Any, Union\n" + "from pydantic import BaseModel\n" + "\n" + "\n" + "# Model definitions\n" # will be replaced at the end + "\n" "\n" ) + models: dict[str, str] = {} class_name = title_to_camel_case(thing_description.title) code += f"class {class_name}Client(ThingClient):\n" code += f' """A client for the {thing_description.title} Thing"""\n\n' for name, property in thing_description.properties.items(): pname = title_to_snake_case(name) - dtype = dataschema_to_type(property) + dtype = dataschema_to_type(property, models=models) code += " @property\n" code += f" def {pname}(self) -> {dtype}:\n" code += quoted_docstring(property.description, indent=8) - code += f' return self.get_property("{name}")\n\n' + code += f" val = self.get_property(\"{name}\")\n" + if dtype in models: + # If we've defined a model, convert it + code += f" return {dtype}(**val)\n\n" + else: + code += " return val\n\n" if not property.readOnly: code += clean_code( @@ -141,8 +188,8 @@ def {pname}(self, value: {dtype}): for name, action in thing_description.actions.items(): aname = title_to_snake_case(name) - args = input_model_to_arguments(action.input) - output_type = dataschema_to_type(action.output) + args = input_model_to_arguments(action.input, models) + output_type = dataschema_to_type(action.output, models) code += f" def {aname}(\n" code += " self,\n" for arg in args: @@ -162,7 +209,14 @@ def {pname}(self, value: {dtype}): ) + "\n" else: code += f" kwargs[{k}] = {k}\n" - code += f' return self.invoke_action("{name}", **kwargs)\n\n' + code += f' result = self.invoke_action("{name}", **kwargs)\n' + if output_type in models: + code += f" return {output_type}(**result)\n\n" + else: + code += " return result\n\n" + + # Include the model definitions + code = code.replace("# Model definitions", "\n\n".join(models.values())) return code diff --git a/tests/test_client_generation.py b/tests/test_client_generation.py index fe1864f4..e0859b4e 100644 --- a/tests/test_client_generation.py +++ b/tests/test_client_generation.py @@ -2,24 +2,73 @@ import tempfile import importlib.util +from pydantic import BaseModel + from labthings_fastapi.code_generation import generate_client +import labthings_fastapi.code_generation as cg +from labthings_fastapi.decorators import thing_action, thing_property from labthings_fastapi.example_things import MyThing +from labthings_fastapi.thing import Thing + + +def test_title_to_snake_case(): + assert cg.title_to_snake_case("CamelCase") == "camel_case" + assert cg.title_to_snake_case("Camel") == "camel" + assert cg.title_to_snake_case("camel") == "camel" + assert cg.title_to_snake_case("CAMEL") == "camel" + assert cg.title_to_snake_case("CamelCASE") == "camel_case" + +def test_snake_to_camel_case(): + assert cg.snake_to_camel_case("snake_case") == "SnakeCase" + assert cg.snake_to_camel_case("snake") == "Snake" + assert cg.snake_to_camel_case("SNAKE") == "Snake" + assert cg.snake_to_camel_case("snakeCASE_word") == "SnakecaseWord" -def test_client_generation(): - td = MyThing().thing_description() + +def generate_and_verify(thing): + td = thing().thing_description() code = generate_client(td) with tempfile.TemporaryDirectory() as d: - fname = os.path.join(d,"client.py") + fname = os.path.join(d, "client.py") with open(fname, "w") as f: f.write(code) spec = importlib.util.spec_from_file_location("client", f.name) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) - assert "MyThingClient" in dir(module) + assert f"{thing.__name__}Client" in dir(module) + + +def test_mything_generation(): + generate_and_verify(MyThing) + + +class TestModel(BaseModel): + a: int + b: str + +class NestedModel(BaseModel): + c: TestModel + +class ThingWithModels(Thing): + @thing_property + def prop1(self) -> TestModel: + return TestModel(a=1, b="test") + + @thing_action + def action1(self, arg1: TestModel) -> TestModel: + return arg1 + + @thing_property + def prop2(self) -> NestedModel: + return NestedModel(c=TestModel(a=1, b="test")) + + +def test_with_models(): + generate_and_verify(ThingWithModels) if __name__ == "__main__": - td = MyThing().thing_description() + td = ThingWithModels().thing_description() print("Thing Description:") print(td.model_dump_json(indent=2, exclude_unset=True)) print("\nGenerated Client:")