Skip to content

Commit

Permalink
clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
masci committed Feb 14, 2024
1 parent 277ec70 commit 81cb0e3
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 43 deletions.
88 changes: 88 additions & 0 deletions src/hayhooks/server/pipelines/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import get_args, get_origin, List

from haystack.dataclasses import Document
from pydantic import BaseModel, ConfigDict, create_model


class HaystackDocument(BaseModel):
    # Pydantic counterpart of a Haystack Document, reduced to the two fields below.
    # Plain comments are used instead of a docstring on purpose: pydantic publishes a
    # class docstring as the model description in the generated JSON schema.

    # Identifier copied from the source Document.
    id: str
    # Text content copied from the source Document.
    content: str


class PipelineDefinition(BaseModel):
    # Payload describing a pipeline to deploy.
    # Plain comments are used instead of a docstring on purpose: pydantic publishes a
    # class docstring as the model description in the generated JSON schema.

    # Name of the pipeline.
    name: str
    # Presumably the serialized pipeline definition; how it is loaded is not
    # visible from this module -- TODO confirm against the deploy code.
    source_code: str


def get_request_model(pipeline_name: str, pipeline_inputs):
    """
    Build the pydantic model describing the request body accepted by a pipeline.

    Inputs have this form:
    {
        'first_addition': { <-- Component Name
            'value': {'type': <class 'int'>, 'is_mandatory': True}, <-- Input
            'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input
        },
        'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}},
    }

    :param pipeline_name: name of the pipeline; it is capitalized and used as the
        prefix of the generated model name.
    :param pipeline_inputs: mapping of component name -> input name -> type
        definition, in the form shown above.
    :returns: a dynamically created pydantic model class named
        ``<Pipeline_name>RunRequest`` with one required field per component.
    """
    # Component inputs may use arbitrary Python types, so pydantic has to be told
    # to accept types it cannot build a schema for. The config is created here
    # because no module-level ``config`` exists in this file.
    config = ConfigDict(arbitrary_types_allowed=True)

    request_model = {}
    for component_name, inputs in pipeline_inputs.items():
        # An input without a 'default_value' key becomes a required field
        # (Ellipsis is pydantic's marker for "required").
        component_model = {
            name: (typedef["type"], typedef.get("default_value", ...))
            for name, typedef in inputs.items()
        }
        request_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...)

    return create_model(f'{pipeline_name.capitalize()}RunRequest', **request_model, __config__=config)


def get_response_model(pipeline_name: str, pipeline_outputs):
    """
    Build the pydantic model describing the response body returned by a pipeline.

    Outputs have this form:
    {
        'second_addition': { <-- Component Name
            'result': {'type': "<class 'int'>"} <-- Output
        },
    }

    Outputs typed as a list of haystack ``Document`` are declared as
    ``List[HaystackDocument]`` in the generated model.

    :param pipeline_name: name of the pipeline; it is capitalized and used as the
        prefix of the generated model name.
    :param pipeline_outputs: mapping of component name -> output name -> type
        definition, in the form shown above.
    :returns: a dynamically created pydantic model class named
        ``<Pipeline_name>RunResponse`` with one required field per component.
    """
    # Component outputs may use arbitrary Python types, so pydantic has to be told
    # to accept types it cannot build a schema for. The config is created here
    # because no module-level ``config`` exists in this file.
    config = ConfigDict(arbitrary_types_allowed=True)

    response_model = {}
    # Iterate the outputs passed in by the caller; there is no pipeline object in scope here.
    for component_name, outputs in pipeline_outputs.items():
        component_model = {}
        for name, typedef in outputs.items():
            output_type = typedef["type"]
            # get_args() is empty for an unparameterised ``List``; check before indexing.
            type_args = get_args(output_type)
            if get_origin(output_type) is list and type_args and type_args[0] is Document:
                component_model[name] = (List[HaystackDocument], ...)
            else:
                component_model[name] = (output_type, ...)
        response_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...)

    return create_model(f'{pipeline_name.capitalize()}RunResponse', **response_model, __config__=config)


