Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added test with all happy paths for interacting with a staking provider #35

Open
wants to merge 10 commits into
base: rc/v1.7.0
Choose a base branch
from
3 changes: 3 additions & 0 deletions testing-suite/staking-v4/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from chain_commander import add_blocks
import time

from utils.logger import logger


def force_reset_validator_statistics():
route = f"{DEFAULT_PROXY}/simulator/force-reset-validator-statistics"
Expand All @@ -14,3 +16,4 @@ def force_reset_validator_statistics():

# wait 1 sec
time.sleep(1)
logger.info("Validator statistics reset successfully and additional block added")
80 changes: 30 additions & 50 deletions testing-suite/staking-v4/chain_commander.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import requests
import json

from config import *
from network_provider.get_transaction_info import get_status_of_tx
from constants import *
import time
from core.validatorKey import ValidatorKey

from utils.logger import logger


def send_egld_to_address(egld_amount, erd_address):
logger.info(f"Sending {egld_amount} to address {erd_address}")
details = {
'address': f'{erd_address}',
'balance': f'{egld_amount}'
Expand All @@ -18,93 +19,71 @@ def send_egld_to_address(egld_amount, erd_address):
json_structure = json.dumps(details_list)
response = requests.post(f"{DEFAULT_PROXY}/simulator/set-state", data=json_structure)
response.raise_for_status()

response_data = response.json()
logger.info(f"Transfer response: {response_data.get('message', 'Balance updated successfully')}")
return response.text


def add_blocks(nr_of_blocks):
logger.info(f"Requesting generation of {nr_of_blocks} blocks")
response = requests.post(f"{DEFAULT_PROXY}/simulator/generate-blocks/{nr_of_blocks}")
response.raise_for_status()
logger.info(f"Generated {nr_of_blocks} blocks; Response status: {response.status_code}")
return response.text


def get_block() -> int:
response = requests.get(f"{DEFAULT_PROXY}/network/status/0")
response.raise_for_status()
parsed = response.json()

general_data = parsed.get("data")
general_status = general_data.get("status")
nonce = general_status.get("erd_nonce")
logger.info(f"Current block nonce: {nonce}")
return nonce


def add_blocks_until_epoch_reached(epoch_to_be_reached: int):
logger.info(f"Generating blocks until epoch {epoch_to_be_reached} is reached")
req = requests.post(f"{DEFAULT_PROXY}/simulator/generate-blocks-until-epoch-reached/{str(epoch_to_be_reached)}")
req.raise_for_status()
add_blocks(1)
logger.info(f"Epoch {epoch_to_be_reached} reached")
return req.text


def add_blocks_until_tx_fully_executed(tx_hash) -> str:
print("Checking: ", tx_hash)
logger.info(f"Checking status of transaction {tx_hash}")
counter = 0

while counter < MAX_NUM_OF_BLOCKS_UNTIL_TX_SHOULD_BE_EXECUTED:
add_blocks(1)

time.sleep(WAIT_UNTIL_API_REQUEST_IN_SEC)
if get_status_of_tx(tx_hash) == "pending":
tx_status = get_status_of_tx(tx_hash)
if tx_status == "pending":
logger.info(f"Transaction {tx_hash} still pending after {counter} blocks")
counter += 1
else:
print("Tx fully executed after", counter, " blocks.")
return get_status_of_tx(tx_hash)
logger.info(f"Transaction {tx_hash} executed after {counter} blocks")
return tx_status
raise Exception(f"Transaction {tx_hash} not executed within {MAX_NUM_OF_BLOCKS_UNTIL_TX_SHOULD_BE_EXECUTED} blocks.")


def is_chain_online() -> bool:
flag = False

while not flag:
while True:
time.sleep(1)
try:
response = requests.get(f"{DEFAULT_PROXY}/network/status/0")
print(response)
flag = True
except requests.exceptions.ConnectionError:
print("Chain not started jet")

return flag


def add_key(keys: list[ValidatorKey]) -> str:
private_keys = []
for key in keys:
private_keys.append(key.get_private_key())

post_body = {
"privateKeysBase64": private_keys
}

json_structure = json.dumps(post_body)
req = requests.post(f"{DEFAULT_PROXY}/simulator/add-keys", data=json_structure)

return req.text


def add_blocks_until_key_eligible(keys: list[ValidatorKey]) -> ValidatorKey:
flag = False
while not flag:
for key in keys:
if key.get_state() == "eligible":
eligible_key = key
print("eligible key found")
flag = True

else:
print("no eligible key found , moving to next epoch...")
current_epoch = proxy_default.get_network_status().epoch_number
add_blocks_until_epoch_reached(current_epoch+1)
add_blocks(3)

return eligible_key
response.raise_for_status()
logger.info("Chain is online")
return True
except requests.exceptions.ConnectionError as e:
logger.warning("Chain not started yet: ConnectionError")
except Exception as e:
logger.error(f"Unexpected error when checking chain status: {str(e)}")
raise


def add_blocks_until_last_block_of_current_epoch() -> str:
Expand All @@ -117,6 +96,7 @@ def add_blocks_until_last_block_of_current_epoch() -> str:
passed_nonces = status.get("erd_nonces_passed_in_current_epoch")

blocks_to_be_added = rounds_per_epoch - passed_nonces
logger.info(f"Adding {blocks_to_be_added} blocks to reach the end of the current epoch")
response_from_add_blocks = add_blocks(blocks_to_be_added)
logger.info(f"Reached the last block of the current epoch")
return response_from_add_blocks

54 changes: 42 additions & 12 deletions testing-suite/staking-v4/core/chain_simulator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import stat

from constants import *
from config import *
import os
import signal
import subprocess
from subprocess import Popen
from threading import Thread
import threading

from utils.logger import logger


class ChainSimulator:
def __init__(self, path: Path) -> None:
Expand All @@ -20,30 +17,63 @@ def __init__(self, path: Path) -> None:
self.num_waiting_validators_meta = num_waiting_validators_meta
self.rounds_per_epoch = rounds_per_epoch
self.process = None
logger.info(f"Trying to Initialize ChainSimulator with configuration at {path}\n")

# Check if the ChainSimulator binary exists in the specified path
if not os.path.exists(self.path / "chainsimulator"):
logger.error("ChainSimulator binary not found at the specified path.")
raise FileNotFoundError("ChainSimulator binary not found at the specified path.")

def start(self):
command = f"./chainsimulator --log-level {self.log_level} --rounds-per-epoch {rounds_per_epoch}\
-num-validators-per-shard {self.num_validators_per_shard} \
-num-waiting-validators-per-shard {num_waiting_validators_per_shard} \
-num-validators-meta {num_validators_meta} \
-num-waiting-validators-meta {num_waiting_validators_meta}"
command = ' '.join(command.split())

flag = True
while flag:
if " " in command:
command = command.replace(" ", " ")
else:
flag = False
print(command)
logger.info(f"Starting ChainSimulator with command: {command}")

self.process = subprocess.Popen(command, stdout=subprocess.PIPE,
self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, preexec_fn=os.setsid, cwd=chain_simulator_path)

out, err = self.process.communicate()
if err:
print(err)
stdout_thread = threading.Thread(target=self.read_output, args=(self.process.stdout,))
stderr_thread = threading.Thread(target=self.read_output, args=(self.process.stderr, True))
stdout_thread.start()
stderr_thread.start()

def read_output(self, stream, is_error=False):
"""Reads from a stream and logs the output."""
try:
for line in iter(stream.readline, b''):
decoded_line = line.decode()
# TODO Use the code below in order to retreive the chain Simulator logs
# if is_error:
# logger.error(decoded_line.strip())
# else:
# logger.info(decoded_line.strip())
finally:
stream.close()

def stop(self):
if self.process is not None:
# Send SIGTERM to the process group to cleanly stop all processes
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)

