Skip to content

Commit

Permalink
implemented rag producer and consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Jan 9, 2024
1 parent 011751b commit eebb3b5
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 15 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
.env
venv/
__pycache__
.venv/
kafkaconsumer.py
kafkaproducer.py
**/__pycache__
8 changes: 6 additions & 2 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
load_dotenv()

username = os.getenv('USERNAME')
password = os.getenv('PASSWORD')
password = os.getenv('PRODUCERPASSWORD')
kafka_broker = os.getenv('KAFKA_BROKER')

print("====================================")
print(username)
print(password)
print(kafka_broker)
print("====================================")
app = FastAPI()

conf = {
Expand Down
20 changes: 10 additions & 10 deletions flow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
load_dotenv()

username = os.getenv('USERNAME')
password = os.getenv('PASSWORD')
password = os.getenv('CONSUMERPASSWORD')
kafka_broker = os.getenv('KAFKA_BROKER')

print(username, password, kafka_broker)
print(username)
print(password)
print(kafka_broker)

conf = {
'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
Expand Down Expand Up @@ -38,14 +40,10 @@ def listen_message():
if not msg.error():
# Parse the message as JSON
event = json.loads(msg.value().decode('utf-8'))

# Print the event type
return {}
print('Event Type: ', event['event_type'])
else:
return event
elif msg.error().code() != KafkaException._PARTITION_EOF:
print(msg.error())
# elif msg.error().code() != KafkaException._PARTITION_EOF:
# print(msg.error())
else:
continue

Expand All @@ -54,7 +52,9 @@ def listen_message():

finally:
# Close down consumer to commit final offsets.
print('Stopping consumer')
consumer.close()

print('Starting Listening')
listen_message()
if __name__ == '__main__':
print('Starting Listening')
listen_message()
82 changes: 80 additions & 2 deletions rag/main.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,80 @@
def main():
print("I am in RAG")
from confluent_kafka import Consumer, KafkaException, Producer
import json
import os
from dotenv import load_dotenv
load_dotenv()

username = os.getenv('USERNAME')
kafka_broker = os.getenv('KAFKA_BROKER')
producerpassword = os.getenv('PRODUCERPASSWORD')
consumerpassword = os.getenv('CONSUMERPASSWORD')

consumer_conf = {
'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': consumerpassword, # replace with your EventHub connection string
'group.id': 'python-example-consumer',
'auto.offset.reset': 'earliest'
}

producer_conf = {
'bootstrap.servers': kafka_broker, # replace with your EventHub namespace
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': producerpassword, # replace with your EventHub connection string
# 'group.id': 'python-example-consumer',
# 'auto.offset.reset': 'earliest'
}

producer = Producer(producer_conf)

consumer = Consumer(consumer_conf)
consumer.subscribe(['topic3'])

def process_message(msg):
return {"data":msg, "message": "result from RAG", "type":"rag"}

def produce_message(msg):
producer.produce('topic1', msg)
producer.flush()


def listen_and_produce():
try:
while True:
# Poll for a message
msg = consumer.poll(1.0)

if msg is not None:
if not msg.error():
event = json.loads(msg.value().decode('utf-8'))
# Print the event type
print("Recieved Event:", event)

result = process_message(event)
print("Result:", result)

producer.produce('topic1', json.dumps(result))
producer.flush()

elif msg.error().code() != KafkaException._PARTITION_EOF:
print(msg.error())
else:
continue

except KeyboardInterrupt:
pass

finally:
# Close down consumer to commit final offsets.
print('Stopping RAG consumer')
consumer.close()

if __name__ == '__main__':
result = {"event":"rag", "message": "result from RAG", "data":"result"}
produce_message(json.dumps(result))
print('Starting RAG Listening')
listen_and_produce()

0 comments on commit eebb3b5

Please sign in to comment.