def convert_component_output(component_output):
    """
    Convert haystack ``Document`` values in a component output into ``HaystackDocument``.

    Component output has this form:
    "documents":[
        {"id":"818170...", "content":"RapidAPI for Mac is a full-featured HTTP client."}
    ]
    We inspect the output and convert haystack.Document into the HaystackDocument pydantic model as needed

    :param component_output: mapping of output name -> value produced by one component.
    :returns: a new dict with the same keys; a ``Document`` or a list made only of
        ``Document`` objects is replaced by the ``HaystackDocument`` equivalent,
        every other value is passed through unchanged.
    """
    result = {}
    for output_name, data in component_output.items():
        # Empty containers, None values, empty strings and the likes: pass through.
        # The ``continue`` matters: without it an empty list would reach the
        # Document checks below and fail when its first element is inspected.
        if not data:
            result[output_name] = data
            continue

        # Output contains a list of Document
        if isinstance(data, list) and all(isinstance(d, Document) for d in data):
            # ``content or ""`` keeps the required ``str`` field valid when a
            # Document carries no text, matching the single-Document branch.
            result[output_name] = [HaystackDocument(id=d.id, content=d.content or "") for d in data]
        # Output is a single Document
        elif isinstance(data, Document):
            result[output_name] = HaystackDocument(id=data.id, content=data.content or "")
        # Any other type: do nothing
        else:
            result[output_name] = data

    return result
2 changes: 0 additions & 2 deletions src/hayhooks/server/pipelines/registry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Any

from haystack.pipeline import Pipeline, Optional
from haystack.core.errors import PipelineError

Expand Down
58 changes: 17 additions & 41 deletions src/hayhooks/server/utils/deploy_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, create_model, ConfigDict
from pydantic import ConfigDict

from hayhooks.server.pipelines import registry


class PipelineDefinition(BaseModel):
name: str
source_code: str
from hayhooks.server.pipelines.models import (
HaystackDocument,
PipelineDefinition,
get_request_model,
get_response_model,
convert_component_output,
)
from haystack.dataclasses import Document


def deploy_pipeline_def(app, pipeline_def: PipelineDefinition):
Expand All @@ -18,46 +21,19 @@ def deploy_pipeline_def(app, pipeline_def: PipelineDefinition):

config = ConfigDict(arbitrary_types_allowed=True)

request_model = {}
for component_name, inputs in pipe.inputs().items():
# Inputs have this form:
# {
# 'first_addition': { <-- Component Name
# 'value': {'type': <class 'int'>, 'is_mandatory': True}, <-- Input
# 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input
# },
# 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}},
# }
component_model = {}
for name, typedef in inputs.items():
component_model[name] = (typedef["type"], typedef.get("default_value", ...))
request_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...)

PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model, __config__=config)

response_model = {}
for component_name, outputs in pipe.outputs().items():
# Outputs have this form:
# {
# 'second_addition': { <-- Component Name
# 'result': {'type': "<class 'int'>"} <-- Output
# },
# }
component_model = {}
for name, typedef in outputs.items():
component_model[name] = (typedef["type"], ...)
response_model[component_name] = (create_model('ComponentParams', **component_model, __config__=config), ...)

PipelineRunResponse = create_model(
f'{pipeline_def.name.capitalize()}RunResponse', **response_model, __config__=config
)
PipelineRunRequest = get_request_model(pipeline_def.name, pipe.inputs())
PipelineRunResponse = get_response_model(pipeline_def.name, pipe.outputs())

# There's no way in FastAPI to define the type of the request body other than annotating
# the endpoint handler. We have to ignore the type here to make FastAPI happy while
# silencing static type checkers (that would have good reasons to trigger!).
async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore
output = pipe.run(data=pipeline_run_req.dict())
return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200)
result = pipe.run(data=pipeline_run_req.dict())
final_output = {}
for component_name, output in result.items():
final_output[component_name] = convert_component_output(output)

return JSONResponse(PipelineRunResponse(**final_output).model_dump(), status_code=200)

app.add_api_route(
path=f"/{pipeline_def.name}",
Expand Down

0 comments on commit 81cb0e3

Please sign in to comment.