diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ec3409b42..26539630d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,10 @@ git # Madara Changelog ## Next release +- fix: get_events paging with continuation_token - fux(getStorageAt): #28 - fix(genesis): #107 + - fix(class): #32 #33 #34 - fix(class): #116 - feat(class): download classes from sequencer diff --git a/crates/client/rpc/src/events/mod.rs b/crates/client/rpc/src/events/mod.rs index 94d9a44b0b..1779eeb8f3 100644 --- a/crates/client/rpc/src/events/mod.rs +++ b/crates/client/rpc/src/events/mod.rs @@ -1,9 +1,6 @@ #[cfg(test)] mod tests; -use std::iter::Skip; -use std::vec::IntoIter; - use jsonrpsee::core::RpcResult; use log::error; use mc_rpc_core::utils::get_block_by_block_hash; @@ -49,62 +46,49 @@ where StarknetRpcApiError::BlockNotFound })?; - let block_extrinsics = self - .client - .block_body(substrate_block_hash) - .map_err(|e| { - error!("Failed to retrieve block body. Substrate block hash: {substrate_block_hash}, error: {e}"); - StarknetRpcApiError::InternalServerError - })? - .ok_or(StarknetRpcApiError::BlockNotFound)?; - - let chain_id = self.client.runtime_api().chain_id(substrate_block_hash).map_err(|_| { - error!("Failed to retrieve chain id"); - StarknetRpcApiError::InternalServerError + let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash).map_err(|e| { + error!("'{e}'"); + StarknetRpcApiError::BlockNotFound })?; - let index_and_events = - self.client.runtime_api().get_starknet_events_and_their_associated_tx_index(substrate_block_hash).map_err( - |e| { - error!( - "Failed to retrieve starknet events and their associated transaction index. Substrate block \ - hash: {substrate_block_hash}, chain ID: {chain_id:?}, error: {e}" - ); - StarknetRpcApiError::InternalServerError - }, - )?; + let block_hash = starknet_block.header().hash::(); - let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash).map_err(|e| { - error!("Failed to retrieve starknet block from substrate block hash: error: {e}"); + let chain_id = self.client.runtime_api().chain_id(substrate_block_hash).map_err(|_| { + error!("Failed to retrieve chain id"); StarknetRpcApiError::InternalServerError })?; - let txn_hashes = self.get_cached_transaction_hashes(starknet_block.header().hash::().into()); - let block_extrinsics_len = block_extrinsics.len(); - let starknet_txs = - self.client.runtime_api().extrinsic_filter(substrate_block_hash, block_extrinsics).map_err(|e| { - error!("Failed to filter extrinsics. Substrate block hash: {substrate_block_hash}, error: {e}"); - StarknetRpcApiError::InternalServerError - })?; - let inherent_count = block_extrinsics_len - starknet_txs.len(); - let mut tx_hash_and_events: Vec<(Felt252Wrapper, starknet_api::transaction::Event)> = vec![]; - for (index, event) in index_and_events { - let tx_index = index as usize - inherent_count; - let tx_hash = self.try_txn_hash_from_cache( - tx_index, - &txn_hashes, - &starknet_txs, - chain_id, - Some(starknet_block.header().block_number), - )?; - tx_hash_and_events.push((tx_hash, event)); - } - let starknet_block = match get_block_by_block_hash(self.client.as_ref(), substrate_block_hash) { - Ok(block) => block, - Err(_) => return Err(StarknetRpcApiError::BlockNotFound), + // get txs hashes from cache or compute them + let block_txs_hashes: Vec<_> = if let Some(tx_hashes) = self.get_cached_transaction_hashes(block_hash.into()) { + tx_hashes + .into_iter() + .map(|h| { + Felt252Wrapper::try_from(h) + .map(|f| f.0) + .map_err(|e| { + error!("'{e}'"); + StarknetRpcApiError::InternalServerError + }) + .unwrap() + }) + .collect() + } else { + starknet_block + .transactions_hashes::(chain_id.into(), Some(starknet_block.header().block_number)) + .map(FieldElement::from) + .collect() }; - let block_hash = starknet_block.header().hash::(); + // get txs hashes and events from block + // the txs hashes are found by the index of the ordered event + let tx_hash_and_events: Vec<(Felt252Wrapper, _)> = starknet_block + .events() + .into_iter() + .flat_map(|ordered_event| { + let tx_hash = block_txs_hashes[ordered_event.index() as usize]; + ordered_event.events().into_iter().map(move |events| (tx_hash.into(), events.clone())) + }) + .collect(); let emitted_events = tx_hash_and_events .into_iter() @@ -134,52 +118,43 @@ where // get filter values let continuation_token = filter.continuation_token; // skip blocks with continuation token block number - let from_block = filter.from_block + continuation_token.block_n; + let from_block = continuation_token.block_n; let mut current_block = from_block; let to_block = filter.to_block; let from_address = filter.from_address; let keys = filter.keys; let chunk_size = filter.chunk_size; - let mut filtered_events = Vec::new(); + let mut filtered_events: Vec = Vec::new(); // Iterate on block range while current_block <= to_block { let emitted_events = self.get_block_events(current_block)?; - let mut unchecked_events = emitted_events.len(); - let events = if current_block == from_block { - // check if continuation_token.event_n is not too big - if (unchecked_events as u64) < continuation_token.event_n { - return Err(StarknetRpcApiError::InvalidContinuationToken.into()); - } - unchecked_events -= continuation_token.event_n as usize; - emitted_events.into_iter().skip(continuation_token.event_n as usize) - } else { - #[allow(clippy::iter_skip_zero)] - emitted_events.into_iter().skip(0) - }; - - let mut n_visited = 0; - let block_filtered_events = filter_events_by_params( - events, - from_address, - &keys, - chunk_size as usize - filtered_events.len(), - &mut n_visited, - ); - - filtered_events.extend(block_filtered_events); + + let block_filtered_events: Vec = filter_events_by_params(emitted_events, from_address, &keys); + + if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { + return Err(StarknetRpcApiError::InvalidContinuationToken.into()); + } + + #[allow(clippy::iter_skip_zero)] + let block_filtered_reduced_events: Vec = block_filtered_events + .into_iter() + .skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 }) + .take(chunk_size as usize - filtered_events.len()) + .collect(); + + let num_events = block_filtered_reduced_events.len(); + + filtered_events.extend(block_filtered_reduced_events); if filtered_events.len() == chunk_size as usize { - let token = if current_block < to_block || n_visited < unchecked_events { - let mut event_n = n_visited as u64; - if continuation_token.block_n == current_block { - event_n += continuation_token.event_n; - } - Some(ContinuationToken { block_n: current_block - from_block, event_n }.to_string()) + let event_n = if current_block == from_block { + continuation_token.event_n + chunk_size } else { - None + num_events as u64 }; + let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string()); return Ok(EventsPage { events: filtered_events, continuation_token: token }); } @@ -205,17 +180,14 @@ where /// * `(block_events: Vec, continuation_token: usize)` - A tuple of the filtered /// events and the first index which still hasn't been processed block_id and an instance of Block pub fn filter_events_by_params<'a, 'b: 'a>( - events: Skip>, + events: Vec, address: Option, keys: &'a [Vec], - max_results: usize, - n_visited: &'b mut usize, ) -> Vec { let mut filtered_events = vec![]; // Iterate on block events. for event in events { - *n_visited += 1; let match_from_address = address.map_or(true, |addr| addr.0 == event.from_address); // Based on https://github.com/starkware-libs/papyrus let match_keys = keys @@ -225,9 +197,6 @@ pub fn filter_events_by_params<'a, 'b: 'a>( if match_from_address && match_keys { filtered_events.push(event); - if filtered_events.len() >= max_results { - break; - } } } filtered_events diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index 00c7664462..3c4e790ce5 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -1304,13 +1304,6 @@ where /// errors, such as `PAGE_SIZE_TOO_BIG`, `INVALID_CONTINUATION_TOKEN`, `BLOCK_NOT_FOUND`, or /// `TOO_MANY_KEYS_IN_FILTER`, returns a `StarknetRpcApiError` indicating the specific issue. async fn get_events(&self, filter: EventFilterWithPage) -> RpcResult { - let continuation_token = match filter.result_page_request.continuation_token { - Some(token) => types::ContinuationToken::parse(token).map_err(|e| { - error!("Failed to parse continuation token: {:?}", e); - StarknetRpcApiError::InvalidContinuationToken - })?, - None => types::ContinuationToken::default(), - }; let from_address = filter.event_filter.address.map(Felt252Wrapper::from); let keys = filter.event_filter.keys.unwrap_or_default(); let chunk_size = filter.result_page_request.chunk_size; @@ -1343,6 +1336,14 @@ where StarknetRpcApiError::BlockNotFound })?; + let continuation_token = match filter.result_page_request.continuation_token { + Some(token) => types::ContinuationToken::parse(token).map_err(|e| { + error!("Failed to parse continuation token: {:?}", e); + StarknetRpcApiError::InvalidContinuationToken + })?, + None => types::ContinuationToken { block_n: from_block.into(), event_n: 0 }, + }; + // Verify that the requested range is valid if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); diff --git a/crates/client/rpc/src/types.rs b/crates/client/rpc/src/types.rs index c098eb4360..2a21022410 100644 --- a/crates/client/rpc/src/types.rs +++ b/crates/client/rpc/src/types.rs @@ -27,18 +27,18 @@ pub enum ParseTokenError { impl fmt::Display for ContinuationToken { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:x},{:x}", self.block_n, self.event_n) + write!(f, "{}-{}", self.block_n, self.event_n) } } impl ContinuationToken { pub fn parse(token: String) -> Result { - let arr: Vec<&str> = token.split(',').collect(); + let arr: Vec<&str> = token.split('-').collect(); if arr.len() != 2 { return Err(ParseTokenError::WrongToken); } - let block_n = u64::from_str_radix(arr[0], 16).map_err(ParseTokenError::ParseFailed)?; - let event_n = u64::from_str_radix(arr[1], 16).map_err(ParseTokenError::ParseFailed)?; + let block_n = u64::from_str_radix(arr[0], 10).map_err(ParseTokenError::ParseFailed)?; + let event_n = u64::from_str_radix(arr[1], 10).map_err(ParseTokenError::ParseFailed)?; Ok(ContinuationToken { block_n, event_n }) }