From 1d1f560284fcbfae0d61cd58d81730eb6486f007 Mon Sep 17 00:00:00 2001 From: starkbamse <139136798+starkbamse@users.noreply.github.com> Date: Wed, 18 Sep 2024 12:41:37 +0200 Subject: [PATCH 1/2] Add timeout Timeout all web3 calls to prevent hanging. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/core.rs | 48 ++++++++++++++++--------- src/fetcher/mod.rs | 20 +++++++---- src/interface/mod.rs | 83 +++++++++++++++++++++++++++++++------------- src/lib.rs | 3 +- 6 files changed, 108 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a76a34..ffdeb55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3377,7 +3377,7 @@ dependencies = [ [[package]] name = "rustlink" -version = "0.0.2" +version = "0.0.3" dependencies = [ "async-std", "bincode", diff --git a/Cargo.toml b/Cargo.toml index b6f4abf..d656543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ crate-type = ["cdylib", "rlib"] [package] name = "rustlink" -version = "0.0.2" +version = "0.0.3" edition = "2021" license = "MIT OR Apache-2.0" description = "A lightweight and easy-to-use library for periodically retrieving data from the Chainlink decentralized data feed." diff --git a/src/core.rs b/src/core.rs index fd219bc..9e2c2c9 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,13 +1,16 @@ use crate::{error::Error, fetcher::fetch_rounds, interface}; use async_std::channel::{unbounded, Receiver, RecvError, Sender}; -use ethers::{providers::{Http, Provider}, types::Address}; +use ethers::{ + providers::{Http, Provider}, + types::Address, +}; use js_sys::Function; use serde_wasm_bindgen::{from_value, to_value}; -use workflow_rs::core::cfg_if; use std::str::FromStr; use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; use wasm_bindgen_futures::spawn_local; +use workflow_rs::core::cfg_if; /// ## Configuration /// This struct contains the configuration for Rustlink. It contains the following fields: @@ -19,6 +22,7 @@ pub struct Configuration { pub fetch_interval_seconds: u64, pub contracts: Vec<(String, Address)>, pub provider: Provider, + pub call_timeout: std::time::Duration, } /// ## Rustlink instance. This is the main struct that you will interact with. @@ -69,16 +73,16 @@ impl Rustlink { /// - `fetch_interval_seconds`: How often to update data points in the database (to prevent RPC rate limitation) /// - `reflector`: How you choose to receive the answer from your provided contracts. /// - `contracts`: A tuple list containing a ticker name and its corresponding contract address on the - /// EVM chain. + /// EVM chain. /// /// Example: /// /// ```rust /// use async_std::channel::unbounded; /// use rustlink::core::{Reflector, Rustlink}; - /// + /// /// #[tokio::main] - /// + /// /// async fn main(){ /// let mut contracts: Vec<(String, String)> = Vec::new(); /// contracts.push(( @@ -93,6 +97,7 @@ impl Rustlink { /// 1, /// Reflector::Sender(sender), /// contracts, + /// std::time::Duration::from_secs(10), /// ) /// .unwrap(); /// rustlink.start(); @@ -105,21 +110,28 @@ impl Rustlink { fetch_interval_seconds: u64, reflector: Reflector, contracts: Vec<(String, String)>, + call_timeout: std::time::Duration, ) -> Result { - let provider = Provider::try_from(rpc_url).expect("Invalid RPC URL"); let (termination_send, termination_recv) = unbounded::<()>(); let (shutdown_send, shutdown_recv) = unbounded::<()>(); - let parsed_contracts=contracts.iter().map(|(identifier, address)| { - (identifier.clone(), Address::from_str(address).expect("Invalid contract address specified")) - }).collect(); + let parsed_contracts = contracts + .iter() + .map(|(identifier, address)| { + ( + identifier.clone(), + Address::from_str(address).expect("Invalid contract address specified"), + ) + }) + .collect(); Ok(Rustlink { configuration: Configuration { fetch_interval_seconds, provider, - contracts:parsed_contracts, + contracts: parsed_contracts, + call_timeout, }, reflector, termination_send, @@ -130,7 +142,7 @@ impl Rustlink { } /// Starts the Rustlink instance. - /// This method will start fetching the latest price data from the Chainlink decentralized data feed. + /// This method will start fetching the latest price data from the Chainlink decentralized data feed. pub fn start(&self) { #[cfg(not(target_arch = "wasm32"))] tokio::task::spawn(fetch_rounds(self.clone())); @@ -227,17 +239,21 @@ impl RustlinkJS { contracts: Contracts, callback: Function, ) -> Self { - - // Cast `JsValue` to `Function` let contracts: Vec<(String, String)> = from_value(contracts.into()).unwrap(); let (sender, receiver) = async_std::channel::unbounded(); let reflector = Reflector::Sender(sender); - let rustlink = Rustlink::try_new(rpc_url, fetch_interval_seconds, reflector, contracts) - .map_err(|e| JsValue::from_str(&format!("{}", e))) - .unwrap(); + let rustlink = Rustlink::try_new( + rpc_url, + fetch_interval_seconds, + reflector, + contracts, + std::time::Duration::from_secs(10), + ) + .map_err(|e| JsValue::from_str(&format!("{}", e))) + .unwrap(); RustlinkJS { rustlink, diff --git a/src/fetcher/mod.rs b/src/fetcher/mod.rs index 3a8e807..5e9478b 100644 --- a/src/fetcher/mod.rs +++ b/src/fetcher/mod.rs @@ -1,22 +1,28 @@ use std::time::Duration; use async_std::stream::StreamExt; -use ethers::abi::AbiError; +use ethers::providers::{Http, Provider}; use ethers::types::Address; use futures::{select, FutureExt}; use super::interface::{ChainlinkContract, Round}; use crate::core::Reflector::Sender; use crate::core::{Configuration, Rustlink}; +use crate::interface::ContractCallError; /// Retrieves the price of an underlying asset from a particular contract -async fn fetch_round_data_for_contract( - rustlink_configuration: &Configuration, - identifier: &str, +async fn fetch_round_data_for_contract<'a>( + rustlink_configuration: &'a Configuration, + identifier: &'a str, address: Address, -) -> Result { - let contract = - ChainlinkContract::new(&rustlink_configuration.provider, identifier, address).await?; +) -> Result>> { + let contract = ChainlinkContract::new( + &rustlink_configuration.provider, + identifier, + address, + rustlink_configuration.call_timeout, + ) + .await?; contract.latest_round_data().await } diff --git a/src/interface/mod.rs b/src/interface/mod.rs index abef608..3744f3b 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -1,15 +1,31 @@ -use std::sync::Arc; +use async_std::future::{timeout, TimeoutError}; +use ethers::{ + abi::{Abi, AbiError}, + contract::{Contract, ContractError}, + providers::{Http, Middleware, Provider}, + types::{Address, U256}, +}; use serde::{Deserialize, Serialize}; -use ethers::{abi::{Abi, AbiError}, contract::Contract, providers::{Http, Provider}, types::{Address, U256}}; - +use std::{sync::Arc, time::Duration}; +use thiserror::Error; #[derive(Clone)] pub struct ChainlinkContract<'a> { pub contract: Contract<&'a Provider>, pub identifier: &'a str, pub decimals: u8, + pub call_timeout: Duration, } +#[derive(Error, Debug)] +pub enum ContractCallError { + #[error("Abi error: {0}")] + Abi(#[from] AbiError), + #[error("Timeout error: {0}")] + Timeout(#[from] TimeoutError), + #[error("Contract error: {0}")] + Contract(#[from] ContractError), +} /// The latest price received for this symbol. /// This data is directly retrieved from the underlying contract. @@ -29,6 +45,21 @@ pub struct Round { pub answer: f64, } +/// Type alias for the raw round call to the contract +pub type RoundCall<'a> = Result<(u128, u128, U256, U256, u128), ContractError<&'a Provider>>; + +#[allow(clippy::redundant_allocation)] +async fn decimals<'a>( + contract: ðers::contract::ContractInstance>, &'a Provider>, +) -> Result>> { + Ok(contract + .method::<_, U256>("decimals", ()) + .unwrap() + .call() + .await? + .as_u64() as u8) +} + impl<'a> ChainlinkContract<'a> { /// Creates a new instance of a chainlink price aggregator. This is just a wrapper /// function to simplify the interactions with the contract. @@ -36,34 +67,34 @@ impl<'a> ChainlinkContract<'a> { provider: &'a Provider, identifier: &'a str, contract_address: Address, - ) -> Result, AbiError> { - let abi:Abi=serde_json::from_str(include_str!("IAggregatorV3Interface.json")).unwrap(); - let contract = Contract::new(contract_address, abi, Arc::new(provider)); + call_timeout: Duration, + ) -> Result, ContractCallError<&'a Provider>> { + let abi: Abi = serde_json::from_str(include_str!("IAggregatorV3Interface.json")).unwrap(); + let contract: ethers::contract::ContractInstance>, &Provider> = + Contract::new(contract_address, abi, Arc::new(provider)); - let decimals=contract.method::<_,U256>("decimals", ()).unwrap() - .call().await.unwrap().as_u64() as u8; + let decimals = timeout(call_timeout, decimals(&contract)).await??; Ok(ChainlinkContract { contract, decimals, identifier, + call_timeout, }) } + /// Wrapper function to call the latestRoundData method on the contract + async fn round_data(&self) -> RoundCall<'a> { + let round_call: RoundCall = self.contract.method("latestRoundData", ())?.call().await; + round_call + } + /// Retrieves the latest price of this underlying asset /// from the chainlink decentralized data feed - pub async fn latest_round_data(&self) -> Result { - let (round_id, answer, started_at, updated_at, answered_in_round): ( - u128, - u128, - U256, - U256, - u128, - ) = self - .contract - .method("latestRoundData", ())? - .call() - .await.unwrap(); + pub async fn latest_round_data(&self) -> Result>> { + // Call the contract, but timeout after 10 seconds + let (round_id, answer, started_at, updated_at, answered_in_round) = + timeout(self.call_timeout, self.round_data()).await??; // Convert the answer on contract to a string. let float_answer: f64 = answer.to_string().parse().unwrap(); @@ -85,18 +116,22 @@ impl<'a> ChainlinkContract<'a> { #[cfg(test)] mod tests { - use ethers::{abi::Address, providers::Provider}; + use std::time::Duration; + use crate::interface::ChainlinkContract; + use ethers::{abi::Address, providers::Provider}; #[tokio::test] async fn valid_answer() { - - let provider=Provider::try_from("https://bsc-dataseed1.binance.org/").unwrap(); + let provider = Provider::try_from("https://bsc-dataseed1.binance.org/").unwrap(); let chainlink_contract = ChainlinkContract::new( &provider, "ETH", - "0x9ef1B8c0E4F7dc8bF5719Ea496883DC6401d5b2e".parse::
().unwrap(), + "0x9ef1B8c0E4F7dc8bF5719Ea496883DC6401d5b2e" + .parse::
() + .unwrap(), + Duration::from_secs(10), ) .await .unwrap(); diff --git a/src/lib.rs b/src/lib.rs index abaf6b5..0bb98fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,9 @@ /// This library provides a simple interface to fetch price data from the Chainlink decentralized data feed. /// Core is the main module that contains the main struct `Rustlink` that you will need to interact with. pub mod core; -mod interface; mod error; mod fetcher; +mod interface; #[cfg(test)] mod tests { @@ -27,6 +27,7 @@ mod tests { 1, Reflector::Sender(sender), contracts, + std::time::Duration::from_secs(3), ) .unwrap(); From 26e958703ec91491b87098591d54e71879424698 Mon Sep 17 00:00:00 2001 From: starkbamse <139136798+starkbamse@users.noreply.github.com> Date: Wed, 18 Sep 2024 12:43:39 +0200 Subject: [PATCH 2/2] Bump version of readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 784a04e..7524cdd 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ A lightweight rust library for periodically retrieving cryptocurrency prices from the ChainLink decentralized price feed. With `rustlink`, you can easily retrieve the latest price of any cryptocurrency supported by ChainLink. -[![crates.io](https://img.shields.io/crates/v/minismtp.svg)](https://crates.io/crates/rustlink) +[![crates.io](https://img.shields.io/crates/v/rustlink.svg)](https://crates.io/crates/rustlink) [![Documentation](https://docs.rs/rustlink/badge.svg)](https://docs.rs/rustlink) [![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE) ![GitHub branch status](https://img.shields.io/github/checks-status/starkbamse/rustlink/main) @@ -34,7 +34,7 @@ To use `rustlink` in your project, add the following to your `Cargo.toml` file: ```toml [dependencies] -rustlink = "0.0.2" +rustlink = "0.0.3" ``` ## Build for WASM