Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: including UTC time in Sonda logs #2926

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions apps/sonda/sonda.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import urllib.parse
import requests
import argparse
from datetime import datetime
from prometheus_client import Counter, Gauge, start_http_server

# Content topic where Sona messages are going to be sent
Expand All @@ -32,6 +33,12 @@
args = parser.parse_args()


# Logs message including current UTC time
def log_with_utc(message):
utc_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{utc_time} UTC] {message}")


# Sends Sonda message. Returns True if successful, False otherwise
def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
message = "Hi, I'm Sonda"
Expand All @@ -47,22 +54,22 @@ def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
url = f'{rest_address}/relay/v1/messages/{encoded_pubsub_topic}'
headers = {'content-type': 'application/json'}

print(f'Waku REST API: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}')
log_with_utc(f'Sending Sonda message via REST: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}, timestamp: {timestamp}')

try:
start_time = time.time()
response = requests.post(url, json=body, headers=headers, timeout=10)
elapsed_seconds = time.time() - start_time

print(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')
log_with_utc(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')

if response.status_code == 200:
successful_sonda_msgs.inc()
return True
else:
response.raise_for_status()
except requests.RequestException as e:
print(f'Error sending request: {e}')
log_with_utc(f'Error sending request: {e}')

failed_sonda_msgs.inc()
return False
Expand All @@ -74,7 +81,7 @@ def check_store_response(json_response, store_node, timestamp):
# Check for the store node status code
if json_response.get('statusCode') != 200:
error = f"{json_response.get('statusCode')} {json_response.get('statusDesc')}"
print(f'Failed performing store query {error}')
log_with_utc(f'Failed performing store query {error}')
failed_store_queries.labels(node=store_node, error=error).inc()
consecutive_successful_responses.labels(node=store_node).set(0)

Expand All @@ -83,7 +90,7 @@ def check_store_response(json_response, store_node, timestamp):
messages = json_response.get('messages')
# If there's no message in the response, increase counters and return
if not messages:
print("No messages in store response")
log_with_utc("No messages in store response")
empty_store_responses.labels(node=store_node).inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return True
Expand All @@ -92,12 +99,12 @@ def check_store_response(json_response, store_node, timestamp):
for message in messages:
# If message field is missing in current message, continue
if not message.get("message"):
print("Could not retrieve message")
log_with_utc("Could not retrieve message")
continue

# If a message is found with the same timestamp as sonda message, increase counters and return
if timestamp == message.get('message').get('timestamp'):
print(f'Found Sonda message in store response node={store_node}')
log_with_utc(f'Found Sonda message in store response node={store_node}')
successful_store_queries.labels(node=store_node).inc()
consecutive_successful_responses.labels(node=store_node).inc()
return True
Expand All @@ -121,16 +128,16 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
s_time = time.time()

try:
print(f'Sending store request to {store_node}')
log_with_utc(f'Sending store request to {store_node}')
response = requests.get(url, params=params)
except Exception as e:
print(f'Error sending request: {e}')
log_with_utc(f'Error sending request: {e}')
failed_store_queries.labels(node=store_node, error=str(e)).inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return False

elapsed_seconds = time.time() - s_time
print(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')
log_with_utc(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')

if response.status_code != 200:
failed_store_queries.labels(node=store_node, error=f'{response.status_code} {response.content}').inc()
Expand All @@ -141,7 +148,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
try:
json_response = response.json()
except Exception as e:
print(f'Error parsing response JSON: {e}')
log_with_utc(f'Error parsing response JSON: {e}')
failed_store_queries.labels(node=store_node, error="JSON parse error").inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return False
Expand All @@ -155,7 +162,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con


def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, timestamp):
print(f'Sending store queries. nodes = {store_nodes}')
log_with_utc(f'Sending store queries. nodes = {store_nodes} timestamp = {timestamp}')
encoded_pubsub_topic = urllib.parse.quote(pubsub_topic, safe='')
encoded_content_topic = urllib.parse.quote(content_topic, safe='')

Expand All @@ -164,12 +171,12 @@ def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, t


def main():
print(f'Running Sonda with args={args}')
log_with_utc(f'Running Sonda with args={args}')

store_nodes = []
if args.store_nodes is not None:
store_nodes = [s.strip() for s in args.store_nodes.split(",")]
print(f'Store nodes to query: {store_nodes}')
log_with_utc(f'Store nodes to query: {store_nodes}')

# Start Prometheus HTTP server at port 8004
start_http_server(8004)
Expand All @@ -181,7 +188,7 @@ def main():
# Send Sonda message
res = send_sonda_msg(node_rest_address, args.pubsub_topic, SONDA_CONTENT_TOPIC, timestamp)

print(f'sleeping: {args.delay_seconds} seconds')
log_with_utc(f'sleeping: {args.delay_seconds} seconds')
time.sleep(args.delay_seconds)

# Only send store query if message was successfully published
Expand Down
Loading