Skip to content

Commit

Permalink
feat: remove old schema (#102)
Browse files Browse the repository at this point in the history
* feat: remove old schema
  • Loading branch information
debajyoti-truefoundry authored Nov 29, 2024
1 parent 6833eb0 commit 1661f6e
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 98 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pip install "async_processor[sqs]"
#### `app.py`
```python
from async_processor import (
InputMessage,
InputMessageInterface,
Processor,
WorkerConfig,
SQSInputConfig,
Expand All @@ -20,8 +20,8 @@ from async_processor import (


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
def process(self, input_message: InputMessageInterface) -> int:
body = input_message.get_body()
return body["x"] * body["y"]


Expand Down Expand Up @@ -77,7 +77,7 @@ Output:
import json
import uuid

from async_processor import InputMessage, OutputMessage, ProcessStatus
from async_processor import InputMessageV2, OutputMessage, ProcessStatus
import boto3


Expand All @@ -88,7 +88,7 @@ def send_request(input_sqs_url: str, output_sqs_url: str):
sqs.send_message(
QueueUrl=input_sqs_url,
MessageBody=json.dumps(
InputMessage(request_id=request_id, body={"x": 1, "y": 2}).dict()
InputMessageV2(tfy_request_id=request_id, x=1, y=2).dict()
),
)

Expand Down
2 changes: 0 additions & 2 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
CoreNATSOutputConfig,
Input,
InputConfig,
InputMessage,
InputMessageInterface,
InputMessageV2,
KafkaInputConfig,
Expand All @@ -28,7 +27,6 @@
__all__ = [
"ProcessorApp",
"Processor",
"InputMessage",
"NATSInputConfig",
"NATSOutputConfig",
"OutputMessage",
Expand Down
6 changes: 3 additions & 3 deletions async_processor/function_service/async_function_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from async_processor.processor import Processor
from async_processor.types import (
InputMessage,
InputMessageInterface,
OutputMessage,
OutputMessageFetchTimeoutError,
WorkerConfig,
Expand Down Expand Up @@ -104,8 +104,8 @@ def init(self):
if init_function:
init_function()

def process(self, input_message: InputMessage) -> int:
body = input_message.body
def process(self, input_message: InputMessageInterface) -> int:
body = input_message.get_body()
func_name = body.pop(INTERNAL_FUNCTION_NAME, None)
if func_name is None:
raise ValueError(
Expand Down
4 changes: 2 additions & 2 deletions async_processor/function_service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Callable, Dict

from async_processor.pydantic_v1 import BaseModel, create_model
from async_processor.types import Input, InputMessage
from async_processor.types import Input, InputMessageV2

INTERNAL_FUNCTION_NAME = "internal_func_name"

Expand Down Expand Up @@ -72,7 +72,7 @@ async def send_request_to_queue(
):
my_dict = input.dict()
my_dict[INTERNAL_FUNCTION_NAME] = input.__class__.__name__
input_message = InputMessage(request_id=request_id, body=my_dict)
input_message = InputMessageV2(tfy_request_id=request_id, **my_dict)

await input_publisher.publish_input_message(
request_id=request_id,
Expand Down
10 changes: 4 additions & 6 deletions async_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

from async_processor.app import ProcessorApp
from async_processor.logger import logger
from async_processor.pydantic_v1 import ValidationError
from async_processor.types import (
InputMessage,
InputMessageInterface,
InputMessageV2,
OutputMessage,
Expand Down Expand Up @@ -39,10 +37,10 @@ def input_deserializer(
f"Expected dict, got {type(input_message)}"
)
logger.debug(f"Deserializing input message: {input_message!r}")
try:
return InputMessage(**input_message)
except ValidationError:
return InputMessageV2(**input_message)

## This is really risky as this can fail if `input_message` is hitting any pydantic internal fields.
## MAKE THIS SAFE!!!
return InputMessageV2(**input_message)

def output_serializer(self, output_message: OutputMessage) -> bytes:
logger.debug(f"Serializing output message: {output_message!r}")
Expand Down
42 changes: 1 addition & 41 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,44 +47,6 @@ def get_request_method(self) -> Optional[str]:
...


class InputMessage(InputMessageInterface):
request_id: constr(min_length=1)
body: Any
published_at_epoch_ns: Optional[int] = None
stream_response: bool = False
request_headers: Optional[Dict[str, Union[str, List[str]]]] = None
request_url_path: Optional[str] = None
request_method: Optional[str] = None

class Config:
extra = Extra.forbid

def get_request_id(self) -> Optional[str]:
return self.request_id

def get_published_at_epoch_ns(self) -> Optional[int]:
return self.published_at_epoch_ns

# TODO: this method is only here for sidecar
# move this logic to sidecar module
def get_body(self) -> Any:
return self.body

def should_stream_response(self) -> bool:
return self.stream_response

def get_request_headers(self) -> Optional[Dict[str, Union[str, List[str]]]]:
return self.request_headers

def get_request_url_path(self) -> Optional[str]:
return self.request_url_path

def get_request_method(self) -> Optional[str]:
return self.request_method


# We cannot maintain two different types and should remove `InputMessage`
# after sometime
class InputMessageV2(InputMessageInterface):
tfy_request_id: Optional[constr(regex=r"^[a-zA-Z0-9\-]{1,36}$")] = None
tfy_published_at_epoch_ns: Optional[int] = None
Expand All @@ -102,8 +64,6 @@ def get_request_id(self) -> Optional[str]:
def get_published_at_epoch_ns(self) -> Optional[int]:
return self.tfy_published_at_epoch_ns

# TODO: this method is only here for sidecar
# move this logic to sidecar module
def get_body(self) -> Dict:
body = self.dict()

Expand Down Expand Up @@ -131,7 +91,7 @@ class OutputMessage(BaseModel):
status: ProcessStatus
body: Optional[Any] = None
error: Optional[str] = None
input_message: Optional[Union[InputMessage, InputMessageV2]] = None
input_message: Optional[InputMessageV2] = None

# these are experimental fields
status_code: Optional[str] = None
Expand Down
6 changes: 3 additions & 3 deletions examples/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
InputMessage,
InputMessageInterface,
Processor,
WorkerConfig,
)
Expand All @@ -22,8 +22,8 @@


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
def process(self, input_message: InputMessageInterface) -> int:
body = input_message.get_body()
return body["x"] * body["y"]


Expand Down
14 changes: 10 additions & 4 deletions tests/dummy_input_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@

import orjson

from async_processor import Input, InputConfig, InputMessage, Output, OutputConfig
from async_processor import (
Input,
InputConfig,
InputMessageInterface,
Output,
OutputConfig,
)


class DummyInputConfig(InputConfig):
messages: List[InputMessage]
messages: List[InputMessageInterface]

def to_input(self) -> Input:
return DummyInput(self)
Expand All @@ -37,7 +43,7 @@ async def publish_input_message(


class _Result(NamedTuple):
request_id: str
request_id: Optional[str]
serialized_output_message: bytes


Expand All @@ -46,7 +52,7 @@ def __init__(self, config: DummyOutputConfig):
self._config = config

async def publish_output_message(
self, serialized_output_message: bytes, request_id: str
self, serialized_output_message: bytes, request_id: str | None
):
self._config.results.append(
_Result(
Expand Down
6 changes: 1 addition & 5 deletions tests/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from async_processor import InputMessage, InputMessageV2
from async_processor import InputMessageV2
from async_processor.processor import BaseProcessor


def test_default_input_deserializer():
p = BaseProcessor()

assert p.input_deserializer(
serialized_input_message='{"request_id": "a", "body": "a"}'
) == InputMessage(request_id="a", body="a")

assert p.input_deserializer(
serialized_input_message='{"a": "b"}'
) == InputMessageV2(a="b")
4 changes: 1 addition & 3 deletions tests/test_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from async_processor import InputMessage, InputMessageV2
from async_processor import InputMessageV2


def test_input_message_get_body():
assert InputMessage(request_id="a", body="b").get_body() == "b"

assert InputMessageV2(tfy_request_id="a", foo="bar", baz=[1, 2]).get_body() == {
"foo": "bar",
"baz": [1, 2],
Expand Down
Loading

0 comments on commit 1661f6e

Please sign in to comment.