
Merge pull request #2 from OpenNyAI/k_branch
rag-kafka
kanak8278 authored Jan 9, 2024
2 parents f8d787e + eebb3b5 commit 18cba33
Showing 5 changed files with 189 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
.env
venv/
.venv/
kafkaconsumer.py
kafkaproducer.py
**/__pycache__
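
Since `.env` is ignored above, each checkout needs its own copy providing the variables the new services read (USERNAME, PRODUCERPASSWORD, CONSUMERPASSWORD, KAFKA_BROKER). A minimal fail-fast check, as a sketch; the helper file name is hypothetical and not part of this commit:

# check_env.py -- hypothetical helper, not part of this commit
import os
from dotenv import load_dotenv

load_dotenv()

# The api/, flow/, and rag/ services below read these names via os.getenv
REQUIRED = ['USERNAME', 'PRODUCERPASSWORD', 'CONSUMERPASSWORD', 'KAFKA_BROKER']

missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing .env entries: {', '.join(missing)}")
print("All required Kafka settings are present")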
Empty file added api/__init__.py
45 changes: 43 additions & 2 deletions api/main.py
@@ -1,10 +1,51 @@
from fastapi import FastAPI
import os
import json
from confluent_kafka import Producer
from pydantic import BaseModel

from dotenv import load_dotenv
load_dotenv()


username = os.getenv('USERNAME')
password = os.getenv('PRODUCERPASSWORD')
kafka_broker = os.getenv('KAFKA_BROKER')
print("====================================")
print(username)
print(password)
print(kafka_broker)
print("====================================")
app = FastAPI()

conf = {
    'bootstrap.servers': kafka_broker,  # replace with your EventHub namespace
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,  # replace with your EventHub connection string
    'client.id': 'python-example-producer'
}

# Create a Kafka producer with the above configuration
producer = Producer(conf)

class MessageRequest(BaseModel):
    event_type: str
    message: str

@app.post("/")
def receive_event(event_type: str, message: str):
def receive_event(data: MessageRequest):
message = json.dumps({"message" : data.message, "type" : data.event_type})
print(message)


# Send the message to the 'test' topic
producer.produce('topic1', message)

# Wait for any outstanding messages to be delivered and report delivery failures
producer.flush()
# write to Kafka queue
return 200


print('Started')
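
For reference, a minimal sketch of calling this endpoint; it assumes the app is served with uvicorn on the default port 8000 and uses the requests library, neither of which is part of this commit:

# Hypothetical client call; assumes `uvicorn api.main:app` running on localhost:8000
import requests

resp = requests.post(
    "http://localhost:8000/",
    json={"event_type": "query", "message": "hello kafka"},
)
# receive_event returns 200, so both the status and the JSON body should be 200
print(resp.status_code, resp.json())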
62 changes: 60 additions & 2 deletions flow/main.py
@@ -1,2 +1,60 @@
from confluent_kafka import Consumer, KafkaError
import json
import os
from dotenv import load_dotenv
load_dotenv()

username = os.getenv('USERNAME')
password = os.getenv('CONSUMERPASSWORD')
kafka_broker = os.getenv('KAFKA_BROKER')

print(username)
print(password)
print(kafka_broker)

conf = {
    'bootstrap.servers': kafka_broker,  # replace with your EventHub namespace
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,  # replace with your EventHub connection string
    'group.id': 'python-example-consumer',
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer with the above configuration
consumer = Consumer(conf)

# Subscribe to the 'topic1' topic
consumer.subscribe(['topic1'])

def listen_message():
    try:
        while True:
            # Poll for a message
            msg = consumer.poll(1.0)

            # If a message is received
            if msg is not None:
                # If the message is not an error
                if not msg.error():
                    # Parse the message as JSON and return the decoded event
                    event = json.loads(msg.value().decode('utf-8'))
                    return event
                # Report real errors, ignoring end-of-partition notices
                elif msg.error().code() != KafkaError._PARTITION_EOF:
                    print(msg.error())
            else:
                continue

    except KeyboardInterrupt:
        pass

    finally:
        # Close down consumer to commit final offsets.
        print('Stopping consumer')
        consumer.close()

if __name__ == '__main__':
    print('Starting Listening')
    listen_message()
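
Note that listen_message() returns the first decoded event rather than looping forever, so a caller is expected to act on the returned value. A sketch of such a caller, assuming the {"message": ..., "type": ...} shape published by api/main.py (hypothetical, not part of this commit):

# Hypothetical caller; relies on the JSON shape published by api/main.py
event = listen_message()
if event is not None:
    if event.get("type") == "rag":
        print("RAG result:", event.get("message"))
    else:
        print("Unhandled event type:", event.get("type"))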
82 changes: 80 additions & 2 deletions rag/main.py
@@ -1,2 +1,80 @@
from confluent_kafka import Consumer, KafkaError, Producer
import json
import os
from dotenv import load_dotenv
load_dotenv()

username = os.getenv('USERNAME')
kafka_broker = os.getenv('KAFKA_BROKER')
producerpassword = os.getenv('PRODUCERPASSWORD')
consumerpassword = os.getenv('CONSUMERPASSWORD')

consumer_conf = {
    'bootstrap.servers': kafka_broker,  # replace with your EventHub namespace
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': consumerpassword,  # replace with your EventHub connection string
    'group.id': 'python-example-consumer',
    'auto.offset.reset': 'earliest'
}

producer_conf = {
    'bootstrap.servers': kafka_broker,  # replace with your EventHub namespace
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': producerpassword,  # replace with your EventHub connection string
    # 'group.id': 'python-example-consumer',
    # 'auto.offset.reset': 'earliest'
}

producer = Producer(producer_conf)

consumer = Consumer(consumer_conf)
consumer.subscribe(['topic3'])

def process_message(msg):
    return {"data": msg, "message": "result from RAG", "type": "rag"}

def produce_message(msg):
    producer.produce('topic1', msg)
    producer.flush()


def listen_and_produce():
    try:
        while True:
            # Poll for a message
            msg = consumer.poll(1.0)

            if msg is not None:
                if not msg.error():
                    # Decode the incoming event and run it through the RAG step
                    event = json.loads(msg.value().decode('utf-8'))
                    print("Received Event:", event)

                    result = process_message(event)
                    print("Result:", result)

                    # Publish the processed result back onto topic1
                    producer.produce('topic1', json.dumps(result))
                    producer.flush()

                # Report real errors, ignoring end-of-partition notices
                elif msg.error().code() != KafkaError._PARTITION_EOF:
                    print(msg.error())
            else:
                continue

    except KeyboardInterrupt:
        pass

    finally:
        # Close down consumer to commit final offsets.
        print('Stopping RAG consumer')
        consumer.close()

if __name__ == '__main__':
    result = {"event": "rag", "message": "result from RAG", "data": "result"}
    produce_message(json.dumps(result))
    print('Starting RAG Listening')
    listen_and_produce()
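
Since listen_and_produce() only reads from 'topic3', something upstream has to publish there for the loop to do anything. A minimal test publisher, as a sketch reusing producer_conf from this file; the payload shown is hypothetical:

# Hypothetical test publisher for topic3, reusing producer_conf defined above
import json
from confluent_kafka import Producer

test_producer = Producer(producer_conf)  # same SASL_SSL settings as the RAG producer
test_producer.produce('topic3', json.dumps({"message": "what is RAG?", "type": "query"}))
test_producer.flush()  # block until the broker confirms delivery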
