Skip to content

Commit

Permalink
including UTC time in logs and logging timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer committed Jul 23, 2024
1 parent a29eca7 commit bf9512b
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions apps/sonda/sonda.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import urllib.parse
import requests
import argparse
from datetime import datetime
from prometheus_client import Counter, Gauge, start_http_server

# Content topic where Sona messages are going to be sent
Expand All @@ -32,6 +33,12 @@
args = parser.parse_args()


# Logs message including current UTC time
def log_with_utc(message):
utc_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{utc_time} UTC] {message}")


# Sends Sonda message. Returns True if successful, False otherwise
def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
message = "Hi, I'm Sonda"
Expand All @@ -47,22 +54,22 @@ def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
url = f'{rest_address}/relay/v1/messages/{encoded_pubsub_topic}'
headers = {'content-type': 'application/json'}

print(f'Waku REST API: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}')
log_with_utc(f'Sending Sonda message via REST: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}, timestamp: {timestamp}')

try:
start_time = time.time()
response = requests.post(url, json=body, headers=headers, timeout=10)
elapsed_seconds = time.time() - start_time

print(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')
log_with_utc(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')

if response.status_code == 200:
successful_sonda_msgs.inc()
return True
else:
response.raise_for_status()
except requests.RequestException as e:
print(f'Error sending request: {e}')
log_with_utc(f'Error sending request: {e}')

failed_sonda_msgs.inc()
return False
Expand All @@ -74,7 +81,7 @@ def check_store_response(json_response, store_node, timestamp):
# Check for the store node status code
if json_response.get('statusCode') != 200:
error = f"{json_response.get('statusCode')} {json_response.get('statusDesc')}"
print(f'Failed performing store query {error}')
log_with_utc(f'Failed performing store query {error}')
failed_store_queries.labels(node=store_node, error=error).inc()
consecutive_successful_responses.labels(node=store_node).set(0)

Expand All @@ -83,7 +90,7 @@ def check_store_response(json_response, store_node, timestamp):
messages = json_response.get('messages')
# If there's no message in the response, increase counters and return
if not messages:
print("No messages in store response")
log_with_utc("No messages in store response")
empty_store_responses.labels(node=store_node).inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return True
Expand All @@ -92,12 +99,12 @@ def check_store_response(json_response, store_node, timestamp):
for message in messages:
# If message field is missing in current message, continue
if not message.get("message"):
print("Could not retrieve message")
log_with_utc("Could not retrieve message")
continue

# If a message is found with the same timestamp as sonda message, increase counters and return
if timestamp == message.get('message').get('timestamp'):
print(f'Found Sonda message in store response node={store_node}')
log_with_utc(f'Found Sonda message in store response node={store_node}')
successful_store_queries.labels(node=store_node).inc()
consecutive_successful_responses.labels(node=store_node).inc()
return True
Expand All @@ -121,16 +128,16 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
s_time = time.time()

try:
print(f'Sending store request to {store_node}')
log_with_utc(f'Sending store request to {store_node}')
response = requests.get(url, params=params)
except Exception as e:
print(f'Error sending request: {e}')
log_with_utc(f'Error sending request: {e}')
failed_store_queries.labels(node=store_node, error=str(e)).inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return False

elapsed_seconds = time.time() - s_time
print(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')
log_with_utc(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')

if response.status_code != 200:
failed_store_queries.labels(node=store_node, error=f'{response.status_code} {response.content}').inc()
Expand All @@ -141,7 +148,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
try:
json_response = response.json()
except Exception as e:
print(f'Error parsing response JSON: {e}')
log_with_utc(f'Error parsing response JSON: {e}')
failed_store_queries.labels(node=store_node, error="JSON parse error").inc()
consecutive_successful_responses.labels(node=store_node).set(0)
return False
Expand All @@ -155,7 +162,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con


def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, timestamp):
print(f'Sending store queries. nodes = {store_nodes}')
log_with_utc(f'Sending store queries. nodes = {store_nodes} timestamp = {timestamp}')
encoded_pubsub_topic = urllib.parse.quote(pubsub_topic, safe='')
encoded_content_topic = urllib.parse.quote(content_topic, safe='')

Expand All @@ -164,12 +171,12 @@ def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, t


def main():
print(f'Running Sonda with args={args}')
log_with_utc(f'Running Sonda with args={args}')

store_nodes = []
if args.store_nodes is not None:
store_nodes = [s.strip() for s in args.store_nodes.split(",")]
print(f'Store nodes to query: {store_nodes}')
log_with_utc(f'Store nodes to query: {store_nodes}')

# Start Prometheus HTTP server at port 8004
start_http_server(8004)
Expand All @@ -181,7 +188,7 @@ def main():
# Send Sonda message
res = send_sonda_msg(node_rest_address, args.pubsub_topic, SONDA_CONTENT_TOPIC, timestamp)

print(f'sleeping: {args.delay_seconds} seconds')
log_with_utc(f'sleeping: {args.delay_seconds} seconds')
time.sleep(args.delay_seconds)

# Only send store query if message was successfully published
Expand Down

0 comments on commit bf9512b

Please sign in to comment.