-
Notifications
You must be signed in to change notification settings - Fork 18
/
main.py
75 lines (64 loc) · 2.76 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import json
from yuptoo.lib.config import KAFKA_AUTO_COMMIT, ANNOUNCE_TOPIC, METRICS_PORT, KAFKA_BROKER, TRACKER_TOPIC
from yuptoo.lib.logger import initialize_logging, threadctx
import logging
from yuptoo.lib.exceptions import QPCKafkaMsgException, FailExtractException
from yuptoo.lib.metrics import kafka_failures, report_processing_exceptions, extract_report_slices_failures
from yuptoo.validators.qpc_message_validator import validate_qpc_message
from yuptoo.processor.report_processor import process_report
from yuptoo.lib import consume, produce
from prometheus_client import start_http_server
from yuptoo.processor.utils import tracker_message
initialize_logging()
LOG = logging.getLogger(__name__)

# When a broker configuration is present and carries a CA certificate,
# dump that certificate to /tmp/cacert.
# NOTE(review): presumably the Kafka client configuration points at this
# path -- confirm against the consumer/producer setup.
if KAFKA_BROKER:
    if KAFKA_BROKER.cacert:
        with open('/tmp/cacert', 'w') as f:
            f.write(KAFKA_BROKER.cacert)
def set_extra_log_data(request_obj):
    """Copy the request identifiers from ``request_obj`` onto ``threadctx``.

    The values stored under the keys ``request_id``, ``account`` and
    ``org_id`` are assigned, in that order, to attributes of the same name
    on ``threadctx``. A missing key raises ``KeyError``; attributes assigned
    before the missing key stay set.

    NOTE(review): ``threadctx`` comes from ``yuptoo.lib.logger``; presumably
    the log formatter reads these attributes -- confirm there.
    """
    for field in ('request_id', 'account', 'org_id'):
        setattr(threadctx, field, request_obj[field])
# Kafka clients used by the loop below. Tracker messages are sent through
# produce.send_message(); `producer` itself is only flushed here.
consumer = consume.init_consumer()
producer = produce.init_producer()
LOG.info(f"Started listening on kafka topic - {ANNOUNCE_TOPIC}.")

# Start the Prometheus HTTP endpoint on METRICS_PORT before consuming.
start_http_server(METRICS_PORT)
LOG.info("Started Yuptoo Prometheus Server.")

# Main consume loop: poll one message at a time, process messages whose
# 'service' header is 'qpc', and never let a single bad message stop the loop.
while True:
    kafka_msg = consumer.poll(1.0)
    if kafka_msg is None:
        # The poll returned nothing; try again.
        continue
    if kafka_msg.error():
        LOG.error(f"Kafka error occured : {kafka_msg.error()}.")
        continue

    # `msg` is what the error handlers below log: the raw Kafka message
    # until the payload has been decoded, the decoded dict afterwards.
    # The raw message stays available as `kafka_msg`, so the JSON handler
    # can always call .value() on it, even if the decode error is raised
    # after `msg` has been replaced by the dict.
    msg = kafka_msg
    try:
        # Headers may be absent, hence the `or []` fallback.
        service = dict(kafka_msg.headers() or []).get('service')
        if service:
            service = service.decode("utf-8")

        if service == 'qpc':
            topic = kafka_msg.topic()
            msg = json.loads(kafka_msg.value().decode("utf-8"))
            msg['topic'] = topic
            request_obj = validate_qpc_message(msg)
            produce.send_message(
                TRACKER_TOPIC,
                tracker_message(request_obj, "received", "Payload received by yuptoo")
            )
            set_extra_log_data(request_obj)
            # Committed before the report is processed, regardless of
            # KAFKA_AUTO_COMMIT.
            # NOTE(review): presumably so that a slow or failing report is
            # not redelivered -- confirm the intended delivery semantics.
            consumer.commit()
            process_report(msg, request_obj)
    except json.decoder.JSONDecodeError:
        LOG.error(f"Unable to decode kafka message: {kafka_msg.value()}")
    except QPCKafkaMsgException as message_error:
        kafka_failures.inc()
        LOG.error(f"Error processing Kafka message. Message: {msg}, Error: {message_error}")
    except FailExtractException as err:
        extract_report_slices_failures.inc()
        LOG.error(f"Error Extracting report. Message: {msg}, Error: {err}")
    except Exception as err:
        # Top-level boundary: count the failure and keep consuming.
        # LOG.exception logs at ERROR level and appends the traceback, so
        # unexpected failures remain diagnosable.
        report_processing_exceptions.inc()
        LOG.exception(f"An error occurred during message processing: {repr(err)}")
    finally:
        # Runs for every polled message that reached the try block,
        # including non-qpc ones.
        if not KAFKA_AUTO_COMMIT:
            consumer.commit()
        producer.flush()