Skip to content

Commit

Permalink
Merge pull request #8 from clemensgg/clemensgg/add-upgrade-time
Browse files Browse the repository at this point in the history
Add `estimated_upgrade_time` & `upgrade_name`, sort response
  • Loading branch information
danbryan authored Sep 2, 2023
2 parents a6aa14d + c2f861a commit 43eea54
Showing 1 changed file with 80 additions and 70 deletions.
150 changes: 80 additions & 70 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import requests
import re
from datetime import datetime
from datetime import timedelta
from random import shuffle
# import logging
import threading
from flask import Flask, jsonify, request
from flask import Flask, jsonify, request, Response
from flask_caching import Cache
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from collections import OrderedDict
import os
import zipfile
import json
Expand Down Expand Up @@ -87,46 +89,6 @@ def download_and_extract_repo():

return repo_path

def fetch_directory_names(path):
headers = {'Accept': 'application/vnd.github.v3+json'}
url = f"{GITHUB_API_BASE_URL}/{path}"
response = requests.get(url, headers=headers)
response_data = response.json()

if isinstance(response_data, list):
dir_names = [item['name'] for item in response_data if item['type'] == 'dir' and not item['name'].startswith(('.', '_'))]
return dir_names
else:
return []

def read_chain_json_from_local(network, base_path):
"""Read the chain.json file for a given network from the local repository."""
chain_json_path = os.path.join(base_path, network, 'chain.json')
with open(chain_json_path, 'r') as f:
return json.load(f)

def process_data_for_network(network, full_data, network_type):
"""Process the data for a specific network."""
network_data = full_data.get(network, [])
file_names = [item['name'] for item in network_data if item['type'] == 'file']

return {
"type": network_type,
"network": network,
"files": file_names
}

def process_data_for_local_network(network, base_path, network_type):
"""Process the data for a specific network from local files."""
network_path = os.path.join(base_path, network)
file_names = [f for f in os.listdir(network_path) if os.path.isfile(os.path.join(network_path, f))]

return {
"type": network_type,
"network": network,
"files": file_names
}

def get_healthy_rpc_endpoints(rpc_endpoints):
with ThreadPoolExecutor(max_workers=num_workers) as executor:
healthy_rpc_endpoints = [rpc for rpc, is_healthy in executor.map(lambda rpc: (rpc, is_endpoint_healthy(rpc['address'])), rpc_endpoints) if is_healthy]
Expand Down Expand Up @@ -162,7 +124,7 @@ def check_rest_endpoint(rest_url):
"""Check the REST endpoint and return the application version and response time."""
start_time = datetime.now()
try:
response = requests.get(f"{rest_url}/node_info", timeout=3, verify=False)
response = requests.get(f"{rest_url}/node_info", timeout=1, verify=False)
response.raise_for_status()
elapsed_time = (datetime.now() - start_time).total_seconds()

Expand All @@ -175,12 +137,42 @@ def check_rest_endpoint(rest_url):
def get_latest_block_height_rpc(rpc_url):
"""Fetch the latest block height from the RPC endpoint."""
try:
response = requests.get(f"{rpc_url}/status", timeout=3)
response = requests.get(f"{rpc_url}/status", timeout=1)
response.raise_for_status()
data = response.json()
return int(data.get('result', {}).get('sync_info', {}).get('latest_block_height', 0))
except requests.RequestException as e:
return -1 # Return -1 to indicate an error

def get_block_time_rpc(rpc_url, height):
"""Fetch the block header time for a given block height from the RPC endpoint."""
try:
response = requests.get(f"{rpc_url}/block?height={height}", timeout=1)
response.raise_for_status()
data = response.json()
return data.get('result', {}).get('block', {}).get('header', {}).get('time', "")
except requests.RequestException as e:
return None

