-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdemo.py
165 lines (127 loc) · 4.87 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# mypy: disable-error-code="no-untyped-def,arg-type"
import argparse
import logging
import random
import secrets
import time
import multiprocessing as mp
from typing import Dict
from mpubsub.net import PubSub
from mpubsub.broker import Broker
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(levelname).1s %(processName)s: %(message)s')
logger = logging.getLogger(__name__)
# Now let's start a few publishers. Each one has its own PubSub
# object, and a Bridge connected to the broker.
def publisher(address, delay_min, delay_max, n):
logger.info('Starting up')
pubsub = PubSub(address)
pubsub.connect()
stop = False
def panic_callback(topic):
nonlocal stop
logger.critical('PANIC received from %s', topic[1])
stop = True
pubsub.add_subscriber('PANIC', panic_callback)
msg = 0
while not stop:
msg += 1
logger.info('Publishing: %r', msg)
pubsub.publish(('demo', f'PUB{n}'), msg=msg)
if delay_max:
time.sleep(delay_min + (random.random() * delay_max))
pubsub.disconnect()
def subscriber(address, n):
logger.info('Starting up')
pubsub = PubSub(address)
msg_sequence: Dict[str, int] = {}
stop = False
def panic_callback(topic):
nonlocal stop
logger.critical('PANIC received from %s', topic[1])
stop = True
def message_callback(topic, msg):
publisher = topic[1]
logger.info('Got %r from %s', msg, publisher)
if publisher.startswith('PUB'):
expected = msg_sequence.get(publisher, 1)
if msg != expected:
logger.critical('Expecting message %d from %r, got %r',
expected, publisher, msg)
pubsub.publish(('PANIC', f'sub{n}'))
msg_sequence[publisher] = msg + 1
# Let the subscriber publish something at 5% probability.
if random.random() < 0.05:
msg = secrets.token_hex(4)
logger.info("I've decided to publish %r", msg)
pubsub.publish(('demo', f'sub{n}'), msg=msg)
# Also publish something locally at 5% probability.
if random.random() < 0.05:
logger.info("I'm gonna publish a local message")
pubsub.publish(('demo', f'sub{n}', '*local'),
msg='This is a local message')
pubsub.add_subscriber('demo', message_callback)
pubsub.add_subscriber('PANIC', panic_callback)
pubsub.connect()
while not stop:
pubsub.poll(None)
pubsub.disconnect()
def main() -> None:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--delay-min',
type=float,
default=5,
help='minimum random delay')
parser.add_argument('--delay-max',
type=float,
default=10,
help='maximum random delay; pass 0 for no delay')
parser.add_argument('-p', '--publishers',
type=int,
default=3,
help='publishers to spawn')
parser.add_argument('-s', '--subscribers',
type=int,
default=3,
help='subscribers to spawn')
parser.add_argument('-w', '--wait',
type=float,
default=0.2,
help=('seconds to wait per subscriber before '
'starting publishers'))
args = parser.parse_args()
logger.info('Starting demo with %d publishers and %d subscribers',
args.publishers, args.subscribers)
mp.set_start_method('spawn')
processes = []
# Get the broker object.
broker = Broker(None)
# Grab its address.
address = broker.address
# Start the broker in a separate process.
broker_process = mp.Process(target=broker.start, name='broker')
broker_process.start()
processes.append(broker_process)
# Start subscribers.
for n in range(args.subscribers):
proc = mp.Process(target=subscriber, name=f'sub{n}', args=(address, n))
proc.start()
processes.append(proc)
# Give some time for the subscribers to connect before the
# publishers, otherwise they might miss messages and fail message
# tracking.
logger.info('Waiting for all subscribers to start')
time.sleep(args.subscribers * args.wait)
# Start publishers.
for n in range(args.publishers):
proc = mp.Process(target=publisher, name=f'PUB{n}',
args=(address, min(args.delay_min, args.delay_max),
args.delay_max, n))
proc.start()
processes.append(proc)
for process in processes:
process.join()
if __name__ == '__main__':
main()