Skip to content

Commit

Permalink
Relayer upgrades
Browse files Browse the repository at this point in the history
- Fix all config problems
- Improve Secret Contract call handling (prepare for next chain upgrade)
- Improve bad RPC handling
- Improve performance
  • Loading branch information
SecretSaturn committed Sep 19, 2024
1 parent a21d6ca commit 9cddc71
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 202 deletions.
1 change: 1 addition & 0 deletions TNLS-Relayers/Gateway.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"type":"constructor","inputs":[{"name":"secretGatewaySignerAddr","type":"address","internalType":"address"}],"stateMutability":"nonpayable"},{"type":"function","name":"VRF_routing_code_hash","inputs":[],"outputs":[{"name":"","type":"string","internalType":"string"}],"stateMutability":"view"},{"type":"function","name":"VRF_routing_info","inputs":[],"outputs":[{"name":"","type":"string","internalType":"string"}],"stateMutability":"view"},{"type":"function","name":"increaseTaskId","inputs":[{"name":"_newTaskId","type":"uint256","internalType":"uint256"}],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"initialize","inputs":[],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"owner","inputs":[],"outputs":[{"name":"","type":"address","internalType":"address"}],"stateMutability":"view"},{"type":"function","name":"payoutBalance","inputs":[],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"postExecution","inputs":[{"name":"_taskId","type":"uint256","internalType":"uint256"},{"name":"_sourceNetwork","type":"string","internalType":"string"},{"name":"_info","type":"tuple","internalType":"struct Gateway.PostExecutionInfo","components":[{"name":"payload_hash","type":"bytes32","internalType":"bytes32"},{"name":"packet_hash","type":"bytes32","internalType":"bytes32"},{"name":"callback_address","type":"bytes20","internalType":"bytes20"},{"name":"callback_selector","type":"bytes4","internalType":"bytes4"},{"name":"callback_gas_limit","type":"bytes4","internalType":"bytes4"},{"name":"packet_signature","type":"bytes","internalType":"bytes"},{"name":"result","type":"bytes","internalType":"bytes"}]}],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"renounceOwnership","inputs":[],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"requestRandomness","inputs":[{"name":"_numWords","type":"uint32","internalType":"uint32"},{"name":"_callbackGasLimit","type":"uint32","internalType":"uint32"}],"outputs":[{"name":"requestId","type":"uint256","internalType":"uint256"}],"stateMutability":"payable"},{"type":"function","name":"secret_gateway_signer_address","inputs":[],"outputs":[{"name":"","type":"address","internalType":"address"}],"stateMutability":"view"},{"type":"function","name":"send","inputs":[{"name":"_payloadHash","type":"bytes32","internalType":"bytes32"},{"name":"_userAddress","type":"address","internalType":"address"},{"name":"_routingInfo","type":"string","internalType":"string"},{"name":"_info","type":"tuple","internalType":"struct Gateway.ExecutionInfo","components":[{"name":"user_key","type":"bytes","internalType":"bytes"},{"name":"user_pubkey","type":"bytes","internalType":"bytes"},{"name":"routing_code_hash","type":"string","internalType":"string"},{"name":"task_destination_network","type":"string","internalType":"string"},{"name":"handle","type":"string","internalType":"string"},{"name":"nonce","type":"bytes12","internalType":"bytes12"},{"name":"callback_gas_limit","type":"uint32","internalType":"uint32"},{"name":"payload","type":"bytes","internalType":"bytes"},{"name":"payload_signature","type":"bytes","internalType":"bytes"}]}],"outputs":[{"name":"_taskId","type":"uint256","internalType":"uint256"}],"stateMutability":"payable"},{"type":"function","name":"taskId","inputs":[],"outputs":[{"name":"","type":"uint256","internalType":"uint256"}],"stateMutability":"view"},{"type":"function","name":"task_destination_network","inputs":[],"outputs":[{"name":"","type":"string","internalType":"string"}],"stateMutability":"view"},{"type":"function","name":"tasks","inputs":[{"name":"","type":"uint256","internalType":"uint256"}],"outputs":[{"name":"payload_hash_reduced","type":"bytes31","internalType":"bytes31"},{"name":"completed","type":"bool","internalType":"bool"}],"stateMutability":"view"},{"type":"function","name":"transferOwnership","inputs":[{"name":"newOwner","type":"address","internalType":"address"}],"outputs":[],"stateMutability":"nonpayable"},{"type":"function","name":"upgradeHandler","inputs":[],"outputs":[],"stateMutability":"nonpayable"},{"type":"event","name":"FulfilledRandomWords","inputs":[{"name":"requestId","type":"uint256","indexed":true,"internalType":"uint256"}],"anonymous":false},{"type":"event","name":"Initialized","inputs":[{"name":"version","type":"uint64","indexed":false,"internalType":"uint64"}],"anonymous":false},{"type":"event","name":"OwnershipTransferred","inputs":[{"name":"previousOwner","type":"address","indexed":true,"internalType":"address"},{"name":"newOwner","type":"address","indexed":true,"internalType":"address"}],"anonymous":false},{"type":"event","name":"TaskCompleted","inputs":[{"name":"taskId","type":"uint256","indexed":true,"internalType":"uint256"},{"name":"callbackSuccessful","type":"bool","indexed":false,"internalType":"bool"}],"anonymous":false},{"type":"event","name":"logNewTask","inputs":[{"name":"task_id","type":"uint256","indexed":true,"internalType":"uint256"},{"name":"source_network","type":"string","indexed":false,"internalType":"string"},{"name":"user_address","type":"address","indexed":false,"internalType":"address"},{"name":"routing_info","type":"string","indexed":false,"internalType":"string"},{"name":"payload_hash","type":"bytes32","indexed":false,"internalType":"bytes32"},{"name":"info","type":"tuple","indexed":false,"internalType":"struct Gateway.ExecutionInfo","components":[{"name":"user_key","type":"bytes","internalType":"bytes"},{"name":"user_pubkey","type":"bytes","internalType":"bytes"},{"name":"routing_code_hash","type":"string","internalType":"string"},{"name":"task_destination_network","type":"string","internalType":"string"},{"name":"handle","type":"string","internalType":"string"},{"name":"nonce","type":"bytes12","internalType":"bytes12"},{"name":"callback_gas_limit","type":"uint32","internalType":"uint32"},{"name":"payload","type":"bytes","internalType":"bytes"},{"name":"payload_signature","type":"bytes","internalType":"bytes"}]}],"anonymous":false},{"type":"error","name":"InvalidInitialization","inputs":[]},{"type":"error","name":"NotInitializing","inputs":[]},{"type":"error","name":"OwnableInvalidOwner","inputs":[{"name":"owner","type":"address","internalType":"address"}]},{"type":"error","name":"OwnableUnauthorizedAccount","inputs":[{"name":"account","type":"address","internalType":"address"}]}]
46 changes: 22 additions & 24 deletions TNLS-Relayers/eth_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from logging import getLogger, basicConfig, INFO, StreamHandler
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Timer
from time import sleep
from threading import Lock, Thread, Event