def parse_isoformat_string(date_string):
date_string = re.sub(r"(\.\d{6})\d+Z", r"\1Z", date_string)
date_string = date_string.replace("Z", "+00:00")
return datetime.fromisoformat(date_string)

def reorder_data(data):
ordered_data = OrderedDict([
("type", data.get("type")),
("network", data.get("network")),
("rpc_server", data.get("rpc_server")),
("latest_block_height", data.get("latest_block_height")),
("upgrade_found", data.get("upgrade_found")),
("upgrade_name", data.get("upgrade_name")),
("source", data.get("source")),
("upgrade_block_height", data.get("upgrade_block_height")),
("estimated_upgrade_time", data.get("estimated_upgrade_time")),
("version", data.get("version"))
])
return ordered_data

def fetch_all_endpoints(network_type, base_url, request_data):
"""Fetch all the REST and RPC endpoints for all networks and store in a map."""
Expand Down Expand Up @@ -246,8 +238,8 @@ def fetch_active_upgrade_proposals(rest_url):
height = 0

if version:
return version, height
return None, None
return plan_name, version, height
return None, None, None
except requests.RequestException as e:
print(f"Error received from server {rest_url}: {e}")
raise e
Expand All @@ -263,15 +255,16 @@ def fetch_current_upgrade_plan(rest_url):

plan = data.get("plan", {})
if plan:
version_match = SEMANTIC_VERSION_PATTERN.search(plan.get("name", ""))
plan_name = plan.get("name", "")
version_match = SEMANTIC_VERSION_PATTERN.search(plan_name)
if version_match:
version = version_match.group(1)
try:
height = int(plan.get("height", 0))
except ValueError:
height = 0
return version, height
return None, None
return plan_name, version, height
return None, None, None
except requests.RequestException as e:
print(f"Error received from server {rest_url}: {e}")
raise e
Expand Down Expand Up @@ -305,7 +298,6 @@ def fetch_data_for_network(network, network_type):
print(f"Found {len(rest_endpoints)} rest endpoints and {len(rpc_endpoints)} rpc endpoints for {network}")

# Prioritize RPC endpoints for fetching the latest block height
# Shuffle RPC endpoints to avoid calling the same one over and over
latest_block_height = -1
healthy_rpc_endpoints = get_healthy_rpc_endpoints(rpc_endpoints)
healthy_rest_endpoints = get_healthy_rest_endpoints(rest_endpoints)
Expand All @@ -322,9 +314,8 @@ def fetch_data_for_network(network, network_type):
break

# Check for active upgrade proposals
# Shuffle RPC endpoints to avoid calling the same one over and over
shuffle(healthy_rest_endpoints)
upgrade_block_height = None
upgrade_name = ""
upgrade_version = ""
source = ""

Expand All @@ -334,8 +325,8 @@ def fetch_data_for_network(network, network_type):
if current_endpoint in SERVER_BLACKLIST:
continue
try:
active_upgrade_version, active_upgrade_height = fetch_active_upgrade_proposals(current_endpoint)
current_upgrade_version, current_upgrade_height = fetch_current_upgrade_plan(current_endpoint)
active_upgrade_name, active_upgrade_version, active_upgrade_height = fetch_active_upgrade_proposals(current_endpoint)
current_upgrade_name, current_upgrade_version, current_upgrade_height = fetch_current_upgrade_plan(current_endpoint)
except:
if index + 1 < len(healthy_rest_endpoints):
print(f"Failed to query rest endpoints {current_endpoint}, trying next rest endpoint")
Expand All @@ -347,24 +338,45 @@ def fetch_data_for_network(network, network_type):
if active_upgrade_version and (active_upgrade_height is not None) and active_upgrade_height > latest_block_height:
upgrade_block_height = active_upgrade_height
upgrade_version = active_upgrade_version
upgrade_name = active_upgrade_name
source = "active_upgrade_proposals"
break

