-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
93 lines (68 loc) · 2.55 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from threading import Thread
import logging
from confluent_kafka import Consumer, KafkaError, Producer
from flask import Flask, json, jsonify, render_template, request
from flask_socketio import SocketIO
# Flask application object; routes below are registered on it.
app = Flask(__name__)
# Socket.IO server bound to the Flask app; used to register event handlers
# and to emit events to connected clients.
socketio = SocketIO(app)
# Configure the root logger so INFO-level messages from this module are shown.
logging.basicConfig(level=logging.INFO)
@socketio.on("connect")
def handle_connect():
    """Record in the log that a Socket.IO client has connected."""
    logging.info("Client Connected")
# Settings handed to the confluent_kafka Consumer created below.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my_consumer_group",
    "auto.offset.reset": "earliest",
}
# Module-level consumer, created at import time and subscribed to the
# "received_messages" topic; kafka_consumer() polls this object.
consumer = Consumer(consumer_config)
consumer.subscribe(["received_messages"])
# Kafka consumer loop
def kafka_consumer(poll_timeout=1.0):
    """Forward messages from the subscribed Kafka topic to Socket.IO clients.

    Polls the module-level ``consumer`` until it reports an error other than
    end-of-partition. Each payload is decoded as UTF-8, parsed as JSON and
    emitted to connected clients as a ``new_message`` event. Messages with no
    payload, or with a payload that cannot be decoded or parsed, are logged
    and skipped so that one bad record does not terminate the loop. The
    consumer is closed when the loop exits for any reason.

    Args:
        poll_timeout: Maximum time, in seconds, to block in each
            ``consumer.poll`` call.
    """
    try:
        while True:
            # confluent_kafka takes the poll timeout in seconds (not ms), so
            # keep it short to stay responsive.
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is informational only.
                    continue
                logging.error("Kafka consumer error: %s", error)
                break

            payload = msg.value()
            if payload is None:
                logging.warning("Skipping Kafka message without a payload")
                continue

            try:
                # Parse the JSON string into a Python dictionary
                kafka_message_dict = json.loads(payload.decode("utf-8"))
            except ValueError:
                # Covers both UnicodeDecodeError and JSON decoding errors.
                logging.exception("Skipping undecodable Kafka message")
                continue

            # Log received message
            logging.info(f"Received Kafka message:{kafka_message_dict}")
            # Emit message to WebSocket clients
            socketio.emit("new_message", kafka_message_dict)
    finally:
        # Leave the consumer group cleanly instead of leaking the connection.
        consumer.close()
# Start Kafka consumer in a separate thread
# The consumer loop runs indefinitely, so the thread is marked as a daemon;
# otherwise it would keep the interpreter alive after the web server stops.
kafka_thread = Thread(target=kafka_consumer, daemon=True)
kafka_thread.start()
@app.route("/")
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
if __name__ == "__main__":
    # The Kafka consumer thread is started when this module is imported.
    # With debug=True the reloader would import the module in two processes,
    # creating two consumers in the same group; the copy in the supervising
    # process would emit messages that never reach a client. Keep debug mode
    # but disable the reloader so only one consumer runs.
    socketio.run(app, port=8000, debug=True, use_reloader=False)
# # Kafka producer setup
# producer_config = {'bootstrap.servers': 'localhost:9092'}
# producer = Producer(producer_config)
# # Route to send a new message
# @app.route('/api/send_message', methods=['POST'])
# def send_message():
#     try:
#         data = request.json
#         username = data.get('username')
#         if username:
#             # Publish the message to the 'received_messages' topic
#             kafka_message = {
#                 'username': username
#             }
#             producer.produce('received_messages', value=json.dumps(kafka_message))
#             producer.flush()
#             # socketio.emit('new_message', kafka_message)
#             return jsonify({'status': 'success', 'message': 'Message sent successfully!'})
#         else:
#             return jsonify({'status': 'error', 'message': 'Invalid request format'}), 400
#     except Exception as e:
#         logging.info(f"Error sending message: {e}")
#         return jsonify({'status': 'error', 'message': 'Internal server error'}), 500