Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limiter functionality #28

Merged
merged 3 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion api-server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from webrtc import setup_webrtc
import uuid
from utils.Debugger import Debugger
from utils.RateLimiter import RateLimiter

app = Flask(__name__)
CORS(app)
Expand All @@ -13,6 +14,9 @@
# In-memory storage for each session
session_storage = {}

# Central RateLimiter
rate_limiter = RateLimiter(max_chat_requests_per_user=5, max_users_per_meeting=7, rate_limit_time_window=5)

# Setup routes and sockets
@app.route('/api/create-meeting', methods=['POST'])
def create_meeting():
Expand All @@ -30,6 +34,7 @@ def create_meeting():
'users': {}, # {username: sid}
'chat_history': [] # [{sender: username, text: message}]
}

Debugger.log_message('INFO', f'User {username} created a new meeting', meeting_id)
return jsonify({'meeting_id': meeting_id})

Expand All @@ -43,6 +48,9 @@ def get_session(meeting_id):
def get_users(meeting_id):
if meeting_id not in session_storage:
return jsonify({'error': 'Meeting ID not found'}), 404
if len(session_storage[meeting_id]['users']) > 7:
return jsonify({'error': 'Meeting is full'}), 404

return jsonify(list(session_storage[meeting_id]['users'].keys()))

@socketio.on('join')
Expand All @@ -58,6 +66,9 @@ def handle_join(data):
session = session_storage[meeting_id]
session['users'][username] = request.sid

# Update rate limiter meta data
rate_limiter.updateMeetingCount(meeting_id, "join")

Debugger.log_message('INFO', f'User {username} joined the meeting', meeting_id)
emit('user_joined', {'username': username, 'meeting_id': meeting_id}, room=meeting_id)

Expand All @@ -71,12 +82,16 @@ def handle_disconnect():
if len(session['users']) == 0:
to_delete.append(meeting_id)
Debugger.log_message('INFO', f'User {request.sid} left the meeting', meeting_id)

# Update rate limiter meta data
rate_limiter.updateMeetingCount(meeting_id, "leave")

emit('user_left', {'meeting_id': meeting_id, 'username': username}, room=meeting_id)

for meeting_id in to_delete:
del session_storage[meeting_id]

setup_chat(app, socketio, session_storage, Debugger.log_message)
setup_chat(app, socketio, session_storage, Debugger.log_message, rate_limiter)
setup_webrtc(app, socketio, session_storage, Debugger.log_message)

if __name__ == '__main__':
Expand Down
13 changes: 11 additions & 2 deletions api-server/chat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from flask import jsonify, request
from flask_socketio import emit
from utils.Debugger import Debugger

def setup_chat(app, socketio, session_storage, log_message):
def setup_chat(app, socketio, session_storage, log_message, rate_limiter):
@app.route('/api/chat_history/<meeting_id>', methods=['GET'])
def get_chat_history(meeting_id):
if meeting_id not in session_storage:
Expand All @@ -19,6 +20,14 @@ def handle_chat_message(data):
emit('error', {'message': 'Meeting ID not found'}, to=request.sid)
return

isRateLimited = rate_limiter.rateLimitChat(sender)
Debugger.log_message(Debugger.DEBUG, f"isRateLimited: {isRateLimited}")
if isRateLimited:
Debugger.log_message(Debugger.DEBUG, "Rate limited")
emit('chat_message', {'rate_limited': True}, to=request.sid)
return

Debugger.log_message(Debugger.DEBUG, "trace 1")
session_storage[meeting_id]['chat_history'].append({'sender': sender, 'text': message})
log_message('INFO', f'User {sender} sent: {message}', meeting_id)
emit('chat_message', {'sender': sender, 'text': message}, room=meeting_id)
emit('chat_message', {'sender': sender, 'text': message}, room=meeting_id)
4 changes: 4 additions & 0 deletions api-server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
bidict==0.23.1
blinker==1.8.2
click==8.1.7
dnspython==2.6.1
eventlet==0.36.1
Flask==3.0.3
Flask-Cors==4.0.1
Flask-SocketIO==5.3.6
greenlet==3.0.3
h11==0.14.0
itsdangerous==2.2.0
Jinja2==3.1.4
MarkupSafe==2.1.5
python-engineio==4.9.1
python-socketio==5.11.3
simple-websocket==1.0.0
six==1.16.0
uuid==1.30
Werkzeug==3.0.4
wsproto==1.2.0
48 changes: 29 additions & 19 deletions api-server/utils/Debugger.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from flask import request

"""
Usage Guide:
------------
Debugger.<method_name>(<parameters>)

Examples:
- Debugger.log_message(Debugger.INFO, "User logged in")
- Debugger.log_transport()

Available methods:
- log_message(level: str, message: str, meeting: str = '')
- log_transport()
"""
class Debugger:
"""
Usage Guide:
------------
Debugger.<method_name>(<parameters>)

Examples:
- Debugger.log_message(Debugger.INFO, "User logged in")
- Debugger.log_transport()

Available methods:
- log_message(level: str, message: str, meeting: str = '')
- log_transport()
"""

# Define log levels and color codes
DEBUG = 'DEBUG'
INFO = 'INFO'
Expand All @@ -30,9 +31,18 @@ class Debugger:

