forked from numaproj/numaflow-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
30 lines (25 loc) · 896 Bytes
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper
class Flatmap(Mapper):
"""
This is a class that inherits from the Mapper class.
It implements the handler method that is called for each datum.
"""
def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages
if __name__ == "__main__":
"""
This example shows how to use the Flatmap mapper.
We use a class as handler, but a function can be used as well.
"""
grpc_server = MapServer(Flatmap())
grpc_server.start()