From 2daca057a02aa201e34e424d04d7fd9df10fb468 Mon Sep 17 00:00:00 2001 From: jb caron Date: Mon, 29 Jan 2024 18:06:55 +0100 Subject: [PATCH 1/3] fix: :bug: use block storage events remove runtime_api() in get_events and use the events storage --- crates/client/rpc/src/events/mod.rs | 56 +++++++++++++++++------------ 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/crates/client/rpc/src/events/mod.rs b/crates/client/rpc/src/events/mod.rs index a825887439..588c6c608e 100644 --- a/crates/client/rpc/src/events/mod.rs +++ b/crates/client/rpc/src/events/mod.rs @@ -49,35 +49,47 @@ where StarknetRpcApiError::BlockNotFound })?; - let block_extrinsics = self - .client - .block_body(substrate_block_hash) - .map_err(|e| { - error!("Failed to retrieve block body. Substrate block hash: {substrate_block_hash}, error: {e}"); - StarknetRpcApiError::InternalServerError - })? + let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash) .ok_or(StarknetRpcApiError::BlockNotFound)?; + let block_hash = starknet_block.header().hash::(); + let chain_id = self.client.runtime_api().chain_id(substrate_block_hash).map_err(|_| { error!("Failed to retrieve chain id"); StarknetRpcApiError::InternalServerError })?; - let tx_hash_and_events = self - .client - .runtime_api() - .get_starknet_events_and_their_associated_tx_hash(substrate_block_hash, block_extrinsics, chain_id) - .map_err(|e| { - error!( - "Failed to retrieve starknet events and their associated transaction hash. Substrate block hash: \ - {substrate_block_hash}, chain ID: {chain_id:?}, error: {e}" - ); - StarknetRpcApiError::InternalServerError - })?; - - let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash) - .ok_or(StarknetRpcApiError::BlockNotFound)?; - let block_hash = starknet_block.header().hash::(); + // get txs hashes from cache or compute them + let block_txs_hashes: Vec<_> = if let Some(tx_hashes) = self.get_cached_transaction_hashes(block_hash.into()) { + tx_hashes + .into_iter() + .map(|h| { + Felt252Wrapper::try_from(h) + .map(|f| f.0) + .map_err(|e| { + error!("'{e}'"); + StarknetRpcApiError::InternalServerError + }) + .unwrap() + }) + .collect() + } else { + starknet_block + .transactions_hashes::(chain_id.into(), Some(starknet_block.header().block_number)) + .map(FieldElement::from) + .collect() + }; + + // get txs hashes and events from block + // the txs hashes are found by the index of the ordered event + let tx_hash_and_events: Vec<(Felt252Wrapper, _)> = starknet_block + .events() + .into_iter() + .flat_map(|ordered_event| { + let tx_hash = block_txs_hashes[ordered_event.index() as usize]; + ordered_event.events().into_iter().map(move |events| (tx_hash.into(), events.clone())) + }) + .collect(); let emitted_events = tx_hash_and_events .into_iter() From a6f9b113a31d605f037d9d569ad6c665b5681702 Mon Sep 17 00:00:00 2001 From: jb caron Date: Fri, 9 Feb 2024 20:32:57 +0100 Subject: [PATCH 2/3] fix: :bug: fix pagination change format of comtinuation_token --- .devcontainer/devcontainer.json | 48 +++++++++---------- crates/client/rpc/src/events/mod.rs | 74 +++++++++++++---------------- crates/client/rpc/src/lib.rs | 15 +++--- crates/client/rpc/src/types.rs | 8 ++-- 4 files changed, 68 insertions(+), 77 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 5be3e5c489..f958ebbd46 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,28 +1,28 @@ { - "build": { - "dockerfile": "Dockerfile" - }, + "build": { + "dockerfile": "Dockerfile" + }, - // make sure to update `.devcontainer/Dockerfile` with you uid !!! - "remoteUser": "vscode", + // make sure to update `.devcontainer/Dockerfile` with you uid !!! + "remoteUser": "vscode", - "customizations": { - "vscode": { - "settings": { - "terminal.integrated.defaultProfile.linux": "zsh", - "terminal.integrated.profiles.linux": { "zsh": { "path": "/bin/zsh" } } - }, - "extensions": [ - "rust-lang.rust-analyzer", - "1YiB.rust-bundle", - "tamasfe.even-better-toml", - "serayuzgur.crates", - "vivaxy.vscode-conventional-commits", - "streetsidesoftware.code-spell-checker" - ] - } - }, + "customizations": { + "vscode": { + "settings": { + "terminal.integrated.defaultProfile.linux": "zsh", + "terminal.integrated.profiles.linux": { "zsh": { "path": "/bin/zsh" } } + }, + "extensions": [ + "rust-lang.rust-analyzer", + "1YiB.rust-bundle", + "tamasfe.even-better-toml", + "serayuzgur.crates", + "vivaxy.vscode-conventional-commits", + "streetsidesoftware.code-spell-checker" + ] + } + }, - "postStartCommand": "git config --global --add safe.directory ${containerWorkspaceFolder}", - "postAttachCommand": "${containerWorkspaceFolder}/.devcontainer/setup.sh" -} \ No newline at end of file + "postStartCommand": "git config --global --add safe.directory ${containerWorkspaceFolder}", + "postAttachCommand": "${containerWorkspaceFolder}/.devcontainer/setup.sh" +} diff --git a/crates/client/rpc/src/events/mod.rs b/crates/client/rpc/src/events/mod.rs index 588c6c608e..c7d9ca9dde 100644 --- a/crates/client/rpc/src/events/mod.rs +++ b/crates/client/rpc/src/events/mod.rs @@ -1,9 +1,6 @@ #[cfg(test)] mod tests; -use std::iter::Skip; -use std::vec::IntoIter; - use jsonrpsee::core::RpcResult; use log::error; use mc_rpc_core::utils::get_block_by_block_hash; @@ -119,52 +116,51 @@ where // get filter values let continuation_token = filter.continuation_token; // skip blocks with continuation token block number - let from_block = filter.from_block + continuation_token.block_n; + let from_block = continuation_token.block_n; let mut current_block = from_block; let to_block = filter.to_block; let from_address = filter.from_address; let keys = filter.keys; let chunk_size = filter.chunk_size; - let mut filtered_events = Vec::new(); + let mut filtered_events: Vec = Vec::new(); + + println!("Filtering events from block {} to block {}", from_block, to_block); + println!("continuation_token: {:?}", continuation_token); // Iterate on block range while current_block <= to_block { let emitted_events = self.get_block_events(current_block)?; - let mut unchecked_events = emitted_events.len(); - let events = if current_block == from_block { - // check if continuation_token.event_n is not too big - if (unchecked_events as u64) < continuation_token.event_n { - return Err(StarknetRpcApiError::InvalidContinuationToken.into()); - } - unchecked_events -= continuation_token.event_n as usize; - emitted_events.into_iter().skip(continuation_token.event_n as usize) - } else { - #[allow(clippy::iter_skip_zero)] - emitted_events.into_iter().skip(0) - }; - - let mut n_visited = 0; - let block_filtered_events = filter_events_by_params( - events, - from_address, - &keys, - chunk_size as usize - filtered_events.len(), - &mut n_visited, - ); - - filtered_events.extend(block_filtered_events); + + let block_filtered_events: Vec = filter_events_by_params(emitted_events, from_address, &keys); + + if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { + eprintln!( + "Invalid continuation token: block_filtered_events len = {}, continuation_token.event_n = {}", + block_filtered_events.len(), + continuation_token.event_n + ); + return Err(StarknetRpcApiError::InvalidContinuationToken.into()); + } + + #[allow(clippy::iter_skip_zero)] + let block_filtered_reduced_events: Vec = block_filtered_events + .into_iter() + .skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 }) + .take(chunk_size as usize - filtered_events.len()) + .collect(); + + let num_events = block_filtered_reduced_events.len(); + + filtered_events.extend(block_filtered_reduced_events); if filtered_events.len() == chunk_size as usize { - let token = if current_block < to_block || n_visited < unchecked_events { - let mut event_n = n_visited as u64; - if continuation_token.block_n == current_block { - event_n += continuation_token.event_n; - } - Some(ContinuationToken { block_n: current_block - from_block, event_n }.to_string()) + let event_n = if current_block == from_block { + continuation_token.event_n + chunk_size } else { - None + num_events as u64 }; + let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string()); return Ok(EventsPage { events: filtered_events, continuation_token: token }); } @@ -190,17 +186,14 @@ where /// * `(block_events: Vec, continuation_token: usize)` - A tuple of the filtered /// events and the first index which still hasn't been processed block_id and an instance of Block pub fn filter_events_by_params<'a, 'b: 'a>( - events: Skip>, + events: Vec, address: Option, keys: &'a [Vec], - max_results: usize, - n_visited: &'b mut usize, ) -> Vec { let mut filtered_events = vec![]; // Iterate on block events. for event in events { - *n_visited += 1; let match_from_address = address.map_or(true, |addr| addr.0 == event.from_address); // Based on https://github.com/starkware-libs/papyrus let match_keys = keys @@ -210,9 +203,6 @@ pub fn filter_events_by_params<'a, 'b: 'a>( if match_from_address && match_keys { filtered_events.push(event); - if filtered_events.len() >= max_results { - break; - } } } filtered_events diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index b8579d142a..e14105d0f4 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -1312,13 +1312,6 @@ where /// errors, such as `PAGE_SIZE_TOO_BIG`, `INVALID_CONTINUATION_TOKEN`, `BLOCK_NOT_FOUND`, or /// `TOO_MANY_KEYS_IN_FILTER`, returns a `StarknetRpcApiError` indicating the specific issue. async fn get_events(&self, filter: EventFilterWithPage) -> RpcResult { - let continuation_token = match filter.result_page_request.continuation_token { - Some(token) => types::ContinuationToken::parse(token).map_err(|e| { - error!("Failed to parse continuation token: {:?}", e); - StarknetRpcApiError::InvalidContinuationToken - })?, - None => types::ContinuationToken::default(), - }; let from_address = filter.event_filter.address.map(Felt252Wrapper::from); let keys = filter.event_filter.keys.unwrap_or_default(); let chunk_size = filter.result_page_request.chunk_size; @@ -1351,6 +1344,14 @@ where StarknetRpcApiError::BlockNotFound })?; + let continuation_token = match filter.result_page_request.continuation_token { + Some(token) => types::ContinuationToken::parse(token).map_err(|e| { + error!("Failed to parse continuation token: {:?}", e); + StarknetRpcApiError::InvalidContinuationToken + })?, + None => types::ContinuationToken { block_n: from_block.into(), event_n: 0 }, + }; + // Verify that the requested range is valid if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); diff --git a/crates/client/rpc/src/types.rs b/crates/client/rpc/src/types.rs index c098eb4360..2a21022410 100644 --- a/crates/client/rpc/src/types.rs +++ b/crates/client/rpc/src/types.rs @@ -27,18 +27,18 @@ pub enum ParseTokenError { impl fmt::Display for ContinuationToken { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:x},{:x}", self.block_n, self.event_n) + write!(f, "{}-{}", self.block_n, self.event_n) } } impl ContinuationToken { pub fn parse(token: String) -> Result { - let arr: Vec<&str> = token.split(',').collect(); + let arr: Vec<&str> = token.split('-').collect(); if arr.len() != 2 { return Err(ParseTokenError::WrongToken); } - let block_n = u64::from_str_radix(arr[0], 16).map_err(ParseTokenError::ParseFailed)?; - let event_n = u64::from_str_radix(arr[1], 16).map_err(ParseTokenError::ParseFailed)?; + let block_n = u64::from_str_radix(arr[0], 10).map_err(ParseTokenError::ParseFailed)?; + let event_n = u64::from_str_radix(arr[1], 10).map_err(ParseTokenError::ParseFailed)?; Ok(ContinuationToken { block_n, event_n }) } From b2add9597cc311c7131dc0dd6645b77a055f1c2e Mon Sep 17 00:00:00 2001 From: jb caron Date: Fri, 9 Feb 2024 21:03:21 +0100 Subject: [PATCH 3/3] docs: :memo: update changelog fix merge --- CHANGELOG.md | 1 + crates/client/rpc/src/events/mod.rs | 14 ++++---------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55d6b8e50b..d5b872701b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ git # Madara Changelog ## Next release +- fix: get_events paging with continuation_token - fix(class): #32 #33 #34 - fix(class): #116 - feat(class): download classes from sequencer diff --git a/crates/client/rpc/src/events/mod.rs b/crates/client/rpc/src/events/mod.rs index c7d9ca9dde..1779eeb8f3 100644 --- a/crates/client/rpc/src/events/mod.rs +++ b/crates/client/rpc/src/events/mod.rs @@ -46,8 +46,10 @@ where StarknetRpcApiError::BlockNotFound })?; - let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash) - .ok_or(StarknetRpcApiError::BlockNotFound)?; + let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash).map_err(|e| { + error!("'{e}'"); + StarknetRpcApiError::BlockNotFound + })?; let block_hash = starknet_block.header().hash::(); @@ -125,9 +127,6 @@ where let mut filtered_events: Vec = Vec::new(); - println!("Filtering events from block {} to block {}", from_block, to_block); - println!("continuation_token: {:?}", continuation_token); - // Iterate on block range while current_block <= to_block { let emitted_events = self.get_block_events(current_block)?; @@ -135,11 +134,6 @@ where let block_filtered_events: Vec = filter_events_by_params(emitted_events, from_address, &keys); if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { - eprintln!( - "Invalid continuation token: block_filtered_events len = {}, continuation_token.event_n = {}", - block_filtered_events.len(), - continuation_token.event_n - ); return Err(StarknetRpcApiError::InvalidContinuationToken.into()); }