diff --git a/src/hayhooks/server/pipelines/models.py b/src/hayhooks/server/pipelines/models.py new file mode 100644 index 0000000..8f3d877 --- /dev/null +++ b/src/hayhooks/server/pipelines/models.py @@ -0,0 +1,88 @@ +from typing import get_args, get_origin, List + +from pydantic import BaseModel, create_model +from haystack.dataclasses import Document + + +class HaystackDocument(BaseModel): + id: str + content: str + + +class PipelineDefinition(BaseModel): + name: str + source_code: str + + +def get_request_model(pipeline_name: str, pipeline_inputs): + """ + Inputs have this form: + { + 'first_addition': { <-- Component Name + 'value': {'type': , 'is_mandatory': True}, <-- Input + 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input + }, + 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}}, + } + """ + request_model = {} + for component_name, inputs in pipeline_inputs.items(): + + component_model = {} + for name, typedef in inputs.items(): + component_model[name] = (typedef["type"], typedef.get("default_value", ...)) + request_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...) + + return create_model(f'{pipeline_name.capitalize()}RunRequest', **request_model, __config__=config) + + +def get_response_model(pipeline_name: str, pipeline_outputs): + """ + Outputs have this form: + { + 'second_addition': { <-- Component Name + 'result': {'type': ""} <-- Output + }, + } + """ + response_model = {} + for component_name, outputs in pipe.outputs().items(): + component_model = {} + for name, typedef in outputs.items(): + output_type = typedef["type"] + if get_origin(output_type) == list and get_args(output_type)[0] == Document: + component_model[name] = (List[HaystackDocument], ...) + else: + component_model[name] = (typedef["type"], ...) + response_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...) + + return create_model(f'{pipeline_name.capitalize()}RunResponse', **response_model, __config__=config) + + +def convert_component_output(component_output): + """ + Component output has this form: + + "documents":[ + {"id":"818170...", "content":"RapidAPI for Mac is a full-featured HTTP client."} + ] + + We inspect the output and convert haystack.Document into the HaystackDocument pydantic model as needed + """ + result = {} + for output_name, data in component_output.items(): + # Empty containers, None values, empty strings and the likes: do nothing + if not data: + result[output_name] = data + + # Output contains a list of Document + if type(data) is list and type(data[0]) is Document: + result[output_name] = [HaystackDocument(id=d.id, content=d.content) for d in data] + # Output is a single Document + elif type(data) is Document: + result[output_name] = HaystackDocument(id=data.id, content=data.content or "") + # Any other type: do nothing + else: + result[output_name] = data + + return result diff --git a/src/hayhooks/server/pipelines/registry.py b/src/hayhooks/server/pipelines/registry.py index 860eea8..85e9974 100644 --- a/src/hayhooks/server/pipelines/registry.py +++ b/src/hayhooks/server/pipelines/registry.py @@ -1,5 +1,3 @@ -from typing import Any - from haystack.pipeline import Pipeline, Optional from haystack.core.errors import PipelineError diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 8c44ee3..2e85f2b 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -1,13 +1,16 @@ from fastapi import HTTPException from fastapi.responses import JSONResponse -from pydantic import BaseModel, create_model, ConfigDict +from pydantic import ConfigDict from hayhooks.server.pipelines import registry - - -class PipelineDefinition(BaseModel): - name: str - source_code: str +from hayhooks.server.pipelines.models import ( + HaystackDocument, + PipelineDefinition, + get_request_model, + get_response_model, + convert_component_output, +) +from haystack.dataclasses import Document def deploy_pipeline_def(app, pipeline_def: PipelineDefinition): @@ -18,46 +21,19 @@ def deploy_pipeline_def(app, pipeline_def: PipelineDefinition): config = ConfigDict(arbitrary_types_allowed=True) - request_model = {} - for component_name, inputs in pipe.inputs().items(): - # Inputs have this form: - # { - # 'first_addition': { <-- Component Name - # 'value': {'type': , 'is_mandatory': True}, <-- Input - # 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input - # }, - # 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}}, - # } - component_model = {} - for name, typedef in inputs.items(): - component_model[name] = (typedef["type"], typedef.get("default_value", ...)) - request_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...) - - PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model, __config__=config) - - response_model = {} - for component_name, outputs in pipe.outputs().items(): - # Outputs have this form: - # { - # 'second_addition': { <-- Component Name - # 'result': {'type': ""} <-- Output - # }, - # } - component_model = {} - for name, typedef in outputs.items(): - component_model[name] = (typedef["type"], ...) - response_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...) - - PipelineRunResponse = create_model( - f'{pipeline_def.name.capitalize()}RunResponse', **response_model, __config__=config - ) + PipelineRunRequest = get_request_model(pipeline_def.name, pipe.inputs()) + PipelineRunResponse = get_response_model(pipeline_def.name, pipe.outputs()) # There's no way in FastAPI to define the type of the request body other than annotating # the endpoint handler. We have to ignore the type here to make FastAPI happy while # silencing static type checkers (that would have good reasons to trigger!). async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore - output = pipe.run(data=pipeline_run_req.dict()) - return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200) + result = pipe.run(data=pipeline_run_req.dict()) + final_output = {} + for component_name, output in result.items(): + final_output[component_name] = convert_component_output(output) + + return JSONResponse(PipelineRunResponse(**final_output).model_dump(), status_code=200) app.add_api_route( path=f"/{pipeline_def.name}",