-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_client.py
62 lines (48 loc) · 1.4 KB
/
test_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# test_client.py
import socketio
import uuid
import time
# Initialize SocketIO client
sio = socketio.Client()
# Generate a unique session ID for this client
SESSION_ID = str(uuid.uuid4())
# Event handler for connection
@sio.event
def connect():
print("Successfully connected to the server.")
# Prepare the message data
message_data = {
"session_id": SESSION_ID,
"query": "Who was Robert Rogers?"
}
print(f"Sending message: {message_data['query']}")
sio.emit('chat message', message_data)
# Event handler for disconnection
@sio.event
def disconnect():
print("Disconnected from the server.")
# Event handler for receiving response chunks
@sio.on('response')
def on_response(data):
chunk = data.get('chunk', '')
print(f"Received chunk: {chunk}")
# Event handler for response completion
@sio.on('response_complete')
def on_response_complete():
print("Response streaming completed.")
sio.disconnect()
# Event handler for errors
@sio.on('error')
def on_error(data):
error_message = data.get('error', 'Unknown error')
print(f"Error from server: {error_message}")
def main():
try:
# Connect to the server
sio.connect('http://localhost:5000')
# Wait for the communication to complete
sio.wait()
except socketio.exceptions.ConnectionError as e:
print(f"Connection failed: {e}")
if __name__ == '__main__':
main()