-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer.py
29 lines (24 loc) · 896 Bytes
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import pulsar
from pulsar.schema import *
class Data(Record):
a = Integer()
b = String()
c = Float()
d = Array(String())
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
topic='data_identify',
subscription_name='read_data',
schema=AvroSchema(Data))
while True:
msg = consumer.receive()
ex = msg.value()
try:
# Print the received message type and the message itself
print("Received message type a='{}' b='{}' c='{}' d='{}'".format(type(ex.a), type(ex.b), type(ex.c), type(ex.d)))
print("Received message a='{}' b='{}' c='{}' d='{}'".format(ex.a, ex.b, ex.c, ex.d))
consumer.acknowledge(msg)
except:
# In the event the message failed to be processed
consumer.negative_acknowledge(msg)
client.close()