def stop(self) -> None:
self.process.terminate()
self.process.wait()

# Ensure output threads are also terminated
if hasattr(self, 'stdout_thread'):
self.stdout_thread.join()
if hasattr(self, 'stderr_thread'):
self.stderr_thread.join()

logger.info("ChainSimulator process and all child processes stopped\n")
else:
logger.warning("\nNo ChainSimulator process found.\n")
14 changes: 13 additions & 1 deletion testing-suite/staking-v4/core/validatorKey.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
class ValidatorKey:
def __init__(self, path: Path) -> None:
self.path = path
logger.info(f"ValidatorKey initialized with path: {path}")

def public_address(self) -> str:
f = open(self.path)
Expand All @@ -29,9 +30,11 @@ def get_status(self, owner_address: str):
owner_address = Address.from_bech32(owner_address).to_hex()
key_status_pair = get_bls_key_status([owner_address])
if key_status_pair is None:
logger.warning("No status found for any keys")
return None
for key, status in key_status_pair.items():
if key == self.public_address():
logger.info(f"Status: {status} for BLS Key: {key} ")
return status

# is using /validator/statistics route
Expand All @@ -47,15 +50,19 @@ def get_state(self):
key_data = general_statistics.get(self.public_address())

if key_data is None:
logger.warning(f"No state data found for validator key: {key_data}")
return None
else:
status = key_data.get("validatorStatus")
logger.info(f"Validator status is: {status}")
return status

