Add estimated_upgrade_time & upgrade_name, sort response #8

Merged
merged 10 commits on Sep 2, 2023

app.py: 150 changes (80 additions and 70 deletions)
@@ -1,13 +1,15 @@
import requests
import re
from datetime import datetime
+from datetime import timedelta
from random import shuffle
# import logging
import threading
-from flask import Flask, jsonify, request
+from flask import Flask, jsonify, request, Response
from flask_caching import Cache
from concurrent.futures import ThreadPoolExecutor
from time import sleep
+from collections import OrderedDict
import os
import zipfile
import json
@@ -87,46 +89,6 @@ def download_and_extract_repo():

return repo_path

-def fetch_directory_names(path):
-    headers = {'Accept': 'application/vnd.github.v3+json'}
-    url = f"{GITHUB_API_BASE_URL}/{path}"
-    response = requests.get(url, headers=headers)
-    response_data = response.json()
-
-    if isinstance(response_data, list):
-        dir_names = [item['name'] for item in response_data if item['type'] == 'dir' and not item['name'].startswith(('.', '_'))]
-        return dir_names
-    else:
-        return []
-
-def read_chain_json_from_local(network, base_path):
-    """Read the chain.json file for a given network from the local repository."""
-    chain_json_path = os.path.join(base_path, network, 'chain.json')
-    with open(chain_json_path, 'r') as f:
-        return json.load(f)
-
-def process_data_for_network(network, full_data, network_type):
-    """Process the data for a specific network."""
-    network_data = full_data.get(network, [])
-    file_names = [item['name'] for item in network_data if item['type'] == 'file']
-
-    return {
-        "type": network_type,
-        "network": network,
-        "files": file_names
-    }
-
-def process_data_for_local_network(network, base_path, network_type):
-    """Process the data for a specific network from local files."""
-    network_path = os.path.join(base_path, network)
-    file_names = [f for f in os.listdir(network_path) if os.path.isfile(os.path.join(network_path, f))]
-
-    return {
-        "type": network_type,
-        "network": network,
-        "files": file_names
-    }

def get_healthy_rpc_endpoints(rpc_endpoints):
with ThreadPoolExecutor(max_workers=num_workers) as executor:
healthy_rpc_endpoints = [rpc for rpc, is_healthy in executor.map(lambda rpc: (rpc, is_endpoint_healthy(rpc['address'])), rpc_endpoints) if is_healthy]
@@ -162,7 +124,7 @@ def check_rest_endpoint(rest_url):
"""Check the REST endpoint and return the application version and response time."""
start_time = datetime.now()
try:
response = requests.get(f"{rest_url}/node_info", timeout=3, verify=False)
response = requests.get(f"{rest_url}/node_info", timeout=1, verify=False)
response.raise_for_status()
elapsed_time = (datetime.now() - start_time).total_seconds()

@@ -175,12 +137,42 @@ def check_rest_endpoint(rest_url):
def get_latest_block_height_rpc(rpc_url):
"""Fetch the latest block height from the RPC endpoint."""
try:
response = requests.get(f"{rpc_url}/status", timeout=3)
response = requests.get(f"{rpc_url}/status", timeout=1)
response.raise_for_status()
data = response.json()
return int(data.get('result', {}).get('sync_info', {}).get('latest_block_height', 0))
except requests.RequestException as e:
return -1 # Return -1 to indicate an error

+def get_block_time_rpc(rpc_url, height):
+    """Fetch the block header time for a given block height from the RPC endpoint."""
+    try:
+        response = requests.get(f"{rpc_url}/block?height={height}", timeout=1)
+        response.raise_for_status()
+        data = response.json()
+        return data.get('result', {}).get('block', {}).get('header', {}).get('time', "")
+    except requests.RequestException as e:
+        return None
+
+def parse_isoformat_string(date_string):
+    date_string = re.sub(r"(\.\d{6})\d+Z", r"\1Z", date_string)
+    date_string = date_string.replace("Z", "+00:00")
+    return datetime.fromisoformat(date_string)
+
+def reorder_data(data):
+    ordered_data = OrderedDict([
+        ("type", data.get("type")),
+        ("network", data.get("network")),
+        ("rpc_server", data.get("rpc_server")),
+        ("latest_block_height", data.get("latest_block_height")),
+        ("upgrade_found", data.get("upgrade_found")),
+        ("upgrade_name", data.get("upgrade_name")),
+        ("source", data.get("source")),
+        ("upgrade_block_height", data.get("upgrade_block_height")),
+        ("estimated_upgrade_time", data.get("estimated_upgrade_time")),
+        ("version", data.get("version"))
+    ])
+    return ordered_data

def fetch_all_endpoints(network_type, base_url, request_data):
"""Fetch all the REST and RPC endpoints for all networks and store in a map."""
@@ -246,8 +238,8 @@ def fetch_active_upgrade_proposals(rest_url):
height = 0

            if version:
-                return version, height
-            return None, None
+                return plan_name, version, height
+            return None, None, None
except requests.RequestException as e:
print(f"Error received from server {rest_url}: {e}")
raise e
@@ -263,15 +255,16 @@ def fetch_current_upgrade_plan(rest_url):

plan = data.get("plan", {})
if plan:
version_match = SEMANTIC_VERSION_PATTERN.search(plan.get("name", ""))
plan_name = plan.get("name", "")
version_match = SEMANTIC_VERSION_PATTERN.search(plan_name)
if version_match:
version = version_match.group(1)
try:
height = int(plan.get("height", 0))
except ValueError:
height = 0
return version, height
return None, None
return plan_name, version, height
return None, None, None
except requests.RequestException as e:
print(f"Error received from server {rest_url}: {e}")
raise e
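Note: both fetch_active_upgrade_proposals and fetch_current_upgrade_plan now return a (plan_name, version, height) triple instead of (version, height), so callers can surface the human-readable plan name alongside the parsed version. A sketch of the extraction step, assuming a plan payload shaped like the Cosmos SDK upgrade-module response; the SEMANTIC_VERSION_PATTERN below is a stand-in for illustration, since the real pattern is defined elsewhere in app.py:

    import re

    # Stand-in pattern; app.py defines its own SEMANTIC_VERSION_PATTERN outside this diff.
    SEMANTIC_VERSION_PATTERN = re.compile(r"(v?\d+(?:\.\d+){0,2})")

    def parse_plan(plan):
        """Extract (plan_name, version, height) from an upgrade plan dict."""
        plan_name = plan.get("name", "")
        version_match = SEMANTIC_VERSION_PATTERN.search(plan_name)
        version = version_match.group(1) if version_match else None
        try:
            height = int(plan.get("height", 0))
        except ValueError:
            height = 0
        return plan_name, version, height

    print(parse_plan({"name": "v12-gaia", "height": "17894000"}))  # hypothetical plan
    # -> ('v12-gaia', 'v12', 17894000)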
@@ -305,7 +298,6 @@ def fetch_data_for_network(network, network_type):
print(f"Found {len(rest_endpoints)} rest endpoints and {len(rpc_endpoints)} rpc endpoints for {network}")

# Prioritize RPC endpoints for fetching the latest block height
-    # Shuffle RPC endpoints to avoid calling the same one over and over
latest_block_height = -1
healthy_rpc_endpoints = get_healthy_rpc_endpoints(rpc_endpoints)
healthy_rest_endpoints = get_healthy_rest_endpoints(rest_endpoints)
@@ -322,9 +314,8 @@ def fetch_data_for_network(network, network_type):
break

# Check for active upgrade proposals
-    # Shuffle RPC endpoints to avoid calling the same one over and over
    shuffle(healthy_rest_endpoints)
    upgrade_block_height = None
+    upgrade_name = ""
    upgrade_version = ""
source = ""

@@ -334,8 +325,8 @@ def fetch_data_for_network(network, network_type):
if current_endpoint in SERVER_BLACKLIST:
continue
try:
-            active_upgrade_version, active_upgrade_height = fetch_active_upgrade_proposals(current_endpoint)
-            current_upgrade_version, current_upgrade_height = fetch_current_upgrade_plan(current_endpoint)
+            active_upgrade_name, active_upgrade_version, active_upgrade_height = fetch_active_upgrade_proposals(current_endpoint)
+            current_upgrade_name, current_upgrade_version, current_upgrade_height = fetch_current_upgrade_plan(current_endpoint)
except:
if index + 1 < len(healthy_rest_endpoints):
print(f"Failed to query rest endpoints {current_endpoint}, trying next rest endpoint")
@@ -347,24 +338,45 @@ def fetch_data_for_network(network, network_type):
        if active_upgrade_version and (active_upgrade_height is not None) and active_upgrade_height > latest_block_height:
            upgrade_block_height = active_upgrade_height
            upgrade_version = active_upgrade_version
+            upgrade_name = active_upgrade_name
            source = "active_upgrade_proposals"
            break

        if current_upgrade_version and (current_upgrade_height is not None) and current_upgrade_height > latest_block_height:
            upgrade_block_height = current_upgrade_height
            upgrade_version = current_upgrade_version
+            upgrade_name = current_upgrade_name
            source = "current_upgrade_plan"
            break

+    # Calculate average block time
+    current_block_time = get_block_time_rpc(rpc_server_used, latest_block_height)
+    past_block_time = get_block_time_rpc(rpc_server_used, latest_block_height - 10000)
+    avg_block_time_seconds = None
+
+    if current_block_time and past_block_time:
+        current_block_datetime = parse_isoformat_string(current_block_time)
+        past_block_datetime = parse_isoformat_string(past_block_time)
+        avg_block_time_seconds = (current_block_datetime - past_block_datetime).total_seconds() / 10000
+
+    # Estimate the upgrade time
+    estimated_upgrade_time = None
+    if upgrade_block_height and avg_block_time_seconds:
+        estimated_seconds_until_upgrade = avg_block_time_seconds * (upgrade_block_height - latest_block_height)
+        estimated_upgrade_datetime = datetime.utcnow() + timedelta(seconds=estimated_seconds_until_upgrade)
+        estimated_upgrade_time = estimated_upgrade_datetime.isoformat().replace('+00:00', 'Z')

    output_data = {
-        "type": network_type,
        "network": network,
-        "upgrade_found": upgrade_version != "",
+        "type": network_type,
+        "rpc_server": rpc_server_used,
+        "latest_block_height": latest_block_height,
+        "upgrade_found": upgrade_version != "",
+        "upgrade_name": upgrade_name,
+        "source": source,
        "upgrade_block_height": upgrade_block_height,
-        "version": upgrade_version,
-        "rpc_server": rpc_server_used,
-        "source": source
+        "estimated_upgrade_time": estimated_upgrade_time,
+        "version": upgrade_version
    }
print(f"Completed fetch data for network {network}")
return output_data
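Note: to make the estimate concrete, here is the same arithmetic with made-up numbers. The average block time is measured over the trailing 10,000 blocks and projected forward over the blocks remaining until the upgrade height (all values below are hypothetical):

    from datetime import datetime, timedelta

    latest_block_height = 17_880_000
    upgrade_block_height = 17_894_000  # hypothetical plan height

    # Header times of the latest block and the block 10,000 heights earlier.
    current_block_datetime = datetime(2023, 9, 2, 12, 0, 0)
    past_block_datetime = datetime(2023, 9, 1, 19, 0, 0)  # 17 hours earlier

    avg_block_time_seconds = (current_block_datetime - past_block_datetime).total_seconds() / 10000
    # 61,200 s / 10,000 blocks = 6.12 s per block

    blocks_remaining = upgrade_block_height - latest_block_height  # 14,000
    eta = datetime.utcnow() + timedelta(seconds=avg_block_time_seconds * blocks_remaining)
    # 6.12 s * 14,000 blocks = 85,680 s, i.e. roughly 23.8 hours from now
    print(eta.isoformat() + "Z")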
@@ -418,11 +430,9 @@ def update_data():


def start_update_data_thread():
-    print("Starting the update_data thread...")
    update_thread = threading.Thread(target=update_data)
    update_thread.daemon = True
    update_thread.start()
-    print("update_data thread started.")

@app.route('/healthz')
def health_check():
@@ -455,39 +465,39 @@ def fetch_network_data():
filtered_testnet_data = [data for data in testnet_data if data['network'] in request_data.get("TESTNETS", [])]
results = filtered_mainnet_data + filtered_testnet_data

        # Sort the results by 'upgrade_found' in descending order (chain upgrades first)
        sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
+        reordered_results = [reorder_data(result) for result in sorted_results]
+        return Response(json.dumps(reordered_results, indent=2) + '\n', content_type="application/json")

-        return jsonify(sorted_results)
-    except Exception as e:
-        return jsonify({"error": str(e)}), 500

+    except Exception as e:
+        return jsonify({"error": str(e)}), 500

@app.route('/mainnets')
-@cache.cached(timeout=600) # Cache the result for 10 minutes
+# @cache.cached(timeout=600) # Cache the result for 10 minutes
def get_mainnet_data():
results = cache.get('MAINNET_DATA')
if results is None:
return jsonify({"error": "Data not available"}), 500

# Filter out None values from results
results = [r for r in results if r is not None]

    sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
-    return jsonify(sorted_results)
+    reordered_results = [reorder_data(result) for result in sorted_results]
+    return Response(json.dumps(reordered_results) + '\n', content_type="application/json")

@app.route('/testnets')
-@cache.cached(timeout=600) # Cache the result for 10 minutes
+# @cache.cached(timeout=600) # Cache the result for 10 minutes
def get_testnet_data():
results = cache.get('TESTNET_DATA')
if results is None:
return jsonify({"error": "Data not available"}), 500

# Filter out None values from results
results = [r for r in results if r is not None]

    sorted_results = sorted(results, key=lambda x: x['upgrade_found'], reverse=True)
-    return jsonify(sorted_results)
+    reordered_results = [reorder_data(result) for result in sorted_results]
+    return Response(json.dumps(reordered_results) + '\n', content_type="application/json")

if __name__ == '__main__':
app.debug = True
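Note: with these changes a client should see chains with pending upgrades sorted first and a stable key order in each entry. A hypothetical smoke test against a local instance (host and port are assumptions based on Flask defaults, not something this diff pins down):

    import requests

    resp = requests.get("http://127.0.0.1:5000/mainnets", timeout=5)
    for entry in resp.json():
        # Keys follow the reorder_data() order; upgrade_found=True entries come first.
        print(entry["network"], entry["upgrade_found"],
              entry.get("upgrade_name"), entry.get("estimated_upgrade_time"))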