from web3 import Web3, middleware, auto
from web3.datastructures import AttributeDict
Expand Down Expand Up @@ -33,7 +32,7 @@ def __init__(self, private_key="", provider=None, contract_address="", chain_id=
self.contract_address = contract_address
self.chain_id = chain_id
self.nonce = self.provider.eth.get_transaction_count(self.address, 'pending')

# Set up logging
basicConfig(
level=INFO,
Expand All @@ -46,32 +45,30 @@ def __init__(self, private_key="", provider=None, contract_address="", chain_id=
self.nonce_lock = Lock()
self.timer = None
self.sync_interval = sync_interval
self.executor = ThreadPoolExecutor(max_workers=1)

# Schedule nonce synchronization
self.schedule_sync()
self.stop_event = Event()
self.sync_thread = Thread(target=self.sync_loop)
self.sync_thread.start()

def schedule_sync(self):
def sync_loop(self):
"""
Schedule nonce sync task with the executor and restart the timer
Continuously sync nonce at specified intervals.
"""
try:
self.executor.submit(self.sync_nonce)
except Exception as e:
self.logger.error(f"Error during Ethereum nonce sync: {e}")
finally:
# Re-run the sync at specified intervals
self.timer = Timer(self.sync_interval, self.schedule_sync)
self.timer.start()
while not self.stop_event.is_set():
try:
self.sync_nonce()
except Exception as e:
self.logger.error(f"Error during Ethereum nonce sync: {e}")
# Wait for the sync interval or until the stop event is set
self.stop_event.wait(self.sync_interval)

def sync_nonce(self):
"""
Sync the nonce with the latest data from the provider
Sync the nonce with the latest data from the provider.
"""
try:
with self.nonce_lock:
self.logger.info(f"Starting Chain-id {self.chain_id} nonce sync")
sleep(1) # Introduce a delay if needed to reduce frequency of sync errors
new_nonce = self.provider.eth.get_transaction_count(self.address, 'pending')
if self.nonce is None or new_nonce >= self.nonce:
self.nonce = new_nonce
Expand All @@ -95,7 +92,7 @@ def create_transaction(self, contract_function, *args, **kwargs):
'from': self.address,
'gas': callback_gas_limit,
'nonce': deepcopy(self.nonce),
'gasPrice': self.provider.eth.gas_price
'gasPrice': int(1.5*self.provider.eth.gas_price)
#'maxFeePerGas': self.provider.eth.max_base
#'maxPriorityFeePerGas': self.provider.eth.max_priority_fee,
})
Expand All @@ -104,7 +101,7 @@ def create_transaction(self, contract_function, *args, **kwargs):
'from': self.address,
'gas': 2000000,
'nonce': deepcopy(self.nonce),
'gasPrice': self.provider.eth.gas_price
'gasPrice': int(1.5*self.provider.eth.gas_price)
#'maxFeePerGas': self.provider.eth.max_priority_fee
#'maxPriorityFeePerGas': self.provider.eth.max_priority_fee,
})
Expand All @@ -114,7 +111,7 @@ def create_transaction(self, contract_function, *args, **kwargs):
'from': self.address,
'gas': callback_gas_limit,
'nonce': deepcopy(self.nonce),
'gasPrice': self.provider.eth.gas_price
'gasPrice': int(1.5*self.provider.eth.gas_price)
#'maxFeePerGas': self.provider.eth.max_priority_fee
#'maxPriorityFeePerGas': self.provider.eth.max_priority_fee,
})
Expand Down Expand Up @@ -155,16 +152,16 @@ def get_last_txs(self, block_number=None, contract_interface=None):
Returns: a list of transaction receipts
"""
if block_number is None:
block_number = self.get_last_block()

try:
if block_number is None:
block_number = self.get_last_block()
valid_transactions = contract_interface.contract.events.logNewTask().get_logs(
fromBlock=block_number,
toBlock=block_number
)
except Exception as e:
self.logger.warning(e)
return []

if len(valid_transactions) == 0:
return []
Expand All @@ -183,6 +180,7 @@ def get_last_txs(self, block_number=None, contract_interface=None):
correct_transactions.append(result)
except Exception as e:
self.logger.warning(e)
return []

return correct_transactions

Expand Down
30 changes: 12 additions & 18 deletions TNLS-Relayers/relayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,26 @@
Individual thread:
for each object:
get destination network
verify signature?
stringify object as json
send json string to destination network
"""
import json
from logging import getLogger, basicConfig, DEBUG, StreamHandler
from threading import Thread
from time import sleep
from typing import Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

