From 011751bc3b99ec2001f73663db4c08349d864b11 Mon Sep 17 00:00:00 2001
From: Shrey Pandey
Date: Tue, 9 Jan 2024 17:06:28 +0530
Subject: [PATCH 1/2] Initial release

---
 .gitignore      |  3 +++
 api/__init__.py |  0
 api/main.py     | 39 ++++++++++++++++++++++++++++++-
 flow/main.py    | 62 +++++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 101 insertions(+), 3 deletions(-)
 create mode 100644 .gitignore
 create mode 100644 api/__init__.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..09f14554
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.env
+venv/
+__pycache__
\ No newline at end of file
diff --git a/api/__init__.py b/api/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/api/main.py b/api/main.py
index f6f5b79e..ca90a1c4 100644
--- a/api/main.py
+++ b/api/main.py
@@ -1,10 +1,47 @@
 from fastapi import FastAPI
+import os
+import json
+from confluent_kafka import Producer
+from pydantic import BaseModel
+
 from dotenv import load_dotenv
 load_dotenv()
+
+username = os.getenv('USERNAME')
+password = os.getenv('PASSWORD')
+kafka_broker = os.getenv('KAFKA_BROKER')
+
 app = FastAPI()
 
+conf = {
+    'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
+    'security.protocol': 'SASL_SSL',
+    'sasl.mechanism': 'PLAIN',
+    'sasl.username': username,
+    'sasl.password': password, # replace with your EventHub connection string
+    'client.id': 'python-example-producer'
+}
+
+# Create a Kafka producer with the above configuration
+producer = Producer(conf)
+
+class MessageRequest(BaseModel):
+    event_type: str
+    message: str
+
 @app.post("/")
-def receive_event(event_type: str, message: str):
+def receive_event(data: MessageRequest):
+    message = json.dumps({"message": data.message, "type": data.event_type})
+    print(message)
+
+    # Send the message to the 'topic1' topic
+    producer.produce('topic1', message)
+
+    # Wait for any outstanding messages to be delivered and report delivery failures
+    producer.flush() # write to Kafka queue
     return 200
+
+
+print('Started')
\ No newline at end of file
diff --git a/flow/main.py b/flow/main.py
index b30fdace..c36e904b 100644
--- a/flow/main.py
+++ b/flow/main.py
@@ -1,2 +1,60 @@
-def main():
-    print("I am in FLOW")
\ No newline at end of file
+from confluent_kafka import Consumer, KafkaError
+import json
+import os
+from dotenv import load_dotenv
+load_dotenv()
+
+username = os.getenv('USERNAME')
+password = os.getenv('PASSWORD')
+kafka_broker = os.getenv('KAFKA_BROKER')
+
+print(username, password, kafka_broker)
+
+conf = {
+    'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
+    'security.protocol': 'SASL_SSL',
+    'sasl.mechanism': 'PLAIN',
+    'sasl.username': username,
+    'sasl.password': password, # replace with your EventHub connection string
+    'group.id': 'python-example-consumer',
+    'auto.offset.reset': 'earliest'
+}
+
+# Create a Kafka consumer with the above configuration
+consumer = Consumer(conf)
+
+# Subscribe to the 'topic1' topic
+consumer.subscribe(['topic1'])
+
+def listen_message():
+    try:
+        while True:
+            # Poll for a message
+            msg = consumer.poll(1.0)
+
+            # If a message is received
+            if msg is not None:
+                # If the message is not an error
+                if not msg.error():
+                    # Parse the message as JSON
+                    event = json.loads(msg.value().decode('utf-8'))
+
+                    # Print the event type
+                    return {}
+                    print('Event Type: ', event['event_type'])
+                else:
+                    print(msg.error())
+                # elif msg.error().code() != KafkaError._PARTITION_EOF:
+                #     print(msg.error())
+            else:
+                continue
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        # Close down consumer to commit final offsets.
+        consumer.close()
+
+print('Starting Listening')
+listen_message()

From eebb3b554210368498663aea73499e4b73122890 Mon Sep 17 00:00:00 2001
From: root
Date: Tue, 9 Jan 2024 18:29:40 +0530
Subject: [PATCH 2/2] Implement RAG producer and consumer

---
 .gitignore   |  5 +++-
 api/main.py  |  8 +++--
 flow/main.py | 20 ++++++-------
 rag/main.py  | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 09f14554..5ef73df4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
 .env
 venv/
-__pycache__
\ No newline at end of file
+.venv/
+kafkaconsumer.py
+kafkaproducer.py
+**/__pycache__
\ No newline at end of file
diff --git a/api/main.py b/api/main.py
index ca90a1c4..5cae2fc1 100644
--- a/api/main.py
+++ b/api/main.py
@@ -8,9 +8,13 @@
 load_dotenv()
 
 username = os.getenv('USERNAME')
-password = os.getenv('PASSWORD')
+password = os.getenv('PRODUCERPASSWORD')
 kafka_broker = os.getenv('KAFKA_BROKER')
-
+print("====================================")
+print(username)
+print(password)
+print(kafka_broker)
+print("====================================")
 app = FastAPI()
 
 conf = {
diff --git a/flow/main.py b/flow/main.py
index c36e904b..9eb4cdc3 100644
--- a/flow/main.py
+++ b/flow/main.py
@@ -5,10 +5,12 @@
 load_dotenv()
 
 username = os.getenv('USERNAME')
-password = os.getenv('PASSWORD')
+password = os.getenv('CONSUMERPASSWORD')
 kafka_broker = os.getenv('KAFKA_BROKER')
 
-print(username, password, kafka_broker)
+print(username)
+print(password)
+print(kafka_broker)
 
 conf = {
     'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
@@ -38,14 +40,10 @@ def listen_message():
                 if not msg.error():
                     # Parse the message as JSON
                     event = json.loads(msg.value().decode('utf-8'))
-
                     # Print the event type
-                    return {}
-                    print('Event Type: ', event['event_type'])
-                else:
+                    return event
+                elif msg.error().code() != KafkaError._PARTITION_EOF:
                     print(msg.error())
-                # elif msg.error().code() != KafkaError._PARTITION_EOF:
-                #     print(msg.error())
             else:
                 continue
 
@@ -52,9 +50,11 @@ def listen_message():
     except KeyboardInterrupt:
         pass
 
     finally:
         # Close down consumer to commit final offsets.
+        print('Stopping consumer')
         consumer.close()
 
-print('Starting Listening')
-listen_message()
+if __name__ == '__main__':
+    print('Starting Listening')
+    listen_message()
diff --git a/rag/main.py b/rag/main.py
index 9eda8b9c..6e8e5c5c 100644
--- a/rag/main.py
+++ b/rag/main.py
@@ -1,2 +1,80 @@
-def main():
-    print("I am in RAG")
\ No newline at end of file
+from confluent_kafka import Consumer, KafkaError, Producer
+import json
+import os
+from dotenv import load_dotenv
+load_dotenv()
+
+username = os.getenv('USERNAME')
+kafka_broker = os.getenv('KAFKA_BROKER')
+producerpassword = os.getenv('PRODUCERPASSWORD')
+consumerpassword = os.getenv('CONSUMERPASSWORD')
+
+consumer_conf = {
+    'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
+    'security.protocol': 'SASL_SSL',
+    'sasl.mechanism': 'PLAIN',
+    'sasl.username': username,
+    'sasl.password': consumerpassword, # replace with your EventHub connection string
+    'group.id': 'python-example-consumer',
+    'auto.offset.reset': 'earliest'
+}
+
+producer_conf = {
+    'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
+    'security.protocol': 'SASL_SSL',
+    'sasl.mechanism': 'PLAIN',
+    'sasl.username': username,
+    'sasl.password': producerpassword, # replace with your EventHub connection string
+    # 'group.id': 'python-example-consumer',
+    # 'auto.offset.reset': 'earliest'
+}
+
+producer = Producer(producer_conf)
+
+consumer = Consumer(consumer_conf)
+consumer.subscribe(['topic3'])
+
+def process_message(msg):
+    return {"data": msg, "message": "result from RAG", "type": "rag"}
+
+def produce_message(msg):
+    producer.produce('topic1', msg)
+    producer.flush()
+
+
+def listen_and_produce():
+    try:
+        while True:
+            # Poll for a message
+            msg = consumer.poll(1.0)
+
+            if msg is not None:
+                if not msg.error():
+                    event = json.loads(msg.value().decode('utf-8'))
+                    # Print the received event
+                    print("Received Event:", event)
+
+                    result = process_message(event)
+                    print("Result:", result)
+
+                    producer.produce('topic1', json.dumps(result))
+                    producer.flush()
+
+                elif msg.error().code() != KafkaError._PARTITION_EOF:
+                    print(msg.error())
+                else:
+                    continue
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        # Close down consumer to commit final offsets.
+        print('Stopping RAG consumer')
+        consumer.close()
+
+if __name__ == '__main__':
+    result = {"event": "rag", "message": "result from RAG", "data": "result"}
+    produce_message(json.dumps(result))
+    print('Starting RAG Listening')
+    listen_and_produce()
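
All three services read their broker settings from a .env file via python-dotenv. A minimal sketch of that file, inferred from the os.getenv() calls above; the values are placeholders, and the '$ConnectionString' username convention is an assumption based on the Azure Event Hubs Kafka endpoint that the inline comments reference:

    USERNAME=$ConnectionString
    PRODUCERPASSWORD=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<producer-key>;SharedAccessKey=<key>
    CONSUMERPASSWORD=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<consumer-key>;SharedAccessKey=<key>
    KAFKA_BROKER=<namespace>.servicebus.windows.net:9093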
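
To exercise the producer path end to end, the endpoint in api/main.py can be driven with any HTTP client. A sketch assuming the app is served locally with uvicorn on port 8000 (both the command and the port are assumptions, not part of the patch):

    # shell: uvicorn api.main:app --port 8000
    import requests

    resp = requests.post(
        'http://localhost:8000/',
        json={'event_type': 'rag', 'message': 'hello'},
    )
    print(resp.status_code, resp.json())

With flow/main.py running against the same broker, listen_message() should return the posted payload as a parsed dict, since both sides use 'topic1'.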
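
Both producers call produce() and then a blocking flush() per message without inspecting the delivery result. confluent_kafka's produce() accepts an optional delivery callback that reports success or failure per message; a minimal sketch (delivery_report is an illustrative name, not in the patch):

    def delivery_report(err, msg):
        # Called from poll()/flush() once the broker acks or rejects the message
        if err is not None:
            print('Delivery failed:', err)
        else:
            print('Delivered to', msg.topic(), 'partition', msg.partition())

    # e.g. in api/main.py:
    #   producer.produce('topic1', message, callback=delivery_report)
    #   producer.flush()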