if current_upgrade_version and (current_upgrade_height is not None) and current_upgrade_height > latest_block_height:
upgrade_block_height = current_upgrade_height
upgrade_version = current_upgrade_version
upgrade_name = current_upgrade_name
source = "current_upgrade_plan"
break

# Calculate average block time
current_block_time = get_block_time_rpc(rpc_server_used, latest_block_height)
past_block_time = get_block_time_rpc(rpc_server_used, latest_block_height - 10000)
avg_block_time_seconds = None

if current_block_time and past_block_time:
current_block_datetime = parse_isoformat_string(current_block_time)
past_block_datetime = parse_isoformat_string(past_block_time)
avg_block_time_seconds = (current_block_datetime - past_block_datetime).total_seconds() / 10000

# Estimate the upgrade time
estimated_upgrade_time = None
if upgrade_block_height and avg_block_time_seconds:
estimated_seconds_until_upgrade = avg_block_time_seconds * (upgrade_block_height - latest_block_height)
estimated_upgrade_datetime = datetime.utcnow() + timedelta(seconds=estimated_seconds_until_upgrade)
estimated_upgrade_time = estimated_upgrade_datetime.isoformat().replace('+00:00', 'Z')

output_data = {
"type": network_type,
"network": network,
"upgrade_found": upgrade_version != "",
"type": network_type,
"rpc_server": rpc_server_used,
"latest_block_height": latest_block_height,
"upgrade_found": upgrade_version != "",
"upgrade_name": upgrade_name,
"source": source,
"upgrade_block_height": upgrade_block_height,
"version": upgrade_version,
"rpc_server": rpc_server_used,
"source": source
"estimated_upgrade_time": estimated_upgrade_time,
"version": upgrade_version
}
print(f"Completed fetch data for network {network}")
return output_data
Expand Down Expand Up @@ -418,11 +430,9 @@ def update_data():


def start_update_data_thread():
print("Starting the update_data thread...")
update_thread = threading.Thread(target=update_data)
update_thread.daemon = True
update_thread.start()
print("update_data thread started.")

@app.route('/healthz')
def health_check():
Expand Down Expand Up @@ -455,39 +465,39 @@ def fetch_network_data():
filtered_testnet_data = [data for data in testnet_data if data['network'] in request_data.get("TESTNETS", [])]
results = filtered_mainnet_data + filtered_testnet_data

# Sort the results by 'upgrade_found' in descending order (chain upgrades first)
sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
reordered_results = [reorder_data(result) for result in sorted_results]
return Response(json.dumps(reordered_results, indent=2) + '\n', content_type="application/json")

return jsonify(sorted_results)
except Exception as e:
return jsonify({"error": str(e)}), 500

except Exception as e:
return jsonify({"error": str(e)}), 500

@app.route('/mainnets')
@cache.cached(timeout=600) # Cache the result for 10 minutes
# @cache.cached(timeout=600) # Cache the result for 10 minutes
def get_mainnet_data():
results = cache.get('MAINNET_DATA')
if results is None:
return jsonify({"error": "Data not available"}), 500

# Filter out None values from results
results = [r for r in results if r is not None]

sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
return jsonify(sorted_results)
reordered_results = [reorder_data(result) for result in sorted_results]
return Response(json.dumps(reordered_results) + '\n', content_type="application/json")

@app.route('/testnets')
@cache.cached(timeout=600) # Cache the result for 10 minutes
# @cache.cached(timeout=600) # Cache the result for 10 minutes
def get_testnet_data():
results = cache.get('TESTNET_DATA')
if results is None:
return jsonify({"error": "Data not available"}), 500

# Filter out None values from results
results = [r for r in results if r is not None]

sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
return jsonify(sorted_results)
reordered_results = [reorder_data(result) for result in sorted_results]
return Response(json.dumps(reordered_results) + '\n', content_type="application/json")

if __name__ == '__main__':
app.debug = True
Expand Down

0 comments on commit 43eea54

Please sign in to comment.