# is using /validator/auction
def get_auction_state(self):
logger.info(f"Resetting validator statistics before fetching auction state.")
force_reset_validator_statistics()

logger.info(f"Requesting auction state from {OBSERVER_META}/validator/auction.")
response = requests.get(f"{OBSERVER_META}/validator/auction")
response.raise_for_status()
parsed = response.json()
Expand All @@ -69,18 +76,23 @@ def get_auction_state(self):
if node_list.get("blsKey") == self.public_address():
state = node_list.get("qualified")
if state:
logger.info(f"BLS key {self.public_address()} is qualified in the auction.")
return "qualified"
else:
logger.info(f"BLS key {self.public_address()} is unqualified in the auction.")
return "unqualified"
else:
logger.info(f"No auction data found for BLS key {self.public_address()}.")
return None

# using getOwner vm-query
def belongs_to(self, address: str) -> bool:
owner = get_owner([self.public_address()])
if owner == address:
logger.info(f"Checked ownership: True for address: {address}")
return True
else:
logger.info(f"Checked ownership: False for address: {address}")
return False

def get_private_key(self) -> str:
Expand All @@ -93,5 +105,5 @@ def get_private_key(self) -> str:
private_key += line
if "\n" in private_key:
private_key = private_key.replace("\n", "")

logger.debug(f"Private key retrieved for {self.path}")
return private_key
36 changes: 22 additions & 14 deletions testing-suite/staking-v4/core/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,61 @@
from multiversx_sdk_wallet import UserSigner
from multiversx_sdk_core import Address

from utils.logger import logger


class Wallet:
def __init__(self, path: Path) -> None:
self.path = path
logger.info(f"Wallet initialized with path: {self.path}")

def public_address(self) -> str:
f = open(self.path)
with open(self.path) as f:
lines = f.readlines()

lines = f.readlines()
for line in lines:
if "BEGIN" in line:
line = line.split(" ")
address = line[-1].replace("-----", "")
if "\n" in address:
address = address.replace("\n", "")
break

return address
address = line[-1].replace("-----", "").strip()
return address

def get_balance(self) -> int:
response = requests.get(f"{DEFAULT_PROXY}/address/{self.public_address()}/balance")
address = self.public_address()
logger.info(f"Fetching balance for address: {address}")
response = requests.get(f"{DEFAULT_PROXY}/address/{address}/balance")
response.raise_for_status()
parsed = response.json()

general_data = parsed.get("data")
balance = general_data.get("balance")
logger.info(f"Retrieved balance: {balance} for address: {address}")

return balance


def set_balance(self, egld_amount):
address = self.public_address()
logger.info(f"Setting balance for address: {address} to {egld_amount}")
details = {
'address': f'{self.public_address()}',
'balance': f'{egld_amount}'
'address': address,
'balance': egld_amount
}

details_list = [details]
json_structure = json.dumps(details_list)
req = requests.post(f"{DEFAULT_PROXY}/simulator/set-state", data=json_structure)
logger.info(f"Set balance request status: {req.status_code}")

return req.text

def get_signer(self) -> UserSigner:
logger.info("Creating UserSigner from PEM file.")
return UserSigner.from_pem_file(self.path)

def get_address(self) -> Address:
return Address.from_bech32(self.public_address())
address = self.public_address()
return Address.from_bech32(address)

def get_account(self):
return proxy_default.get_account(self.get_address())
account = proxy_default.get_account(self.get_address())
logger.info(f"Retrieved account details for: {account.address.to_bech32()}")
return account
Loading
Loading