feat: class based refactor for SDK #129

Merged
merged 78 commits into from
Jan 24, 2024
e4df766
refactor init
kohlisid Jan 4, 2024
382bdf2
init
kohlisid Jan 4, 2024
87420ca
move infor
kohlisid Jan 4, 2024
136da24
move infor
kohlisid Jan 4, 2024
58b6496
await change
kohlisid Jan 4, 2024
83d366a
await change
kohlisid Jan 4, 2024
ceee882
change to create task
kohlisid Jan 4, 2024
b828204
change to create task
kohlisid Jan 4, 2024
837de07
ev loop
kohlisid Jan 5, 2024
c4f0376
cleanup
kohlisid Jan 5, 2024
b385e25
cleanup
kohlisid Jan 5, 2024
a22673b
cleanup
kohlisid Jan 5, 2024
fe52e9f
server on same thread
kohlisid Jan 5, 2024
e2c315d
server on same thread
kohlisid Jan 5, 2024
28b5c70
server on same thread
kohlisid Jan 5, 2024
da9d1db
Export class
kohlisid Jan 5, 2024
27967bf
Class refactor
kohlisid Jan 5, 2024
a0ac382
multiproc info fix
kohlisid Jan 6, 2024
05a2cfe
modular
kohlisid Jan 8, 2024
3f6049f
modular
kohlisid Jan 8, 2024
b642b17
modular
kohlisid Jan 8, 2024
40bb295
modular
kohlisid Jan 8, 2024
e3b7090
mapstream
kohlisid Jan 8, 2024
88bb474
mapstream
kohlisid Jan 8, 2024
e187f90
mapstream
kohlisid Jan 8, 2024
d7ff4a1
reduce
kohlisid Jan 8, 2024
32d2975
reduce
kohlisid Jan 9, 2024
3e3238a
sink
kohlisid Jan 9, 2024
e35b156
sink
kohlisid Jan 9, 2024
1ba673d
sink
kohlisid Jan 9, 2024
069d681
sink
kohlisid Jan 9, 2024
285a1ae
sink
kohlisid Jan 9, 2024
e86fbe1
proto
kohlisid Jan 9, 2024
98dd013
transform
kohlisid Jan 9, 2024
9e268f8
source
kohlisid Jan 9, 2024
b517b6c
source
kohlisid Jan 9, 2024
9c814c6
source
kohlisid Jan 9, 2024
111684a
source
kohlisid Jan 9, 2024
39788cd
source
kohlisid Jan 9, 2024
d7fe470
tests
kohlisid Jan 10, 2024
7b96725
source
kohlisid Jan 10, 2024
0a3c004
tests
kohlisid Jan 10, 2024
50fd5c3
tests
kohlisid Jan 10, 2024
c76de42
examples
kohlisid Jan 10, 2024
2aa9b30
examples
kohlisid Jan 10, 2024
49c8b8d
cleanup
kohlisid Jan 10, 2024
bfde535
lint
kohlisid Jan 10, 2024
ebecebf
lint
kohlisid Jan 11, 2024
b11d086
README
kohlisid Jan 11, 2024
b2e7be3
SideInput
kohlisid Jan 11, 2024
617f63a
tests
kohlisid Jan 11, 2024
9bb16a7
tests
kohlisid Jan 11, 2024
e4656d5
tests
kohlisid Jan 11, 2024
7c1b276
add uvloop
kohlisid Jan 11, 2024
74efb87
seperate mappers
kohlisid Jan 16, 2024
fe6ec20
seperate mappers
kohlisid Jan 16, 2024
0575d2d
seperate mapstream
kohlisid Jan 16, 2024
c1256f6
seperate reducer
kohlisid Jan 17, 2024
c7b8a84
seperate servers
kohlisid Jan 17, 2024
c0a025c
examples
kohlisid Jan 17, 2024
678ec60
examples
kohlisid Jan 17, 2024
cdb97c1
examples
kohlisid Jan 17, 2024
a25387f
examples
kohlisid Jan 17, 2024
dffdc5f
examples
kohlisid Jan 17, 2024
2742b56
examples
kohlisid Jan 17, 2024
6bf2c4a
examples
kohlisid Jan 17, 2024
6e44a6c
README
kohlisid Jan 17, 2024
cbf47bf
REDUCER INSTANCE
kohlisid Jan 18, 2024
d88ed5c
REDUCER INSTANCE
kohlisid Jan 18, 2024
3961f95
deep copy
kohlisid Jan 19, 2024
4065717
deep copy
kohlisid Jan 22, 2024
d2c844d
deep copy
kohlisid Jan 22, 2024
d3b59ff
lint
kohlisid Jan 22, 2024
6b56383
add reducer test
kohlisid Jan 22, 2024
02afc01
change reduce signature
kohlisid Jan 23, 2024
d8bcefe
comments
kohlisid Jan 23, 2024
d8ce1ce
comments
kohlisid Jan 24, 2024
59d6550
comments
kohlisid Jan 24, 2024
8 changes: 1 addition & 7 deletions .codecov.yml
@@ -11,10 +11,4 @@ coverage:

ignore:
- "examples/"
- "pynumaflow/mapper/proto/*"
- "pynumaflow/sinker/proto/*"
- "pynumaflow/mapstreamer/proto/*"
- "pynumaflow/reducer/proto/*"
- "pynumaflow/sourcetransformer/proto/*"
- "pynumaflow/sideinput/proto/*"
- "pynumaflow/sourcer/proto/*"
- "pynumaflow/proto/*"
12 changes: 8 additions & 4 deletions .coveragerc
@@ -5,14 +5,18 @@ source = pynumaflow
omit =
pynumaflow/tests/*
examples/*
pynumaflow/proto/*
pynumaflow/shared/server.py

[report]
exclude_lines =
def start
def start_async
def __serve_async
def start_multiproc
def sync_server_start
def _run_server
def start_multiproc_server
async def start_async_server
def _reserve_port
if os.getenv("PYTHONDEBUG"):
_LOGGER.setLevel(logging.DEBUG)
def exec_multiproc
def exec
async def aexec
16 changes: 8 additions & 8 deletions Makefile
@@ -27,13 +27,13 @@ setup:


proto:
python3 -m grpc_tools.protoc -I=pynumaflow/sinker/proto --python_out=pynumaflow/sinker/proto --grpc_python_out=pynumaflow/sinker/proto pynumaflow/sinker/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapper/proto --python_out=pynumaflow/mapper/proto --grpc_python_out=pynumaflow/mapper/proto pynumaflow/mapper/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapstreamer/proto --python_out=pynumaflow/mapstreamer/proto --grpc_python_out=pynumaflow/mapstreamer/proto pynumaflow/mapstreamer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/reducer/proto --python_out=pynumaflow/reducer/proto --grpc_python_out=pynumaflow/reducer/proto pynumaflow/reducer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransformer/proto --python_out=pynumaflow/sourcetransformer/proto --grpc_python_out=pynumaflow/sourcetransformer/proto pynumaflow/sourcetransformer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sideinput/proto --python_out=pynumaflow/sideinput/proto --grpc_python_out=pynumaflow/sideinput/proto pynumaflow/sideinput/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcer/proto --python_out=pynumaflow/sourcer/proto --grpc_python_out=pynumaflow/sourcer/proto pynumaflow/sourcer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sinker --python_out=pynumaflow/proto/sinker --grpc_python_out=pynumaflow/proto/sinker pynumaflow/proto/sinker/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/mapper --python_out=pynumaflow/proto/mapper --grpc_python_out=pynumaflow/proto/mapper pynumaflow/proto/mapper/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/mapstreamer --python_out=pynumaflow/proto/mapstreamer --grpc_python_out=pynumaflow/proto/mapstreamer pynumaflow/proto/mapstreamer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/reducer --python_out=pynumaflow/proto/reducer --grpc_python_out=pynumaflow/proto/reducer pynumaflow/proto/reducer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto


sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
177 changes: 98 additions & 79 deletions README.md
@@ -5,9 +5,9 @@
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Release Version](https://img.shields.io/github/v/release/numaproj/numaflow-python?label=pynumaflow)](https://github.com/numaproj/numaflow-python/releases/latest)

This is the Python SDK for [Numaflow](https://numaflow.numaproj.io/).

This SDK provides the interface for writing [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/)
and [UDSinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) in Python.
This SDK provides the interface for writing different functionalities of Numaflow like [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/), [UDSinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/), [UDSources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/) and [SideInput](https://numaflow.numaproj.io/specifications/side-inputs/) in Python.

## Installation

@@ -40,100 +40,119 @@ Setup [pre-commit](https://pre-commit.com/) hooks:
pre-commit install
```

## Implement a User Defined Function (UDF)
## Implementing different functionalities
- [Implement User Defined Sources](https://github.com/numaproj/numaflow-python/tree/main/examples/source)
- [Implement User Defined Source Transformers](https://github.com/numaproj/numaflow-python/tree/main/examples/sourcetransform)
- Implement User Defined Functions
    - [Map](https://github.com/numaproj/numaflow-python/tree/main/examples/map)
    - [Reduce](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
    - [Map Stream](https://github.com/numaproj/numaflow-python/tree/main/examples/mapstream)
- [Implement User Defined Sinks](https://github.com/numaproj/numaflow-python/tree/main/examples/sink)
- [Implement User Defined SideInputs](https://github.com/numaproj/numaflow-python/tree/main/examples/sideinput)

## Server Types

### Map
There are different types of gRPC server mechanisms which can be used to serve the UDFs, UDSinks and UDSource.
These have different functionalities and are used for different use cases.

```python
from pynumaflow.mapper import Messages, Message, Datum, Mapper
Currently we support the following server types:
- Sync Server
- Asynchronous Server
- MultiProcessing Server

Not all of the above are supported for all UDFs, UDSource and UDSinks.

def my_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
return Messages(Message(value=val, keys=keys))
For each of the UDFs, UDSource and UDSinks, there are separate classes for each of the server types.
This helps in keeping the interface simple and easy to use, and the user can start the specific server type based
on the use case.


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
grpc_server.start()
```
### SourceTransformer - Map with event time assignment capability
In addition to the regular Map function, SourceTransformer supports assigning a new event time to the message.
SourceTransformer is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.

```python
from datetime import datetime
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer


def transform_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
new_event_time = datetime.now()
_ = datum.watermark
message_t_s = Messages(Message(val, event_time=new_event_time, keys=keys))
return message_t_s
#### SyncServer

Synchronous Server is the simplest server type. It is a multithreaded server which can be used for simple UDFs and UDSinks.
Here the server will invoke the handler function for each message. The messaging is synchronous and the server will wait
for the handler to return before processing the next message.

if __name__ == "__main__":
grpc_server = SourceTransformer(handler=transform_handler)
grpc_server.start()
```

### Reduce

```python
import aiorun
from typing import Iterator, List
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer


async def my_handler(
keys: List[str], datums: Iterator[Datum], md: Metadata
) -> Messages:
interval_window = md.interval_window
counter = 0
async for _ in datums:
counter += 1
msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(str.encode(msg), keys))


if __name__ == "__main__":
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
grpc_server = MapServer(handler)
```

### Sample Image
A sample UDF [Dockerfile](examples/map/forward_message/Dockerfile) is provided
under [examples](examples/map/forward_message).
#### AsyncServer

## Implement a User Defined Sink (UDSink)
Asynchronous Server is a multithreaded server which can be used for UDFs which are asynchronous. Here we utilize the asynchronous capabilities of Python to process multiple messages in parallel. The server will invoke the handler function for each message. The messaging is asynchronous and the server will not wait for the handler to return before processing the next message. Thus this server type is useful for UDFs which are asynchronous.
The handler function for such a server should be an async function.

```python
from typing import Iterator
from pynumaflow.sinker import Datum, Responses, Response, Sinker
```
grpc_server = MapAsyncServer(handler)
```
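
The non-blocking behaviour described above can be illustrated without the SDK. The following is a minimal sketch using only `asyncio`; `async_handler` and `serve_all` are hypothetical names chosen for illustration and are not part of pynumaflow. It assumes each message triggers some awaitable I/O, and shows that the handler invocations overlap instead of running one after another.

```python
import asyncio
import time


async def async_handler(keys: list[str], value: bytes) -> bytes:
    # Simulate non-blocking I/O (for example a network call) per message.
    await asyncio.sleep(0.1)
    return value.upper()


async def serve_all(messages: list[bytes]) -> list[bytes]:
    # Schedule one handler invocation per message; no invocation waits
    # for the previous one to finish before it is started.
    tasks = [asyncio.create_task(async_handler(["k"], m)) for m in messages]
    # gather() returns the results in the same order as the tasks.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(serve_all([b"a", b"b", b"c"]))
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")
```

With a synchronous handler the three simulated calls would run back to back; here they are awaited concurrently, so the total time should be close to that of a single call.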

#### MultiProcessServer

def my_handler(datums: Iterator[Datum]) -> Responses:
responses = Responses()
for msg in datums:
print("User Defined Sink", msg.value.decode("utf-8"))
responses.append(Response.as_success(msg.id))
return responses
MultiProcess Server is a multi process server which can be used for UDFs which are CPU intensive. Here we utilize the multi process capabilities of Python to process multiple messages in parallel by forking multiple servers in different processes.
The server will invoke the handler function for each message. Individually at the server level the messaging is synchronous and the server will wait for the handler to return before processing the next message. But since we have multiple servers running in parallel, the overall messaging also executes in parallel.

This could be an alternative to creating multiple replicas of the same UDF container as here we are using the multi processing capabilities of the system to process multiple messages in parallel but within the same container.

if __name__ == "__main__":
grpc_server = Sinker(my_handler)
grpc_server.start()
Thus this server type is useful for UDFs which are CPU intensive.
```
grpc_server = MapMultiProcServer(handler)
```
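
As a rough illustration of why multiple processes help with CPU intensive handlers, here is a minimal sketch using only the standard library. It is a simplification: the text above describes forking multiple servers, whereas this sketch substitutes a plain `multiprocessing.Pool`. `cpu_bound_handler` and `process_in_parallel` are hypothetical names and not part of pynumaflow.

```python
import multiprocessing as mp


def cpu_bound_handler(value: int) -> int:
    # Stand-in for CPU-heavy work: sum of squares of all integers below `value`.
    return sum(i * i for i in range(value))


def process_in_parallel(values: list[int], workers: int = 2) -> list[int]:
    # Each worker process handles one message at a time (synchronously),
    # but the pool as a whole works on several messages in parallel.
    with mp.Pool(processes=workers) as pool:
        return pool.map(cpu_bound_handler, values)


if __name__ == "__main__":
    print(process_in_parallel([10, 100, 1000]))
```

Because each worker is a separate process, the work is not serialized by the interpreter lock of a single process, which is the property the MultiProcess server type relies on.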

### Sample Image

A sample UDSink [Dockerfile](examples/sink/log/Dockerfile) is provided
under [examples](examples/sink/log).
#### Currently Supported Server Types for each functionality

These are the class names for the server types supported by each of the functionalities.

- UDFs
    - Map
        - MapServer
        - MapAsyncServer
        - MapMultiProcServer
    - Reduce
        - ReduceAsyncServer
    - MapStream
        - MapStreamAsyncServer
    - Source Transform
        - SourceTransformServer
        - SourceTransformMultiProcServer
- UDSource
    - SourceServer
    - SourceAsyncServer
- UDSink
    - SinkServer
    - SinkAsyncServer
- SideInput
    - SideInputServer




### Handler Function and Classes

All the server types take an instance of a handler class or a handler function as an argument.
The handler function or class is the function or class which implements the functionality of the UDF, UDSource or UDSink.
For ease of use the user can pass either of the two to the server and the server will handle the rest.

The handler for each of the servers has a specific signature which is defined by the server type and the implementation of the handlers
should follow the same signature.

For using the class based handlers the user can inherit from the base handler class for each of the functionalities and implement the handler function.
The base handler class for each of the functionalities has the same signature as the handler function for the respective server type.
The list of base handler classes for each of the functionalities is given below -
- UDFs
    - Map
        - Mapper
    - Reduce
        - Reducer
    - MapStream
        - MapStreamer
    - Source Transform
        - SourceTransformer
- UDSource
    - Sourcer
- UDSink
    - Sinker
- SideInput
    - SideInput

More details about the signature of the handler function for each of the server types are given in the
documentation of the respective server type.
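
One way a server could accept either a handler function or a handler class instance is sketched below, using only the standard library. Everything here is hypothetical: `BaseMapper`, `SplitOnComma`, `split_on_comma` and `invoke` are illustrative stand-ins, and the assumption that a base class forwards `__call__` to `handler` is a design guess rather than a statement about the SDK's internals.

```python
from abc import ABC, abstractmethod
from typing import Callable, Union


class BaseMapper(ABC):
    """Hypothetical stand-in for a base handler class."""

    def __call__(self, keys: list[str], value: bytes) -> list[bytes]:
        # Calling the instance delegates to the user-implemented handler,
        # so an instance can be used wherever a plain function is expected.
        return self.handler(keys, value)

    @abstractmethod
    def handler(self, keys: list[str], value: bytes) -> list[bytes]:
        ...


class SplitOnComma(BaseMapper):
    # Class-based handler: same signature as the function below.
    def handler(self, keys: list[str], value: bytes) -> list[bytes]:
        return value.split(b",")


def split_on_comma(keys: list[str], value: bytes) -> list[bytes]:
    # Function-based handler with the same behaviour.
    return value.split(b",")


MapHandler = Union[BaseMapper, Callable[[list[str], bytes], list[bytes]]]


def invoke(handler: MapHandler, keys: list[str], value: bytes) -> list[bytes]:
    # Hypothetical server-side call site: both handler forms are callables.
    return handler(keys, value)


if __name__ == "__main__":
    print(invoke(SplitOnComma(), ["k"], b"a,b"))
    print(invoke(split_on_comma, ["k"], b"a,b"))
```

Keeping the class and function signatures identical means the call site needs no special casing, which matches the intent that the user can pass either form.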
16 changes: 7 additions & 9 deletions examples/developer_guide/example.py
@@ -1,29 +1,27 @@
import aiorun
from collections.abc import Iterator
from collections.abc import AsyncIterable


from pynumaflow.reducer import (
Messages,
Message,
Datum,
Metadata,
AsyncReducer,
ReduceAsyncServer,
)


async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) -> Messages:
# count the number of events
async def reduce_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
interval_window = md.interval_window
counter = 0
async for _ in datums:
counter += 1

msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(keys=keys, value=str.encode(msg)))
return Messages(Message(str.encode(msg), keys=keys))


if __name__ == "__main__":
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
grpc_server = ReduceAsyncServer(reduce_handler)
grpc_server.start()
11 changes: 9 additions & 2 deletions examples/map/even_odd/example.py
@@ -1,4 +1,4 @@
from pynumaflow.mapper import Messages, Message, Datum, Mapper
from pynumaflow.mapper import Messages, Message, Datum, MapServer


def my_handler(keys: list[str], datum: Datum) -> Messages:
@@ -22,5 +22,12 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
"""
This example shows how to create a simple map function that takes in a
number and outputs it to the "even" or "odd" key depending on whether it
is even or odd.
We use a function as handler, but a class that implements
a Mapper can be used as well.
"""
grpc_server = MapServer(my_handler)
grpc_server.start()
2 changes: 1 addition & 1 deletion examples/map/even_odd/pyproject.toml
@@ -6,7 +6,7 @@ authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.6.0"
pynumaflow = "~0.7.0"

[tool.poetry.dev-dependencies]

36 changes: 23 additions & 13 deletions examples/map/flatmap/example.py
@@ -1,20 +1,30 @@
from pynumaflow.mapper import Messages, Message, Datum, Mapper
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper


def my_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
class Flatmap(Mapper):
"""
This is a class that inherits from the Mapper class.
It implements the handler method that is called for each datum.
"""

def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
"""
This example shows how to use the Flatmap mapper.
We use a class as handler, but a function can be used as well.
"""
grpc_server = MapServer(Flatmap())
grpc_server.start()
2 changes: 1 addition & 1 deletion examples/map/flatmap/pipeline.yaml
@@ -15,7 +15,7 @@ spec:
- name: flatmap
udf:
container:
image: "quay.io/numaio/numaflow-python/map-flatmap:v0.5.0"
image: "quay.io/numaio/numaflow-python/map-flatmap:v0.7.0"
env:
- name: PYTHONDEBUG
value: "true"
2 changes: 1 addition & 1 deletion examples/map/flatmap/pyproject.toml
@@ -6,7 +6,7 @@ authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.6.0"
pynumaflow = "~0.7.0"


[tool.poetry.dev-dependencies]