-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsample1.py
67 lines (50 loc) · 1.67 KB
/
sample1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Minimal example of NATS micro usage with decorators."""
import asyncio
import signal
import os
import sys
from pydantic import BaseModel
from nats.aio.client import Client
from hazelnats import ClientBuilder, service, endpoint
from micro import Request
async def main(nats_server):
    """Run the NATS micro services until SIGINT or SIGTERM is received.

    Args:
        nats_server: Server specification passed unchanged to
            ``ClientBuilder``.
    """
    # Event set by the signal handlers to request shutdown.
    quit_event = asyncio.Event()

    # Inside a coroutine the running loop is the one to attach handlers to;
    # get_running_loop() is the supported accessor in this context.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler invokes the callback with no arguments, so the
        # bound method can be registered directly.
        loop.add_signal_handler(sig, quit_event.set)

    await ClientBuilder(nats_server).run(quit_event)
## Some sample micro services
## For now, a single (singleton) instance of this service is created.
@service(name="sample")
class SampleService:
    # Sample service registered under the name "sample".

    def __init__(self):
        # Incremented on every call to the `counter` endpoint.
        self._counter = 0

    # Request/response pattern (default behaviour): the handler sends the
    # reply itself through the request object.
    @endpoint
    async def reply_one(self, req: Request):
        await req.respond(req.data())

    # Same echo, but the returned value is sent back as the reply
    # automatically.
    @endpoint
    async def reply_two(self, req: Request):
        payload = req.data()
        return payload

    # Arguments are mapped automatically from the request payload; the
    # return value is handled the same way.
    @endpoint
    async def say_hello(self, s: str):
        greeting = f"Hello {s}!"
        return greeting

    @endpoint
    async def counter(self):
        self._counter = self._counter + 1
        return {"result": self._counter}

    @endpoint
    async def get_pid(self):
        pid = os.getpid()
        return {"pid": pid}
@service(name="calc")
class CalcService:
    # Sample service registered under the name "calc".

    # Endpoint explicitly named "add"; replies with the sum of its two
    # arguments under the "result" key.
    @endpoint(name="add")
    async def add(self, a, b):
        total = a + b
        return {"result": total}
if __name__ == "__main__":
    # Every command-line argument after the script name is forwarded to
    # main() as the server specification.
    server_args = sys.argv[1:]
    asyncio.run(main(server_args))