@staticmethod
def log_message(level: str, message: str, meeting=''):
'''
Dev Function to standardize different types of logs
'''
"""
Logs a message with a specified level and optional meeting identifier.

This method standardizes log output across the application, applying
consistent formatting and color-coding based on the log level.

Parameters:
- level (str): The log level. Use Debugger.DEBUG, Debugger.INFO,
Debugger.WARNING, Debugger.ERROR, or Debugger.CRITICAL.
- message (str): The message to be logged.
- meeting (str, optional): An identifier for the meeting context.
"""
color_map = {
Debugger.DEBUG: Debugger.DEBUG_COLOR,
Debugger.INFO: Debugger.INFO_COLOR,
Expand All @@ -47,9 +57,9 @@ def log_message(level: str, message: str, meeting=''):

@staticmethod
def log_transport():
'''
"""
Dev Function to log the transport method used in the connections
'''
"""
transport = request.args.get('transport', 'unknown')
Debugger.log_message('DEBUG', f'Connection transport: {transport}')
Debugger.log_message(Debugger.DEBUG, f'Connection transport: {transport}')

63 changes: 63 additions & 0 deletions api-server/utils/RateLimiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from collections import defaultdict, deque
from .Debugger import Debugger
import time

class RateLimiter:
__user_chat_timestamps = defaultdict(deque) # Dict to log chat request timestamps per user for rate limiting --> {user_id: [timestamp1, timestamp2, ...]}
__meeting_user_count = defaultdict(int) # A dictionary to track the number of users in each meeting currently --> {meeting_id: user_count}

def __init__(self, max_chat_requests_per_user: int, max_users_per_meeting: int, rate_limit_time_window: int):
self.max_chat_requests_per_user = max_chat_requests_per_user # The maximum number of chat requests allowed per user within a specific time window
self.max_users_per_meeting = max_users_per_meeting # The maximum number of users allowed to join a meeting at once
self.rate_limit_time_window = rate_limit_time_window # The time frame within which the rate limiting is applied for chat requests (seconds)

def updateMeetingCount(self, meeting_id: str, action: str) -> None:
"""
Function to update in-memory data structure on head counts in meetings
"""
if action == "join":
self.__meeting_user_count[meeting_id] += 1
Debugger.log_message(Debugger.DEBUG, f"Meeting count: {self.__meeting_user_count[meeting_id]}")
elif action == "leave":
self.__meeting_user_count[meeting_id] -= 1
Debugger.log_message(Debugger.DEBUG, f"Meeting count: {self.__meeting_user_count[meeting_id]}")
else:
raise ValueError(f"Invalid action: {action}. Expected 'join' or 'leave'.")

def rateLimitChat(self, username: str) -> bool:
"""
Checks if a user has exceeded their chat rate limit.

This method:
1. Adds the current timestamp to the user's chat history.
2. Removes outdated timestamps outside the rate limit window.
3. Checks if the number of recent messages exceeds the allowed limit.

Args:
username (str): The username to check.

Returns:
bool: True if the user is within the rate limit, False otherwise.
"""
curr_time = time.time()
self.__user_chat_timestamps[username].append(curr_time)

Debugger.log_message(Debugger.DEBUG, f"User chat timestamps: {self.__user_chat_timestamps[username]}")
# Remove redundant time stamps
while curr_time - self.__user_chat_timestamps[username][0] > self.rate_limit_time_window:
self.__user_chat_timestamps[username].popleft()
Debugger.log_message(Debugger.DEBUG, f"User chat timestamps: {self.__user_chat_timestamps[username]}")

# Chat rate limit check
if len(self.__user_chat_timestamps[username]) > self.max_chat_requests_per_user:
return True

return False








8 changes: 8 additions & 0 deletions client/src/components/Chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const Chat = ({ chatHandler, initialChatHistory }) => {
toast.error('Please enter a message before sending.');
return;
}

// Check if the user is rate-limited
if (chatHandler.rateLimited) {
toast.error('You are currently rate-limited and cannot send messages.');
return;
}

// Send the message if not rate-limited
chatHandler.sendMessage(message);
setChatHistory(prevHistory => [...prevHistory, { sender: chatHandler.username, text: message }]);
}, [chatHandler]);
Expand Down
31 changes: 27 additions & 4 deletions client/src/services/chatHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ class ChatHandler {
this.chatHistory = [];
this.setChatHistory = setChatHistory;
this.onError = onError;
this.rateLimited = false; // Initialize rate-limited flag
this.rateLimitTimeWindow = 3; // 3 second rate limit cool down
}

async initialize() {
this.setupSocketListeners();
const response = await fetch(`${apiUrl}/api/chat_history/${this.meeting_id}`)
if (!response.ok) {
this.onError('Failed to fetch chat history. Please try again.');
return;
}
this.chatHistory = await response.json();
Expand All @@ -26,14 +27,36 @@ class ChatHandler {
setupSocketListeners() {
this.socket.on('chat_message', (data) => {
console.log("WebSocket event received:", data);
this.chatHistory.push(data);
if (this.setChatHistory) {
this.setChatHistory([...this.chatHistory]); // update UI
if (data['rate_limited'] === true) {
console.log("Rate limited chat");
this.rateLimited = true;

// Refresh rate limiting flag callback
setTimeout(() => {
this.rateLimited = false;
console.log("Rate limit has been reset automatically after timeout.");
}, this.rateLimitTimeWindow * 1000);

} else {
this.chatHistory.push(data);
if (this.setChatHistory) {
this.setChatHistory([...this.chatHistory]); // update UI
}
}
});

// Listen for the rate limit reset event
this.socket.on('rate_limit_reset', () => {
console.log("Rate limit reset received");
this.rateLimited = false; // Reset the flag
});
}

sendMessage(text) {
if (this.rateLimited) {
console.log("You are rate limited");
return;
}
this.socket.emit('chat_message', { meeting_id: this.meeting_id, text: text, sender: this.username });
}

Expand Down
Loading