Skip to content

Commit 02afc01

Browse files
committed
change reduce signature
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent 6b56383 commit 02afc01

File tree

8 files changed

+169
-55
lines changed

8 files changed

+169
-55
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pre-commit install
4545
- [Implement User Defined Source Transformers](https://github.com/numaproj/numaflow-python/tree/main/examples/sourcetransform)
4646
- Implement User Defined Functions
4747
- [Map](https://github.com/numaproj/numaflow-python/tree/main/examples/map)
48-
- [Reduce](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce/counter)
48+
- [Reduce](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
4949
- [Map Stream](https://github.com/numaproj/numaflow-python/tree/main/examples/mapstream)
5050
- [Implement User Defined Sinks](https://github.com/numaproj/numaflow-python/tree/main/examples/sink)
5151
- [Implement User Defined SideInputs](https://github.com/numaproj/numaflow-python/tree/main/examples/sideinput)

examples/reduce/README.md

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Reducer in Python
2+
3+
For creating a reducer UDF we can use two different approaches:
4+
- Class based reducer
5+
- For the class based reducer we need to implement a class that inherits from the `Reducer` class and implements the required methods.
6+
- Next we need to create a `ReduceAsyncServer` instance and pass the reducer class to it along with any input args or
7+
kwargs that the custom reducer class requires.
8+
- Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
9+
```python
10+
from numaflow import Reducer, ReduceAsyncServer
11+
class Example(Reducer):
12+
def __init__(self, counter):
13+
self.counter = counter
14+
15+
async def handler(
16+
self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
17+
) -> Messages:
18+
interval_window = md.interval_window
19+
self.counter = 0
20+
async for _ in datums:
21+
self.counter += 1
22+
msg = (
23+
f"counter:{self.counter} interval_window_start:{interval_window.start} "
24+
f"interval_window_end:{interval_window.end}"
25+
)
26+
return Messages(Message(str.encode(msg), keys=keys))
27+
28+
if __name__ == "__main__":
29+
# Here we are using the class instance as the reducer_instance
30+
# which will be used to invoke the handler function.
31+
# We are passing the init_args for the class instance.
32+
grpc_server = ReduceAsyncServer(Example, init_args=(0,))
33+
grpc_server.start()
34+
```
35+
36+
- Function based reducer
37+
For the function based reducer we need to create a function of the signature
38+
```python
39+
async def handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
40+
```
41+
that takes the required arguments and returns the `Messages` object.
42+
- Next we need to create a `ReduceAsyncServer` instance and pass the function to it along with any input args or kwargs that the custom reducer function requires.
43+
- Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
44+
- We must ensure that no init_args or init_kwargs are passed to the `ReduceAsyncServer` instance as they are not used for function based reducers.
45+
```python
46+
from numaflow import ReduceAsyncServer
47+
async def handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
48+
counter = 0
49+
interval_window = md.interval_window
50+
async for _ in datums:
51+
counter += 1
52+
msg = (
53+
f"counter:{counter} interval_window_start:{interval_window.start} "
54+
f"interval_window_end:{interval_window.end}"
55+
)
56+
return Messages(Message(str.encode(msg), keys=keys))
57+
58+
if __name__ == "__main__":
59+
# Here we are using the function as the reducer_instance
60+
# which will be used to invoke the handler function.
61+
grpc_server = ReduceAsyncServer(handler)
62+
grpc_server.start()
63+
```
64+
65+

examples/reduce/counter/example.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66

77
class Example(Reducer):
8-
def __init__(self):
9-
self.counter = 0
8+
def __init__(self, counter):
9+
self.counter = counter
1010

1111
async def handler(
1212
self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
@@ -37,7 +37,11 @@ async def reduce_handler(keys: list[str], datums: AsyncIterable[Datum], md: Meta
3737
if __name__ == "__main__":
3838
invoke = os.getenv("INVOKE", "handler")
3939
if invoke == "class":
40-
grpc_server = ReduceAsyncServer(Example())
40+
# Here we are using the class instance as the reducer_instance
41+
# which will be used to invoke the handler function.
42+
# We are passing the init_args for the class instance.
43+
grpc_server = ReduceAsyncServer(Example, init_args=(0,))
4144
else:
45+
# Here we are using the handler function directly as the reducer_instance.
4246
grpc_server = ReduceAsyncServer(reduce_handler)
4347
grpc_server.start()

pynumaflow/reducer/_dtypes.py

+25-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from abc import ABCMeta, abstractmethod
22
from asyncio import Task
33
from collections.abc import Iterator, Sequence, Awaitable
4-
from copy import deepcopy
54
from dataclasses import dataclass
65
from datetime import datetime
76
from typing import TypeVar, Callable, Union
@@ -234,6 +233,9 @@ def keys(self) -> list[str]:
234233
return self._key
235234

236235

236+
ReduceAsyncCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]]
237+
238+
237239
class Reducer(metaclass=ABCMeta):
238240
"""
239241
Provides an interface to write a Reducer
@@ -247,17 +249,6 @@ def __call__(self, *args, **kwargs):
247249
"""
248250
return self.handler(*args, **kwargs)
249251

250-
def __deepcopy__(self, memo):
251-
"""
252-
Allow to deepcopy the class instance.
253-
"""
254-
cls = self.__class__
255-
result = cls.__new__(cls)
256-
memo[id(self)] = result
257-
for k, v in self.__dict__.items():
258-
setattr(result, k, deepcopy(v, memo))
259-
return result
260-
261252
@abstractmethod
262253
async def handler(
263254
self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
@@ -268,6 +259,26 @@ async def handler(
268259
pass
269260

270261

271-
ReduceAsyncCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]]
262+
class ReduceBuilderClass:
263+
"""
264+
Class to build a Reducer class instance.
265+
Args:
266+
reducer_class: the reducer class to be used for Reduce UDF
267+
args: the arguments to be passed to the reducer class
268+
kwargs: the keyword arguments to be passed to the reducer class
269+
"""
270+
271+
def __init__(self, reducer_class: type[Reducer], args: tuple, kwargs: dict):
272+
self._reducer_class: type[Reducer] = reducer_class
273+
self._args = args
274+
self._kwargs = kwargs
275+
276+
def create(self) -> Reducer:
277+
"""
278+
Create a new Reducer instance.
279+
"""
280+
return self._reducer_class(*self._args, **self._kwargs)
281+
282+
272283
# ReduceCallable is a callable which can be used as a handler for the Reduce UDF.
273-
ReduceCallable = Union[ReduceAsyncCallable, Reducer]
284+
ReduceCallable = Union[ReduceAsyncCallable, type[Reducer]]

pynumaflow/reducer/async_server.py

+33-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import inspect
2+
13
import aiorun
24
import grpc
35

@@ -12,19 +14,44 @@
1214
_LOGGER,
1315
)
1416

15-
from pynumaflow.reducer._dtypes import ReduceCallable
17+
from pynumaflow.reducer._dtypes import (
18+
ReduceCallable,
19+
ReduceBuilderClass,
20+
Reducer,
21+
)
1622

1723
from pynumaflow.shared.server import NumaflowServer, start_async_server
1824

1925

26+
def get_handler(reducer_handler: ReduceCallable, init_args: tuple = (), init_kwargs: dict = None):
27+
"""
28+
Get the correct handler type based on the arguments passed
29+
"""
30+
if inspect.isfunction(reducer_handler):
31+
if len(init_args) > 0 or len(init_kwargs) > 0:
32+
# if the init_args or init_kwargs are passed, then the reducer_handler
33+
# can only be of class Reducer type
34+
raise TypeError("Cannot pass function handler with init args or kwargs")
35+
# return the function handler
36+
return reducer_handler
37+
elif issubclass(reducer_handler, Reducer):
38+
# if handler is type of Class Reducer, create a new instance of
39+
# a ReducerBuilderClass
40+
return ReduceBuilderClass(reducer_handler, init_args, init_kwargs)
41+
else:
42+
raise TypeError("Invalid type passed")
43+
44+
2045
class ReduceAsyncServer(NumaflowServer):
2146
"""
2247
Class for a new Reduce Server instance.
2348
"""
2449

2550
def __init__(
2651
self,
27-
reducer_instance: ReduceCallable,
52+
reducer_handler: ReduceCallable,
53+
init_args: tuple = (),
54+
init_kwargs: dict = None,
2855
sock_path=REDUCE_SOCK_PATH,
2956
max_message_size=MAX_MESSAGE_SIZE,
3057
max_threads=MAX_THREADS,
@@ -41,7 +68,9 @@ def __init__(
4168
defaults to number of processors x4
4269
server_type: The type of server to be used
4370
"""
44-
self.reducer_instance: ReduceCallable = reducer_instance
71+
if init_kwargs is None:
72+
init_kwargs = {}
73+
self.reducer_handler = get_handler(reducer_handler, init_args, init_kwargs)
4574
self.sock_path = f"unix://{sock_path}"
4675
self.max_message_size = max_message_size
4776
self.max_threads = max_threads
@@ -51,7 +80,7 @@ def __init__(
5180
("grpc.max_receive_message_length", self.max_message_size),
5281
]
5382
# Get the servicer instance for the async server
54-
self.servicer = AsyncReduceServicer(reducer_instance)
83+
self.servicer = AsyncReduceServicer(self.reducer_handler)
5584

5685
def start(self):
5786
"""

pynumaflow/reducer/servicer/async_servicer.py

+16-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import asyncio
2-
from copy import deepcopy
32

43
from datetime import datetime, timezone
54
from collections.abc import AsyncIterable
5+
from typing import Union
66

77
import grpc
88
from google.protobuf import empty_pb2 as _empty_pb2
@@ -13,8 +13,14 @@
1313
STREAM_EOF,
1414
DELIMITER,
1515
)
16-
from pynumaflow.reducer._dtypes import Datum, IntervalWindow, Metadata, Reducer
17-
from pynumaflow.reducer._dtypes import ReduceResult, ReduceCallable
16+
from pynumaflow.reducer._dtypes import (
17+
Datum,
18+
IntervalWindow,
19+
Metadata,
20+
ReduceAsyncCallable,
21+
ReduceBuilderClass,
22+
)
23+
from pynumaflow.reducer._dtypes import ReduceResult
1824
from pynumaflow.reducer.servicer.asynciter import NonBlockingIterator
1925
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
2026
from pynumaflow.types import NumaflowServicerContext
@@ -43,13 +49,14 @@ class AsyncReduceServicer(reduce_pb2_grpc.ReduceServicer):
4349

4450
def __init__(
4551
self,
46-
handler: ReduceCallable,
52+
handler: Union[ReduceAsyncCallable, ReduceBuilderClass],
4753
):
4854
# Collection for storing strong references to all running tasks.
4955
# Event loop only keeps a weak reference, which can cause it to
5056
# get lost during execution.
5157
self.background_tasks = set()
52-
self.__reduce_handler: ReduceCallable = handler
58+
# The reduce handler can be a function or a builder class instance.
59+
self.__reduce_handler: Union[ReduceAsyncCallable, ReduceBuilderClass] = handler
5360

5461
async def ReduceFn(
5562
self,
@@ -143,11 +150,12 @@ async def __invoke_reduce(
143150
self, keys: list[str], request_iterator: AsyncIterable[Datum], md: Metadata
144151
):
145152
new_instance = self.__reduce_handler
146-
# If the reduce handler is a class instance, create a new copy of it.
153+
# If the reduce handler is a class instance, create a new instance of it.
147154
# It is required for a new key to be processed by a
148155
# new instance of the reducer for a given window
149-
if isinstance(self.__reduce_handler, Reducer):
150-
new_instance = deepcopy(self.__reduce_handler)
156+
# Otherwise the function handler can be called directly
157+
if isinstance(self.__reduce_handler, ReduceBuilderClass):
158+
new_instance = self.__reduce_handler.create()
151159
try:
152160
msgs = await new_instance(keys, request_iterator, md)
153161
except Exception as err:

tests/reduce/test_async_reduce.py

+21-24
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,6 @@
2828

2929
LOGGER = setup_logging(__name__)
3030

31-
# if set to true, map handler will raise a `ValueError` exception.
32-
raise_error_from_map = False
33-
34-
35-
async def async_reduce_handler(
36-
keys: list[str], datums: AsyncIterable[Datum], md: Metadata
37-
) -> Messages:
38-
interval_window = md.interval_window
39-
counter = 0
40-
async for _ in datums:
41-
counter += 1
42-
msg = (
43-
f"counter:{counter} interval_window_start:{interval_window.start} "
44-
f"interval_window_end:{interval_window.end}"
45-
)
46-
47-
return Messages(Message(str.encode(msg), keys=keys))
48-
4931

5032
def request_generator(count, request, resetkey: bool = False):
5133
for i in range(count):
@@ -80,8 +62,8 @@ def startup_callable(loop):
8062

8163

8264
class ExampleClass(Reducer):
83-
def __init__(self):
84-
self.counter = 0
65+
def __init__(self, counter):
66+
self.counter = counter
8567

8668
async def handler(
8769
self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
@@ -97,10 +79,20 @@ async def handler(
9779
return Messages(Message(str.encode(msg), keys=keys))
9880

9981

100-
def NewAsyncReducer(
101-
reduce_handler=async_reduce_handler,
102-
):
103-
server_instance = ReduceAsyncServer(reducer_instance=ExampleClass())
82+
async def err_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
83+
interval_window = md.interval_window
84+
counter = 0
85+
async for _ in datums:
86+
counter += 1
87+
msg = (
88+
f"counter:{counter} interval_window_start:{interval_window.start} "
89+
f"interval_window_end:{interval_window.end}"
90+
)
91+
return Messages(Message(str.encode(msg), keys=keys))
92+
93+
94+
def NewAsyncReducer():
95+
server_instance = ReduceAsyncServer(ExampleClass, init_args=(0,))
10496
udfs = server_instance.servicer
10597

10698
return udfs
@@ -240,8 +232,13 @@ def __stub(self):
240232
return reduce_pb2_grpc.ReduceStub(_channel)
241233

242234
def test_error_init(self):
235+
# Check that reducer_handler in required
243236
with self.assertRaises(TypeError):
244237
ReduceAsyncServer()
238+
# Check that the init_args and init_kwargs are passed
239+
# only with a Reducer class
240+
with self.assertRaises(TypeError):
241+
ReduceAsyncServer(err_handler, init_args=(0, 1))
245242

246243

247244
if __name__ == "__main__":

tests/reduce/test_async_reduce_err.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def err_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadat
7373

7474

7575
def NewAsyncReducer():
76-
server_instance = ReduceAsyncServer(reducer_instance=err_handler)
76+
server_instance = ReduceAsyncServer(err_handler)
7777
udfs = server_instance.servicer
7878

7979
return udfs

0 commit comments

Comments
 (0)