from base_interface import Task, BaseContractInterface, BaseChainInterface, eth_chains, scrt_chains
import warnings

warnings.filterwarnings("ignore")
from concurrent.futures import ThreadPoolExecutor, as_completed


class Relayer:
def __init__(self,
dict_of_names_to_interfaces: Dict[str, Tuple[BaseChainInterface, BaseContractInterface, str, str]],
num_loops=None):

"""
NOTE: the below default private key is for testing only, and does not correspond to any real account/wallet
"""

# Create the dictionary and add the tuple
self.dict_of_names_to_interfaces = dict_of_names_to_interfaces
"""
Expand Down Expand Up @@ -85,8 +81,9 @@ def fetch_transactions(block_num):
tasks_tmp.extend(contract_interface.parse_event_from_txn(evt_name, transaction))
return block_num, tasks_tmp

with ThreadPoolExecutor(max_workers = 30) as executor2:
futures2 = [executor2.submit(fetch_transactions, block_num) for block_num in range(prev_height + 1, curr_height + 1)]
with ThreadPoolExecutor(max_workers=30) as executor2:
futures2 = [executor2.submit(fetch_transactions, block_num) for block_num in
range(prev_height + 1, curr_height + 1)]
for future in futures2:
block_num, tasks = future.result()
self.logger.info(f'Processed block {block_num} on {name}')
Expand All @@ -95,19 +92,17 @@ def fetch_transactions(block_num):
self.task_ids_to_statuses[task_id] = 'Received from {}'.format(name)
self.task_list.extend(tasks)


with ThreadPoolExecutor(max_workers = 200) as executor:
with ThreadPoolExecutor(max_workers=200) as executor:
# Filter out secret chains if needed
futures = [executor.submit(process_chain, chain) for chain in chains_to_poll]


def route_transaction(self, task: Task):
"""
Given a Task, routes it where it's supposed to go
Args:
task: the Task to be routed
"""
self.logger.info('Routing task {}',vars(task))
self.logger.info('Routing task {}', vars(task))
if task.task_destination_network is None:
self.logger.warning(f'Task {task} has no destination network, not routing')
self.task_ids_to_statuses[task.task_data['task_id']] = 'Failed to route'
Expand All @@ -119,8 +114,8 @@ def route_transaction(self, task: Task):
contract_for_txn = self.dict_of_names_to_interfaces[task.task_destination_network][1]
function_name = self.dict_of_names_to_interfaces[task.task_destination_network][3]
if task.task_destination_network in scrt_chains:
ntasks, _ = contract_for_txn.call_function(function_name, str(task))
self.task_list.extend(ntasks)
new_tasks, _ = contract_for_txn.call_function(function_name, str(task))
self.task_list.extend(new_tasks)
else:
contract_for_txn.call_function(function_name, str(task))
self.task_ids_to_statuses[str(task.task_data['task_id'])] = 'Routed to {}'.format(task.task_destination_network)
Expand All @@ -133,6 +128,7 @@ def task_list_handle(self):
Spins up threads to handle each task in the task list
"""

def _thread_func():
while len(self.task_list) > 0:
task = self.task_list.pop()
Expand All @@ -154,12 +150,10 @@ def run(self):
"""
self.logger.info('Starting relayer')
self.loops_run = 0
while (self.num_loops is not None and self.loops_run < self.num_loops) or self.num_loops is None:
while True:
self.poll_for_transactions()
self.logger.info('Polled for transactions, now have {} remaining'.format(len(self.task_list)))
self.task_list_handle()
self.loops_run += 1
sleep(1)
pass

Expand Down
Loading

0 comments on commit 9cddc71

Please sign in to comment.