-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode.py
56 lines (36 loc) · 1.45 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
from .communication.pubsub import RabbitMQProducer, RabbitMQConsumer
class TEACHINGNode(object):
def __init__(self, produce, consume):
self._id = os.environ['SERVICE_NAME']
self._mq_params = {
'user': os.environ['RABBITMQ_USER'],
'password': os.environ['RABBITMQ_PASSWORD'],
'host': os.environ['RABBITMQ_HOST'],
'port': os.environ['RABBITMQ_PORT']
}
self._produce = produce
self._producer = None
self._consume = consume
if self._consume:
it = os.environ['TOPICS']
self._topics = it.split(',') if ',' in it else [it]
self._consumer = None
self._build()
def _build(self):
print("Building the TEACHING Node...")
if self._produce:
self._producer = RabbitMQProducer(self._mq_params)
if self._consume:
self._consumer = RabbitMQConsumer(self._mq_params, self._topics)
print("Done!")
def __call__(self, service_fn):
def service_pipeline(*args):
obj = args[0]
if self._consume and not self._produce:
service_fn(obj, self._consumer())
if not self._consume and self._produce:
self._producer(service_fn(obj))
if self._consume and self._produce:
self._producer(service_fn(obj, self._consumer()))
return service_pipeline