Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid dlp validator crashes eg if rpc goes down #23

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 77 additions & 36 deletions vana/chain_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from vana.utils.misc import get_block_explorer_url
from vana.utils.web3 import decode_custom_error

import threading
from utils.circuit_breaker import CircuitBreaker

logger = native_logging.getLogger("opendata")

Balance = Union[int, Decimal]
Expand Down Expand Up @@ -119,6 +122,25 @@ def __init__(
self.web3 = Web3(Web3.HTTPProvider(self.config.chain.chain_endpoint))
self.web3.middleware_onion.inject(geth_poa_middleware, layer=0)

self.circuit_breaker = CircuitBreaker(failure_threshold=5, reset_timeout=3600) # 1 hour timeout
self.rpc_health_thread = threading.Thread(target=self._check_rpc_health, daemon=True)
self.rpc_health_thread.start()

def _check_rpc_health(self):
while True:
self.check_rpc_health()
# Check every minute
time.sleep(60)

def check_rpc_health(self):
try:
block_number = self.get_current_block()
vana.logging.info(f"RPC is healthy. Current block number: {block_number}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this, I feel this may get too noisy in the logs.

return True
except Exception as e:
vana.logging.warning(f"RPC health check failed: {e}")
return False

@staticmethod
def config() -> "config":
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -241,6 +263,7 @@ def read_contract_fn(self, function: ContractFunction):
except Exception as e:
vana.logging.error(f"Failed to read from contract function: {e}")

@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60)
def get_current_block(self) -> int:
"""
Returns the current block number on the blockchain. This function provides the latest block
Expand All @@ -252,13 +275,21 @@ def get_current_block(self) -> int:
Knowing the current block number is essential for querying real-time data and performing time-sensitive
operations on the blockchain. It serves as a reference point for network activities and data synchronization.
"""
return self.web3.eth.block_number
return self.circuit_breaker.call(self._get_current_block)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be used at a higher level, e.g. where the server is spawned?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably misunderstanding, but does the circuit breaker give us anything additional that the @Retry doesn't?


def _get_current_block(self) -> int:
try:
return self.web3.eth.block_number
except Exception as e:
vana.logging.error(f"Error fetching current block number: {e}")
raise

def close(self):
"""
Cleans up resources for this ChainManager instance like active websocket connection and active extensions
"""
pass
if hasattr(self, 'rpc_health_thread') and self.rpc_health_thread.is_alive():
self.rpc_health_thread.join(timeout=5)

def get_total_stake_for_coldkey(
self, h160_address: str, block: Optional[int] = None
Expand Down Expand Up @@ -325,6 +356,7 @@ def determine_chain_endpoint_and_network(network: str):
#### Legacy ####
################

@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60)
def get_balance(self, address: str, block: Optional[int] = None) -> Balance:
"""
Retrieves the token balance of a specific address within the Vana network. This function queries
Expand All @@ -340,21 +372,26 @@ def get_balance(self, address: str, block: Optional[int] = None) -> Balance:
This function is important for monitoring account holdings and managing financial transactions
within the Vana ecosystem. It helps in assessing the economic status and capacity of network participants.
"""
return self.circuit_breaker.call(self._get_balance, address, block)

def _get_balance(self, address: str, block: Optional[int] = None) -> Balance:
vana.logging.info(f"Fetching balance for address {address}")
try:
@retry(delay=2, tries=3, backoff=2, max_delay=4, logger=logger)
def make_web3_call_with_retry():
vana.logging.info(f"Fetching balance for address {address}")
return self.web3.eth.get_balance(address, block_identifier=block)

result = make_web3_call_with_retry()
result = self.web3.eth.get_balance(address, block_identifier=block)
except Exception as e:
vana.logging.error(f"Error fetching balance for address {address}: {e}")
return 0
raise

return Web3.from_wei(result, "ether")

def transfer(
@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60)
def transfer(self, wallet: "vana.Wallet", dest: str, amount: Union[Balance, float],
wait_for_inclusion: bool = True, wait_for_finalization: bool = False,
prompt: bool = False) -> bool:
return self.circuit_breaker.call(self._transfer, wallet, dest, amount,
wait_for_inclusion, wait_for_finalization, prompt)

def _transfer(
self,
wallet: "vana.Wallet",
dest: str,
Expand Down Expand Up @@ -422,32 +459,36 @@ def transfer(

# Send the transaction.
logger.info("Sending transaction...")
txn_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction)

# Wait for transaction inclusion.
if wait_for_inclusion:
try:
receipt = self.web3.eth.wait_for_transaction_receipt(txn_hash, timeout=120)
if receipt.status == 1:
logger.info(f"Transaction included in block {receipt.blockNumber}. Hash: {txn_hash.hex()}")
else:
logger.error("Transaction failed.")
try:
txn_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction)

# Wait for transaction inclusion.
if wait_for_inclusion:
try:
receipt = self.web3.eth.wait_for_transaction_receipt(txn_hash, timeout=120)
if receipt.status == 1:
logger.info(f"Transaction included in block {receipt.blockNumber}. Hash: {txn_hash.hex()}")
else:
logger.error("Transaction failed.")
return False
except TransactionNotFound:
logger.error("Transaction not found within timeout period.")
return False
except TransactionNotFound:
logger.error("Transaction not found within timeout period.")
return False

# Wait for transaction finalization.
if wait_for_finalization:
try:
while True:
receipt = self.web3.eth.get_transaction_receipt(txn_hash)
if receipt.blockNumber is not None:
logger.info(f"Transaction finalized in block {receipt.blockNumber}. Hash: {txn_hash.hex()}")
break
time.sleep(2)
except TransactionNotFound:
logger.error("Transaction not found within timeout period.")
return False
# Wait for transaction finalization.
if wait_for_finalization:
try:
while True:
receipt = self.web3.eth.get_transaction_receipt(txn_hash)
if receipt.blockNumber is not None:
logger.info(f"Transaction finalized in block {receipt.blockNumber}. Hash: {txn_hash.hex()}")
break
time.sleep(2)
except TransactionNotFound:
logger.error("Transaction not found within timeout period.")
return False

return True
return True
except Exception as e:
logger.error(f"Error during transfer: {e}")
raise
32 changes: 29 additions & 3 deletions vana/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,27 @@ def __init__(
if sync:
self.sync(block=None, lite=lite)

self.cache_file = f"state_cache_{network}_{dlp_uid}.json"

def save_cache(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between the cache and the state file? Seems to be saving the same data?

cache_data = {
"node_servers": [ns.__dict__ for ns in self.node_servers],
"weights": self.weights,
"last_update": self.last_update,
"block": self.block
}
with open(self.cache_file, 'w') as f:
json.dump(cache_data, f)

def load_cache(self):
if os.path.exists(self.cache_file):
with open(self.cache_file, 'r') as f:
cache_data = json.load(f)
self.node_servers = set([vana.NodeServerInfo(**ns) for ns in cache_data['node_servers']])
self.weights = cache_data['weights']
self.last_update = cache_data['last_update']
self.block = cache_data['block']

def sync(
self,
block: Optional[int] = None,
Expand All @@ -93,9 +114,14 @@ def sync(
"""
Synchronizes the state with the network's current state.
"""
self.node_servers = chain_manager.get_active_node_servers()
self.last_update = time.time()
self.block = chain_manager.get_current_block()
try:
self.node_servers = chain_manager.get_active_node_servers()
self.last_update = time.time()
self.block = chain_manager.get_current_block()
self.save_cache()
except Exception as e:
vana.logging.error(f"Failed to sync state: {e}")
self.load_cache()

def set_hotkeys(self, hotkeys: List[str]):
"""
Expand Down
30 changes: 30 additions & 0 deletions vana/utils/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time


class CircuitBreaker:
def __init__(self, failure_threshold, reset_timeout):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failures = 0
self.last_failure_time = 0
self.state = "CLOSED"

def call(self, func, *args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "HALF-OPEN"
else:
raise Exception("Circuit is OPEN")

try:
result = func(*args, **kwargs)
if self.state == "HALF-OPEN":
self.state = "CLOSED"
self.failures = 0
return result
except Exception as e:
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
raise e