From 0244330b73624ff304340a6aceb6181a0e64b5ff Mon Sep 17 00:00:00 2001 From: Luis Herasme Date: Thu, 25 Jul 2024 18:23:21 -0400 Subject: [PATCH] feat: Add requests per second to the "config.json" --- ghost-crab-common/src/config.rs | 15 +++++++++++---- ghost-crab-macros/src/lib.rs | 19 +++++++++++++++---- ghost-crab/src/block_handler.rs | 1 + ghost-crab/src/cache/manager.rs | 9 +++++++-- ghost-crab/src/event_handler.rs | 1 + ghost-crab/src/indexer.rs | 14 +++++++++++--- 6 files changed, 46 insertions(+), 13 deletions(-) diff --git a/ghost-crab-common/src/config.rs b/ghost-crab-common/src/config.rs index d771be1..7c9f659 100644 --- a/ghost-crab-common/src/config.rs +++ b/ghost-crab-common/src/config.rs @@ -39,12 +39,19 @@ pub struct BlockHandler { pub step: u64, } +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct NetworkConfig { + pub rpc_url: String, + pub requests_per_second: u64, +} + #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Config { pub data_sources: HashMap, pub templates: HashMap, - pub networks: HashMap, + pub networks: HashMap, pub block_handlers: HashMap, } @@ -103,9 +110,9 @@ fn parse_config(config_string: &str) -> Result { fn replace_env_vars(config: &mut Config) -> Result<(), ConfigError> { for (_key, value) in &mut config.networks { - if value.starts_with('$') { - *value = env::var(&value[1..]) - .map_err(|_| ConfigError::EnvVarNotFound(value[1..].to_string()))?; + if value.rpc_url.starts_with('$') { + (*value).rpc_url = env::var(&value.rpc_url[1..]) + .map_err(|_| ConfigError::EnvVarNotFound(value.rpc_url[1..].to_string()))?; } } diff --git a/ghost-crab-macros/src/lib.rs b/ghost-crab-macros/src/lib.rs index 2ed446d..8ea03c6 100644 --- a/ghost-crab-macros/src/lib.rs +++ b/ghost-crab-macros/src/lib.rs @@ -27,8 +27,10 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream { let config = config::load().unwrap(); let source = config.block_handlers.get(name).expect("Source not found."); - let rpc_url = config.networks.get(&source.network).expect("RPC url not found for network"); - let rpc_url = Literal::string(&rpc_url); + let network_config = + config.networks.get(&source.network).expect("RPC url not found for network"); + let rpc_url = Literal::string(&network_config.rpc_url); + let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second); let step = Literal::u64_suffixed(source.step); let start_block = Literal::u64_suffixed(source.start_block); @@ -71,6 +73,10 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream { String::from(#rpc_url) } + fn rate_limit(&self) -> u64 { + #requests_per_second + } + fn start_block(&self) -> u64 { #start_block } @@ -161,8 +167,9 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool) start_block = Literal::u64_suffixed(source.start_block); }; - let rpc_url = config.networks.get(&network).expect("RPC url not found for network"); - let rpc_url = Literal::string(&rpc_url); + let network_config = config.networks.get(&network).expect("RPC url not found for network"); + let rpc_url = Literal::string(&network_config.rpc_url); + let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second); let abi = Literal::string(&abi); let network = Literal::string(&network); @@ -237,6 +244,10 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool) String::from(#rpc_url) } + fn rate_limit(&self) -> u64 { + #requests_per_second + } + fn execution_mode(&self) -> ExecutionMode { #execution_mode } diff --git a/ghost-crab/src/block_handler.rs b/ghost-crab/src/block_handler.rs index bf9c30f..4f952b8 100644 --- a/ghost-crab/src/block_handler.rs +++ b/ghost-crab/src/block_handler.rs @@ -31,6 +31,7 @@ pub trait BlockHandler { fn network(&self) -> String; fn rpc_url(&self) -> String; fn start_block(&self) -> u64; + fn rate_limit(&self) -> u64; fn execution_mode(&self) -> ExecutionMode; } diff --git a/ghost-crab/src/cache/manager.rs b/ghost-crab/src/cache/manager.rs index 45b934a..5432c47 100644 --- a/ghost-crab/src/cache/manager.rs +++ b/ghost-crab/src/cache/manager.rs @@ -23,14 +23,19 @@ impl RPCManager { RPCManager { rpcs: HashMap::new() } } - pub async fn get_or_create(&mut self, network: String, rpc_url: String) -> CacheProvider { + pub async fn get_or_create( + &mut self, + network: String, + rpc_url: String, + rate_limit: u64, + ) -> CacheProvider { if let Some(provider) = self.rpcs.get(&rpc_url) { return provider.clone(); } let cache = load_cache(&network).unwrap(); let cache_layer = CacheLayer::new(cache); - let rate_limit_layer = RateLimitLayer::new(10_000, Duration::from_secs(1)); + let rate_limit_layer = RateLimitLayer::new(rate_limit, Duration::from_secs(1)); let client = ClientBuilder::default() .layer(cache_layer) diff --git a/ghost-crab/src/event_handler.rs b/ghost-crab/src/event_handler.rs index cb02d6e..e17b061 100644 --- a/ghost-crab/src/event_handler.rs +++ b/ghost-crab/src/event_handler.rs @@ -29,6 +29,7 @@ pub trait EventHandler { fn address(&self) -> Address; fn network(&self) -> String; fn rpc_url(&self) -> String; + fn rate_limit(&self) -> u64; fn execution_mode(&self) -> ExecutionMode; fn event_signature(&self) -> String; } diff --git a/ghost-crab/src/indexer.rs b/ghost-crab/src/indexer.rs index 711a3d3..b4cadbe 100644 --- a/ghost-crab/src/indexer.rs +++ b/ghost-crab/src/indexer.rs @@ -48,7 +48,10 @@ impl Indexer { return; } - let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await; + let provider = self + .rpc_manager + .get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit()) + .await; self.handlers.push(ProcessEventsInput { start_block: handler.start_block(), @@ -61,7 +64,10 @@ impl Indexer { } pub async fn load_block_handler(&mut self, handler: BlockHandlerInstance) { - let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await; + let provider = self + .rpc_manager + .get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit()) + .await; self.block_handlers.push(ProcessBlocksInput { handler, @@ -91,7 +97,9 @@ impl Indexer { while let Some(template) = self.rx.recv().await { let network = template.handler.network(); let rpc_url = template.handler.rpc_url(); - let provider = self.rpc_manager.get_or_create(network, rpc_url).await; + let rate_limit = template.handler.rate_limit(); + + let provider = self.rpc_manager.get_or_create(network, rpc_url, rate_limit).await; let handler = ProcessEventsInput { start_block: template.start_block,