-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer_entrypoint.py
84 lines (70 loc) · 2.31 KB
/
consumer_entrypoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import argparse
from pycon_kafka.helpers.kafka_helper import KafkaConsumer
from pycon_kafka.helpers.schema_registry_helper import SchemaRegistry
from dotenv import load_dotenv
load_dotenv()
SR_USR = os.getenv('SCHEMA_REGISTRY_USERNAME')
SR_PWD = os.getenv('SCHEMA_REGISTRY_PASSWORD')
EXAMPLE_TOPIC = 'test-topic'
EXAMPLE_SCHEMA_STR = """
{
"namespace": "schema_registry.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "sex",
"type": "string"
},
{
"name": "phone_number",
"type": "string"
},
{
"name": "interest",
"type": [
"string",
"null"
],
"default": "null"
}
]
}
"""
def main(args):
schema_registry_client = SchemaRegistry(
schema_registry_endpoint=args.schema_registry_endpoint,
auth_info=f'{SR_USR}:{SR_PWD}'
)
corresponding_avro_deserializer = schema_registry_client.get_deserializer(
schema_str=EXAMPLE_SCHEMA_STR
)
consumer = KafkaConsumer(
kafka_brokers=args.kafka_brokers,
group_id=args.consumer_group_id,
avro_deserializer=corresponding_avro_deserializer
)
consumer.consume_messages(EXAMPLE_TOPIC)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Kafka Consumer with schema registy example")
parser.add_argument('--schema-registry-endpoint',
action="store",
type=str,
required=True,
help="schema registry endpoint")
parser.add_argument('--kafka-brokers',
action="store",
type=str,
required=True,
help="kafka brokers endpoint")
parser.add_argument('--consumer-group-id',
action="store",
type=str,
required=True,
help="consumer group id")
main(parser.parse_args())