Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout all Web3 calls #9

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ crate-type = ["cdylib", "rlib"]

[package]
name = "rustlink"
version = "0.0.2"
version = "0.0.3"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "A lightweight and easy-to-use library for periodically retrieving data from the Chainlink decentralized data feed."
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A lightweight rust library for periodically retrieving cryptocurrency prices from the ChainLink decentralized price feed. With `rustlink`, you can easily retrieve the latest price of any cryptocurrency supported by ChainLink.

[![crates.io](https://img.shields.io/crates/v/minismtp.svg)](https://crates.io/crates/rustlink)
[![crates.io](https://img.shields.io/crates/v/rustlink.svg)](https://crates.io/crates/rustlink)
[![Documentation](https://docs.rs/rustlink/badge.svg)](https://docs.rs/rustlink)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
![GitHub branch status](https://img.shields.io/github/checks-status/starkbamse/rustlink/main)
Expand Down Expand Up @@ -34,7 +34,7 @@ To use `rustlink` in your project, add the following to your `Cargo.toml` file:

```toml
[dependencies]
rustlink = "0.0.2"
rustlink = "0.0.3"
```

## Build for WASM
Expand Down
48 changes: 32 additions & 16 deletions src/core.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::{error::Error, fetcher::fetch_rounds, interface};

use async_std::channel::{unbounded, Receiver, RecvError, Sender};
use ethers::{providers::{Http, Provider}, types::Address};
use ethers::{
providers::{Http, Provider},
types::Address,
};
use js_sys::Function;
use serde_wasm_bindgen::{from_value, to_value};
use workflow_rs::core::cfg_if;
use std::str::FromStr;
use wasm_bindgen::{prelude::wasm_bindgen, JsValue};
use wasm_bindgen_futures::spawn_local;
use workflow_rs::core::cfg_if;

/// ## Configuration
/// This struct contains the configuration for Rustlink. It contains the following fields:
Expand All @@ -19,6 +22,7 @@ pub struct Configuration {
pub fetch_interval_seconds: u64,
pub contracts: Vec<(String, Address)>,
pub provider: Provider<Http>,
pub call_timeout: std::time::Duration,
}

/// ## Rustlink instance. This is the main struct that you will interact with.
Expand Down Expand Up @@ -69,16 +73,16 @@ impl Rustlink {
/// - `fetch_interval_seconds`: How often to update data points in the database (to prevent RPC rate limitation)
/// - `reflector`: How you choose to receive the answer from your provided contracts.
/// - `contracts`: A tuple list containing a ticker name and its corresponding contract address on the
/// EVM chain.
/// EVM chain.
///
/// Example:
///
/// ```rust
/// use async_std::channel::unbounded;
/// use rustlink::core::{Reflector, Rustlink};
///
///
/// #[tokio::main]
///
///
/// async fn main(){
/// let mut contracts: Vec<(String, String)> = Vec::new();
/// contracts.push((
Expand All @@ -93,6 +97,7 @@ impl Rustlink {
/// 1,
/// Reflector::Sender(sender),
/// contracts,
/// std::time::Duration::from_secs(10),
/// )
/// .unwrap();
/// rustlink.start();
Expand All @@ -105,21 +110,28 @@ impl Rustlink {
fetch_interval_seconds: u64,
reflector: Reflector,
contracts: Vec<(String, String)>,
call_timeout: std::time::Duration,
) -> Result<Self, Error> {

let provider = Provider::try_from(rpc_url).expect("Invalid RPC URL");
let (termination_send, termination_recv) = unbounded::<()>();
let (shutdown_send, shutdown_recv) = unbounded::<()>();

let parsed_contracts=contracts.iter().map(|(identifier, address)| {
(identifier.clone(), Address::from_str(address).expect("Invalid contract address specified"))
}).collect();
let parsed_contracts = contracts
.iter()
.map(|(identifier, address)| {
(
identifier.clone(),
Address::from_str(address).expect("Invalid contract address specified"),
)
})
.collect();

Ok(Rustlink {
configuration: Configuration {
fetch_interval_seconds,
provider,
contracts:parsed_contracts,
contracts: parsed_contracts,
call_timeout,
},
reflector,
termination_send,
Expand All @@ -130,7 +142,7 @@ impl Rustlink {
}

/// Starts the Rustlink instance.
/// This method will start fetching the latest price data from the Chainlink decentralized data feed.
/// This method will start fetching the latest price data from the Chainlink decentralized data feed.
pub fn start(&self) {
#[cfg(not(target_arch = "wasm32"))]
tokio::task::spawn(fetch_rounds(self.clone()));
Expand Down Expand Up @@ -227,17 +239,21 @@ impl RustlinkJS {
contracts: Contracts,
callback: Function,
) -> Self {


// Cast `JsValue` to `Function`

let contracts: Vec<(String, String)> = from_value(contracts.into()).unwrap();

let (sender, receiver) = async_std::channel::unbounded();
let reflector = Reflector::Sender(sender);
let rustlink = Rustlink::try_new(rpc_url, fetch_interval_seconds, reflector, contracts)
.map_err(|e| JsValue::from_str(&format!("{}", e)))
.unwrap();
let rustlink = Rustlink::try_new(
rpc_url,
fetch_interval_seconds,
reflector,
contracts,
std::time::Duration::from_secs(10),
)
.map_err(|e| JsValue::from_str(&format!("{}", e)))
.unwrap();

RustlinkJS {
rustlink,
Expand Down
20 changes: 13 additions & 7 deletions src/fetcher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
use std::time::Duration;

use async_std::stream::StreamExt;
use ethers::abi::AbiError;
use ethers::providers::{Http, Provider};
use ethers::types::Address;
use futures::{select, FutureExt};

use super::interface::{ChainlinkContract, Round};
use crate::core::Reflector::Sender;
use crate::core::{Configuration, Rustlink};
use crate::interface::ContractCallError;

/// Retrieves the price of an underlying asset from a particular contract
async fn fetch_round_data_for_contract(
rustlink_configuration: &Configuration,
identifier: &str,
async fn fetch_round_data_for_contract<'a>(
rustlink_configuration: &'a Configuration,
identifier: &'a str,
address: Address,
) -> Result<Round, AbiError> {
let contract =
ChainlinkContract::new(&rustlink_configuration.provider, identifier, address).await?;
) -> Result<Round, ContractCallError<&'a Provider<Http>>> {
let contract = ChainlinkContract::new(
&rustlink_configuration.provider,
identifier,
address,
rustlink_configuration.call_timeout,
)
.await?;
contract.latest_round_data().await
}

Expand Down
83 changes: 59 additions & 24 deletions src/interface/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
use std::sync::Arc;
use async_std::future::{timeout, TimeoutError};
use ethers::{
abi::{Abi, AbiError},
contract::{Contract, ContractError},
providers::{Http, Middleware, Provider},
types::{Address, U256},
};
use serde::{Deserialize, Serialize};
use ethers::{abi::{Abi, AbiError}, contract::Contract, providers::{Http, Provider}, types::{Address, U256}};

use std::{sync::Arc, time::Duration};
use thiserror::Error;

#[derive(Clone)]
pub struct ChainlinkContract<'a> {
pub contract: Contract<&'a Provider<Http>>,
pub identifier: &'a str,
pub decimals: u8,
pub call_timeout: Duration,
}

#[derive(Error, Debug)]
pub enum ContractCallError<T: Middleware> {
#[error("Abi error: {0}")]
Abi(#[from] AbiError),
#[error("Timeout error: {0}")]
Timeout(#[from] TimeoutError),
#[error("Contract error: {0}")]
Contract(#[from] ContractError<T>),
}

/// The latest price received for this symbol.
/// This data is directly retrieved from the underlying contract.
Expand All @@ -29,41 +45,56 @@ pub struct Round {
pub answer: f64,
}

/// Type alias for the raw round call to the contract
pub type RoundCall<'a> = Result<(u128, u128, U256, U256, u128), ContractError<&'a Provider<Http>>>;

#[allow(clippy::redundant_allocation)]
async fn decimals<'a>(
contract: &ethers::contract::ContractInstance<Arc<&'a Provider<Http>>, &'a Provider<Http>>,
) -> Result<u8, ContractError<&'a Provider<Http>>> {
Ok(contract
.method::<_, U256>("decimals", ())
.unwrap()
.call()
.await?
.as_u64() as u8)
}

impl<'a> ChainlinkContract<'a> {
/// Creates a new instance of a chainlink price aggregator. This is just a wrapper
/// function to simplify the interactions with the contract.
pub async fn new(
provider: &'a Provider<Http>,
identifier: &'a str,
contract_address: Address,
) -> Result<ChainlinkContract<'a>, AbiError> {
let abi:Abi=serde_json::from_str(include_str!("IAggregatorV3Interface.json")).unwrap();
let contract = Contract::new(contract_address, abi, Arc::new(provider));
call_timeout: Duration,
) -> Result<ChainlinkContract<'a>, ContractCallError<&'a Provider<Http>>> {
let abi: Abi = serde_json::from_str(include_str!("IAggregatorV3Interface.json")).unwrap();
let contract: ethers::contract::ContractInstance<Arc<&Provider<Http>>, &Provider<Http>> =
Contract::new(contract_address, abi, Arc::new(provider));

let decimals=contract.method::<_,U256>("decimals", ()).unwrap()
.call().await.unwrap().as_u64() as u8;
let decimals = timeout(call_timeout, decimals(&contract)).await??;

Ok(ChainlinkContract {
contract,
decimals,
identifier,
call_timeout,
})
}

/// Wrapper function to call the latestRoundData method on the contract
async fn round_data(&self) -> RoundCall<'a> {
let round_call: RoundCall = self.contract.method("latestRoundData", ())?.call().await;
round_call
}

/// Retrieves the latest price of this underlying asset
/// from the chainlink decentralized data feed
pub async fn latest_round_data(&self) -> Result<Round, AbiError> {
let (round_id, answer, started_at, updated_at, answered_in_round): (
u128,
u128,
U256,
U256,
u128,
) = self
.contract
.method("latestRoundData", ())?
.call()
.await.unwrap();
pub async fn latest_round_data(&self) -> Result<Round, ContractCallError<&'a Provider<Http>>> {
// Call the contract, but timeout after 10 seconds
let (round_id, answer, started_at, updated_at, answered_in_round) =
timeout(self.call_timeout, self.round_data()).await??;

// Convert the answer on contract to a string.
let float_answer: f64 = answer.to_string().parse().unwrap();
Expand All @@ -85,18 +116,22 @@ impl<'a> ChainlinkContract<'a> {
#[cfg(test)]
mod tests {

use ethers::{abi::Address, providers::Provider};
use std::time::Duration;

use crate::interface::ChainlinkContract;
use ethers::{abi::Address, providers::Provider};

#[tokio::test]
async fn valid_answer() {

let provider=Provider::try_from("https://bsc-dataseed1.binance.org/").unwrap();
let provider = Provider::try_from("https://bsc-dataseed1.binance.org/").unwrap();

let chainlink_contract = ChainlinkContract::new(
&provider,
"ETH",
"0x9ef1B8c0E4F7dc8bF5719Ea496883DC6401d5b2e".parse::<Address>().unwrap(),
"0x9ef1B8c0E4F7dc8bF5719Ea496883DC6401d5b2e"
.parse::<Address>()
.unwrap(),
Duration::from_secs(10),
)
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
/// This library provides a simple interface to fetch price data from the Chainlink decentralized data feed.
/// Core is the main module that contains the main struct `Rustlink` that you will need to interact with.
pub mod core;
mod interface;
mod error;
mod fetcher;
mod interface;
#[cfg(test)]
mod tests {

Expand All @@ -27,6 +27,7 @@ mod tests {
1,
Reflector::Sender(sender),
contracts,
std::time::Duration::from_secs(3),
)
.unwrap();

Expand Down