From 01d829ea24e6a47e84ada4fc06a1963db1f0a387 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:13:09 +0100 Subject: [PATCH 01/10] perf(getEvents): identified potential bottlenecks --- .../v0_7_1/methods/read/get_events.rs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index 4c12b75f2..f7bb4d82f 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -38,6 +38,7 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta return Err(StarknetRpcApiError::PageSizeTooBig); } + // TODO: this function doesn't do much, should be hoisted out // Get the block numbers for the requested range let (from_block, to_block, latest_block) = block_range(starknet, filter.event_filter.from_block, filter.event_filter.to_block)?; @@ -47,30 +48,40 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta None => ContinuationToken { block_n: from_block, event_n: 0 }, }; + // PERF: this is minor but this could happen earlier // Verify that the requested range is valid if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); } let from_block = continuation_token.block_n; + // PERF: this should at least be pre-allocated to some sensible default let mut filtered_events: Vec = Vec::new(); + // PERF: we should truncate from_block to the creation block of the contract + // if it is less than that for current_block in from_block..=to_block { + // PERF: this check can probably be hoisted out of this loop let (_pending, block) = if current_block <= latest_block { + // PERF: This is probably the main bottleneck: we should be able to + // mitigate this by implementing a db iterator (false, starknet.get_block(&BlockId::Number(current_block))?) } else { (true, starknet.get_block(&BlockId::Tag(BlockTag::Pending))?) }; + // PERF: collection needs to be more efficient let block_filtered_events: Vec = get_block_events(starknet, &block) .into_iter() .filter(|event| event_match_filter(event, from_address, &keys)) .collect(); + // PERF: this condition needs to be moved out the loop as it needs to happen only once if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { return Err(StarknetRpcApiError::InvalidContinuationToken); } + // PERF: same here, hoist this out of the loop #[allow(clippy::iter_skip_zero)] let block_filtered_reduced_events: Vec = block_filtered_events .into_iter() @@ -80,6 +91,8 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta let num_events = block_filtered_reduced_events.len(); + // PERF: any better way to do this? Pre-allocation should reduce some + // of the allocations already filtered_events.extend(block_filtered_reduced_events); if filtered_events.len() == chunk_size as usize { @@ -96,13 +109,17 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta #[inline] fn event_match_filter(event: &EmittedEvent, address: Option, keys: &[Vec]) -> bool { let match_from_address = address.map_or(true, |addr| addr == event.from_address); + // PERF: HOLY FUCK WE CHECK ALL EVENTS EVEN IF THEY COME FROM THE WRONG + // ADDRESS let match_keys = keys .iter() .enumerate() .all(|(i, keys)| event.keys.len() > i && (keys.is_empty() || keys.contains(&event.keys[i]))); + // PERF: this can be short-circuited match_from_address && match_keys } +// TODO: remove this function, this code can be dealt with manually fn block_range( starknet: &Starknet, from_block: Option, @@ -123,6 +140,8 @@ fn block_range( } fn get_block_events(_starknet: &Starknet, block: &MadaraMaybePendingBlock) -> Vec { + // PERF:: this check can probably be removed by handling pending blocks + // separatly let (block_hash, block_number) = match &block.info { MadaraMaybePendingBlockInfo::Pending(_) => (None, None), MadaraMaybePendingBlockInfo::NotPending(block) => (Some(block.block_hash), Some(block.header.block_number)), @@ -133,6 +152,8 @@ fn get_block_events(_starknet: &Starknet, block: &MadaraMaybePendingBlock) -> Ve receipt.events().iter().map(move |events| (tx_hash, events)) }); + // PERF: clone here is brutal, there must be a way to take ownership of this + // data tx_hash_and_events .map(|(transaction_hash, event)| EmittedEvent { from_address: event.from_address, From b84ff3368c5f03ecb1af707c970ae47df530f008 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Tue, 5 Nov 2024 16:32:42 +0100 Subject: [PATCH 02/10] perf(getEvents): refactored `get_block_events` --- .../v0_7_1/methods/read/get_events.rs | 137 ++++++++---------- crates/primitives/receipt/src/lib.rs | 10 ++ 2 files changed, 74 insertions(+), 73 deletions(-) diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index f7bb4d82f..44dd0f4e0 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -38,46 +38,46 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta return Err(StarknetRpcApiError::PageSizeTooBig); } - // TODO: this function doesn't do much, should be hoisted out - // Get the block numbers for the requested range - let (from_block, to_block, latest_block) = - block_range(starknet, filter.event_filter.from_block, filter.event_filter.to_block)?; - - let continuation_token = match filter.result_page_request.continuation_token { - Some(token) => ContinuationToken::parse(token).map_err(|_| StarknetRpcApiError::InvalidContinuationToken)?, - None => ContinuationToken { block_n: from_block, event_n: 0 }, + let latest_block = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; + let from_block = match filter.event_filter.from_block { + Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, + Some(block_id) => starknet.get_block_n(&block_id)?, + None => 0, + }; + let to_block = match filter.event_filter.to_block { + Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, + Some(block_id) => starknet.get_block_n(&block_id)?, + None => latest_block, }; - // PERF: this is minor but this could happen earlier - // Verify that the requested range is valid if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); } + let continuation_token = match filter.result_page_request.continuation_token { + Some(token) => ContinuationToken::parse(token).map_err(|_| StarknetRpcApiError::InvalidContinuationToken)?, + None => ContinuationToken { block_n: from_block, event_n: 0 }, + }; let from_block = continuation_token.block_n; - // PERF: this should at least be pre-allocated to some sensible default - let mut filtered_events: Vec = Vec::new(); // PERF: we should truncate from_block to the creation block of the contract // if it is less than that - for current_block in from_block..=to_block { + let mut filtered_events: Vec = Vec::with_capacity(16); + for block_n in from_block..=to_block { // PERF: this check can probably be hoisted out of this loop - let (_pending, block) = if current_block <= latest_block { + let block = if block_n <= latest_block { // PERF: This is probably the main bottleneck: we should be able to // mitigate this by implementing a db iterator - (false, starknet.get_block(&BlockId::Number(current_block))?) + starknet.get_block(&BlockId::Number(block_n))? } else { - (true, starknet.get_block(&BlockId::Tag(BlockTag::Pending))?) + starknet.get_block(&BlockId::Tag(BlockTag::Pending))? }; // PERF: collection needs to be more efficient - let block_filtered_events: Vec = get_block_events(starknet, &block) - .into_iter() - .filter(|event| event_match_filter(event, from_address, &keys)) - .collect(); + let block_filtered_events: Vec = get_block_events(block, from_address, &keys); // PERF: this condition needs to be moved out the loop as it needs to happen only once - if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { + if block_n == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { return Err(StarknetRpcApiError::InvalidContinuationToken); } @@ -85,7 +85,7 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta #[allow(clippy::iter_skip_zero)] let block_filtered_reduced_events: Vec = block_filtered_events .into_iter() - .skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 }) + .skip(if block_n == from_block { continuation_token.event_n as usize } else { 0 }) .take(chunk_size as usize - filtered_events.len()) .collect(); @@ -97,8 +97,8 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta if filtered_events.len() == chunk_size as usize { let event_n = - if current_block == from_block { continuation_token.event_n + chunk_size } else { num_events as u64 }; - let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string()); + if block_n == from_block { continuation_token.event_n + chunk_size } else { num_events as u64 }; + let token = Some(ContinuationToken { block_n, event_n }.to_string()); return Ok(EventsPage { events: filtered_events, continuation_token: token }); } @@ -106,40 +106,11 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta Ok(EventsPage { events: filtered_events, continuation_token: None }) } -#[inline] -fn event_match_filter(event: &EmittedEvent, address: Option, keys: &[Vec]) -> bool { - let match_from_address = address.map_or(true, |addr| addr == event.from_address); - // PERF: HOLY FUCK WE CHECK ALL EVENTS EVEN IF THEY COME FROM THE WRONG - // ADDRESS - let match_keys = keys - .iter() - .enumerate() - .all(|(i, keys)| event.keys.len() > i && (keys.is_empty() || keys.contains(&event.keys[i]))); - // PERF: this can be short-circuited - match_from_address && match_keys -} - -// TODO: remove this function, this code can be dealt with manually -fn block_range( - starknet: &Starknet, - from_block: Option, - to_block: Option, -) -> StarknetRpcResult<(u64, u64, u64)> { - let latest_block_n = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; - let from_block_n = match from_block { - Some(BlockId::Tag(BlockTag::Pending)) => latest_block_n + 1, - Some(block_id) => starknet.get_block_n(&block_id)?, - None => 0, - }; - let to_block_n = match to_block { - Some(BlockId::Tag(BlockTag::Pending)) => latest_block_n + 1, - Some(block_id) => starknet.get_block_n(&block_id)?, - None => latest_block_n, - }; - Ok((from_block_n, to_block_n, latest_block_n)) -} - -fn get_block_events(_starknet: &Starknet, block: &MadaraMaybePendingBlock) -> Vec { +fn get_block_events( + block: MadaraMaybePendingBlock, + address: Option, + keys: &[Vec], +) -> Vec { // PERF:: this check can probably be removed by handling pending blocks // separatly let (block_hash, block_number) = match &block.info { @@ -147,21 +118,41 @@ fn get_block_events(_starknet: &Starknet, block: &MadaraMaybePendingBlock) -> Ve MadaraMaybePendingBlockInfo::NotPending(block) => (Some(block.block_hash), Some(block.header.block_number)), }; - let tx_hash_and_events = block.inner.receipts.iter().flat_map(|receipt| { - let tx_hash = receipt.transaction_hash(); - receipt.events().iter().map(move |events| (tx_hash, events)) - }); - - // PERF: clone here is brutal, there must be a way to take ownership of this - // data - tx_hash_and_events - .map(|(transaction_hash, event)| EmittedEvent { - from_address: event.from_address, - keys: event.keys.clone(), - data: event.data.clone(), - block_hash, - block_number, - transaction_hash, + block + .inner + .receipts + .into_iter() + .flat_map(move |receipt| { + let transaction_hash = receipt.transaction_hash(); + + receipt.events_owned().into_iter().filter_map(move |event| { + if address.is_some() && address.unwrap() != event.from_address { + return None; + } + + // Keys are matched as follows: + // + // - `keys` is an array of Felt + // - `keys[n]` represents allowed value for event key at index n + // - so `event.keys[n]` needs to match any value in `keys[n]` + let match_keys = keys + .iter() + .enumerate() + .all(|(i, keys)| event.keys.len() >= i && (keys.is_empty() || keys.contains(&event.keys[i]))); + + if !match_keys { + None + } else { + Some(EmittedEvent { + from_address: event.from_address, + keys: event.keys, + data: event.data, + block_hash, + block_number, + transaction_hash, + }) + } + }) }) .collect() } diff --git a/crates/primitives/receipt/src/lib.rs b/crates/primitives/receipt/src/lib.rs index 5d8bc8bdc..c9ea7f9dc 100644 --- a/crates/primitives/receipt/src/lib.rs +++ b/crates/primitives/receipt/src/lib.rs @@ -110,6 +110,16 @@ impl TransactionReceipt { } } + pub fn events_owned(self) -> Vec { + match self { + TransactionReceipt::Invoke(receipt) => receipt.events, + TransactionReceipt::L1Handler(receipt) => receipt.events, + TransactionReceipt::Declare(receipt) => receipt.events, + TransactionReceipt::Deploy(receipt) => receipt.events, + TransactionReceipt::DeployAccount(receipt) => receipt.events, + } + } + pub fn execution_result(&self) -> ExecutionResult { match self { TransactionReceipt::Invoke(receipt) => receipt.execution_result.clone(), From c3b7c44937711c496c1bb1540f28c67e54091a72 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:42:01 +0100 Subject: [PATCH 03/10] test(getEvents): started adding tests to `getEvents` --- .gitignore | 3 + .../v0_7_1/methods/read/get_events.rs | 147 ++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/.gitignore b/.gitignore index bf4f3d9f7..57bd87118 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,9 @@ starknet-e2e-test/contracts/build # rpc output output* +# test output +test_output* + tmp/ *.info diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index 44dd0f4e0..a92fa8ac3 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -156,3 +156,150 @@ fn get_block_events( }) .collect() } + +#[cfg(test)] +mod test { + use jsonrpsee::http_client::HttpClientBuilder; + + use crate::{ + test_utils::rpc_test_setup, + versions::v0_7_1::{StarknetReadRpcApiV0_7_1Client, StarknetReadRpcApiV0_7_1Server}, + }; + + fn block_info(n: u64) -> mp_block::MadaraMaybePendingBlockInfo { + mp_block::MadaraMaybePendingBlockInfo::NotPending(mp_block::MadaraBlockInfo { + header: mp_block::Header { + parent_block_hash: starknet_core::types::Felt::from(n), + block_number: n, + ..Default::default() + }, + block_hash: starknet_core::types::Felt::from(n), + tx_hashes: vec![], + }) + } + + fn block_events(n: u64) -> Vec { + vec![ + mp_receipt::Event { + from_address: starknet_core::types::Felt::from(n), + keys: vec![ + starknet_core::types::Felt::ZERO, + starknet_core::types::Felt::ONE, + starknet_core::types::Felt::from(n), + ], + data: vec![], + }, + mp_receipt::Event { + from_address: starknet_core::types::Felt::from(n), + keys: vec![ + starknet_core::types::Felt::ZERO, + starknet_core::types::Felt::TWO, + starknet_core::types::Felt::from(n), + ], + data: vec![], + }, + mp_receipt::Event { from_address: starknet_core::types::Felt::from(n), keys: vec![], data: vec![] }, + ] + } + + fn block_inner(n: u64) -> mp_block::MadaraBlockInner { + mp_block::MadaraBlockInner { + transactions: vec![], + receipts: vec![ + mp_receipt::TransactionReceipt::Invoke(mp_receipt::InvokeTransactionReceipt { + events: block_events(n), + transaction_hash: starknet_core::types::Felt::from(n), + ..Default::default() + }), + mp_receipt::TransactionReceipt::Invoke(mp_receipt::InvokeTransactionReceipt { + events: block_events(n), + transaction_hash: starknet_core::types::Felt::from(n + 1), + ..Default::default() + }), + ], + } + } + + fn block_generator( + backend: &mc_db::MadaraBackend, + ) -> impl Iterator> + '_ { + (0..).map(|n| { + let info = block_info(n); + let inner = block_inner(n); + + backend + .store_block( + mp_block::MadaraMaybePendingBlock { info: info.clone(), inner: inner.clone() }, + mp_state_update::StateDiff::default(), + vec![], + ) + .expect("Storing block"); + + inner + .receipts + .into_iter() + .flat_map(move |receipt| { + let block_hash = info.block_hash(); + let block_number = info.block_n(); + let transaction_hash = receipt.transaction_hash(); + receipt.events_owned().into_iter().map(move |event| starknet_core::types::EmittedEvent { + from_address: event.from_address, + keys: event.keys, + data: event.data, + block_hash, + block_number, + transaction_hash, + }) + }) + .collect() + }) + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let expected = generator.next().expect("Retrieving event from backend"); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::open("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::open("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } +} From 41cfec4d9c61d0f415082b41e6bc7a8658875c9d Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Tue, 5 Nov 2024 18:10:04 +0100 Subject: [PATCH 04/10] test(getEvents): added simple key filtering tests --- crates/client/rpc/src/lib.rs | 2 + .../v0_7_1/methods/read/get_events.rs | 64 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index b99d2b5cc..cb9e402af 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -2,6 +2,8 @@ //! //! It uses the madara client and backend in order to answer queries. +#![feature(iter_collect_into)] + mod constants; mod errors; pub mod providers; diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index a92fa8ac3..0c0103481 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -138,7 +138,7 @@ fn get_block_events( let match_keys = keys .iter() .enumerate() - .all(|(i, keys)| event.keys.len() >= i && (keys.is_empty() || keys.contains(&event.keys[i]))); + .all(|(i, keys)| event.keys.len() > i && (keys.is_empty() || keys.contains(&event.keys[i]))); if !match_keys { None @@ -287,11 +287,69 @@ mod test { .events; if events != expected { - let file_events = std::fs::File::open("./test_output_actual.json").expect("Opening file"); + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); let writter = std::io::BufWriter::new(file_events); serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); - let file_expected = std::fs::File::open("./test_output_events.json").expect("Opening file"); + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_with_keys(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) + .take(10 - expected.len()) + .collect_into(&mut expected); + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![starknet_core::types::Felt::ZERO]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); From 9139b8b119cfa8334415ca405a11c4151b253ce3 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:06:55 +0100 Subject: [PATCH 05/10] test(getEvents): added some more key tests --- .../v0_7_1/methods/read/get_events.rs | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index 0c0103481..ab7cf7bc1 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -360,4 +360,200 @@ mod test { ) } } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_with_keys_hard(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| event.keys.len() > 1 && event.keys[1] == starknet_core::types::Felt::ONE) + .take(10 - expected.len()) + .collect_into(&mut expected); + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![], vec![starknet_core::types::Felt::ONE]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_with_keys_single(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| event.keys.len() > 2 && event.keys[2] == starknet_core::types::Felt::TWO) + .take(10 - expected.len()) + .collect_into(&mut expected); + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![], vec![], vec![starknet_core::types::Felt::TWO]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_block_no(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (_, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + let expected = crate::StarknetRpcApiError::BlockNotFound; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_block_invalid(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let _ = generator.next().expect("Retrieving event from backend"); + + let expected = crate::StarknetRpcApiError::BlockNotFound; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Number(1)), + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } } From c25e0477bf63fafee7bed8114edc010d2aa42073 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:22:49 +0100 Subject: [PATCH 06/10] test(getEvents): added more tests --- crates/client/rpc/src/constants.rs | 2 +- .../v0_7_1/methods/read/get_events.rs | 309 +++++++++++++++++- 2 files changed, 306 insertions(+), 5 deletions(-) diff --git a/crates/client/rpc/src/constants.rs b/crates/client/rpc/src/constants.rs index 5997a15cd..a4bc7700d 100644 --- a/crates/client/rpc/src/constants.rs +++ b/crates/client/rpc/src/constants.rs @@ -1,4 +1,4 @@ /// Maximum number of filter keys that can be passed to the `get_events` RPC. pub const MAX_EVENTS_KEYS: usize = 100; /// Maximum number of events that can be fetched in a single chunk for the `get_events` RPC. -pub const MAX_EVENTS_CHUNK_SIZE: usize = 1000; +pub const MAX_EVENTS_CHUNK_SIZE: u64 = 1000; diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index ab7cf7bc1..adfade002 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -34,22 +34,28 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta if keys.len() > MAX_EVENTS_KEYS { return Err(StarknetRpcApiError::TooManyKeysInFilter); } - if chunk_size > MAX_EVENTS_CHUNK_SIZE as u64 { + if chunk_size > MAX_EVENTS_CHUNK_SIZE { return Err(StarknetRpcApiError::PageSizeTooBig); } let latest_block = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; + let from_block = match filter.event_filter.from_block { Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, Some(block_id) => starknet.get_block_n(&block_id)?, None => 0, }; + let to_block = match filter.event_filter.to_block { Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, Some(block_id) => starknet.get_block_n(&block_id)?, None => latest_block, }; + if from_block > latest_block || to_block > latest_block { + return Err(StarknetRpcApiError::BlockNotFound); + } + if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); } @@ -479,7 +485,97 @@ mod test { #[tokio::test] #[rstest::rstest] - async fn get_events_block_no(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + async fn get_events_with_continuation_token( + rpc_test_setup: (std::sync::Arc, crate::Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) + .collect_into(&mut expected); + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![starknet_core::types::Felt::ZERO]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents"); + + if events.events != expected[..10] { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![starknet_core::types::Felt::ZERO]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: events.continuation_token, + chunk_size: 10, + }, + }) + .await + .expect("starknet_getEvents"); + + if events.events != expected[10..] { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_block_not_found(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { let (_, starknet) = rpc_test_setup; let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); @@ -517,7 +613,7 @@ mod test { #[tokio::test] #[rstest::rstest] - async fn get_events_block_invalid(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + async fn get_events_block_invalid_from(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { let (backend, starknet) = rpc_test_setup; let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); @@ -528,7 +624,6 @@ mod test { let mut generator = block_generator(&backend); let _ = generator.next().expect("Retrieving event from backend"); - let expected = crate::StarknetRpcApiError::BlockNotFound; let events = client @@ -556,4 +651,210 @@ mod test { assert_eq!(error_object.message(), expected.to_string()); assert!(error_object.data().is_none()); } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_block_invalid_to(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let _ = generator.next().expect("Retrieving event from backend"); + let expected = crate::StarknetRpcApiError::BlockNotFound; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Number(1)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_page_size_too_big(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let _ = generator.next().expect("Retrieving event from backend"); + let expected = crate::StarknetRpcApiError::PageSizeTooBig; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE + 1, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_invalid_continuation_token( + rpc_test_setup: (std::sync::Arc, crate::Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let _ = generator.next().expect("Retrieving event from backend"); + let expected = crate::StarknetRpcApiError::InvalidContinuationToken; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: Some("".to_string()), + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_invalid_continuation_token2( + rpc_test_setup: (std::sync::Arc, crate::Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = block_generator(&backend); + let _ = generator.next().expect("Retrieving event from backend"); + let expected = crate::StarknetRpcApiError::InvalidContinuationToken; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: Some("0-100".to_string()), + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_too_many_keys(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (_, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + let expected = crate::StarknetRpcApiError::TooManyKeysInFilter; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: None, + keys: Some(vec![vec![]; crate::constants::MAX_EVENTS_KEYS + 1]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: 10, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } } From 31d46792d2376c78f9b7ba3017cc999abe4d023f Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 14:22:43 +0100 Subject: [PATCH 07/10] feat(db): added block stream iterator --- crates/client/db/src/block_db.rs | 27 ++++++- crates/client/rpc/src/lib.rs | 6 +- .../v0_7_1/methods/read/get_events.rs | 75 ++++++++++++++++--- crates/primitives/block/src/lib.rs | 2 +- 4 files changed, 96 insertions(+), 14 deletions(-) diff --git a/crates/client/db/src/block_db.rs b/crates/client/db/src/block_db.rs index 44d368140..9f2bbc8e9 100644 --- a/crates/client/db/src/block_db.rs +++ b/crates/client/db/src/block_db.rs @@ -8,7 +8,7 @@ use mp_block::{ MadaraMaybePendingBlockInfo, MadaraPendingBlock, MadaraPendingBlockInfo, }; use mp_state_update::StateDiff; -use rocksdb::WriteOptions; +use rocksdb::{Direction, IteratorMode, WriteOptions}; use starknet_api::core::ChainId; use starknet_types_core::felt::Felt; @@ -370,6 +370,31 @@ impl MadaraBackend { Ok(Some(MadaraMaybePendingBlock { info, inner })) } + #[tracing::instrument(skip(self), fields(module = "BlockDB"))] + pub fn get_block_stream(&self, block_n: usize) -> Result + '_> { + let handle_block_info = self.db.get_column(Column::BlockNToBlockInfo); + let handle_block_innr = self.db.get_column(Column::BlockNToBlockInner); + + let key = bincode::serialize(&block_n)?; + let iter_mode = IteratorMode::From(&key, Direction::Forward); + + let iter_block_info = self.db.iterator_cf(&handle_block_info, iter_mode.clone()); + let iter_block_innr = self.db.iterator_cf(&handle_block_innr, iter_mode); + + let iter = iter_block_info.zip(iter_block_innr).enumerate().map_while(move |(i, kvs)| { + if let (Ok((_, bytes_info)), Ok((_, bytes_innr))) = kvs { + let block_info = bincode::deserialize::(&bytes_info).ok(); + let block_innr = bincode::deserialize::(&bytes_innr).ok(); + + block_info.zip(block_innr).map(|(info, inner)| MadaraBlock { info, inner }) + } else { + None + } + }); + + Ok(iter) + } + // Tx hashes and tx status /// Returns the index of the tx. diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index cb9e402af..ab27bc936 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use mc_db::db_block_id::DbBlockIdResolvable; use mc_db::MadaraBackend; -use mp_block::{MadaraMaybePendingBlock, MadaraMaybePendingBlockInfo}; +use mp_block::{MadaraBlock, MadaraMaybePendingBlock, MadaraMaybePendingBlockInfo}; use mp_chain_config::{ChainConfig, RpcVersion}; use mp_convert::ToFelt; @@ -71,6 +71,10 @@ impl Starknet { .ok_or(StarknetRpcApiError::BlockNotFound) } + pub fn get_block_stream(&self, block_n: usize) -> StarknetRpcResult + '_> { + self.backend.get_block_stream(block_n).map_err(|_| StarknetRpcApiError::BlockNotFound) + } + pub fn chain_id(&self) -> Felt { self.backend.chain_config().chain_id.clone().to_felt() } diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index adfade002..c63c48390 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -226,7 +226,7 @@ mod test { } } - fn block_generator( + fn generator_events( backend: &mc_db::MadaraBackend, ) -> impl Iterator> + '_ { (0..).map(|n| { @@ -261,6 +261,59 @@ mod test { }) } + fn generator_blocks(backend: &mc_db::MadaraBackend) -> impl Iterator + '_ { + (0..).map(|n| { + let block = mp_block::MadaraMaybePendingBlock { info: block_info(n), inner: block_inner(n) }; + + backend.store_block(block.clone(), mp_state_update::StateDiff::default(), vec![]).expect("Storing block"); + + mp_block::MadaraBlock::try_from(block).unwrap() + }) + } + + #[tokio::test] + #[rstest::rstest] + async fn get_block_stream(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + + let generator = generator_blocks(&backend); + let expected = Vec::from_iter(generator.take(10)); + + let block_stream = starknet.get_block_stream(0).expect("Retrieving block stream"); + let blocks = block_stream.collect::>(); + + if blocks != expected { + let file_blocks = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_blocks); + serde_json::to_writer_pretty(writter, &blocks).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&blocks).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_block_stream2(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + + let block_stream = starknet.get_block_stream(0).expect("Retrieving block stream"); + + let generator = generator_blocks(&backend); + let _ = generator.take(10).last(); + + let blocks = block_stream.collect::>(); + + assert_eq!(blocks, []); + } + #[tokio::test] #[rstest::rstest] async fn get_events(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { @@ -272,7 +325,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let expected = generator.next().expect("Retrieving event from backend"); let events = client @@ -320,7 +373,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let mut expected = Vec::default(); for _ in 0..3 { @@ -378,7 +431,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let mut expected = Vec::default(); for _ in 0..3 { @@ -436,7 +489,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let mut expected = Vec::default(); for _ in 0..3 { @@ -496,7 +549,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let mut expected = Vec::default(); for _ in 0..3 { @@ -622,7 +675,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let _ = generator.next().expect("Retrieving event from backend"); let expected = crate::StarknetRpcApiError::BlockNotFound; @@ -663,7 +716,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let _ = generator.next().expect("Retrieving event from backend"); let expected = crate::StarknetRpcApiError::BlockNotFound; @@ -704,7 +757,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let _ = generator.next().expect("Retrieving event from backend"); let expected = crate::StarknetRpcApiError::PageSizeTooBig; @@ -747,7 +800,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let _ = generator.next().expect("Retrieving event from backend"); let expected = crate::StarknetRpcApiError::InvalidContinuationToken; @@ -790,7 +843,7 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = block_generator(&backend); + let mut generator = generator_events(&backend); let _ = generator.next().expect("Retrieving event from backend"); let expected = crate::StarknetRpcApiError::InvalidContinuationToken; diff --git a/crates/primitives/block/src/lib.rs b/crates/primitives/block/src/lib.rs index 354109bab..8d2fdb905 100644 --- a/crates/primitives/block/src/lib.rs +++ b/crates/primitives/block/src/lib.rs @@ -260,7 +260,7 @@ impl MadaraMaybePendingBlock { } /// Starknet block definition. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct MadaraBlock { pub info: MadaraBlockInfo, pub inner: MadaraBlockInner, From 7d3ba8c36ab05d54dcc20723cbb4d5066f9e61da Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 15:08:56 +0100 Subject: [PATCH 08/10] test(getEvents): added more tests for pending blocks and `from_address` --- crates/client/db/src/block_db.rs | 2 +- .../v0_7_1/methods/read/get_events.rs | 326 ++++++++++++++++-- 2 files changed, 305 insertions(+), 23 deletions(-) diff --git a/crates/client/db/src/block_db.rs b/crates/client/db/src/block_db.rs index 9f2bbc8e9..913d9eaa6 100644 --- a/crates/client/db/src/block_db.rs +++ b/crates/client/db/src/block_db.rs @@ -381,7 +381,7 @@ impl MadaraBackend { let iter_block_info = self.db.iterator_cf(&handle_block_info, iter_mode.clone()); let iter_block_innr = self.db.iterator_cf(&handle_block_innr, iter_mode); - let iter = iter_block_info.zip(iter_block_innr).enumerate().map_while(move |(i, kvs)| { + let iter = iter_block_info.zip(iter_block_innr).map_while(move |kvs| { if let (Ok((_, bytes_info)), Ok((_, bytes_innr))) = kvs { let block_info = bincode::deserialize::(&bytes_info).ok(); let block_innr = bincode::deserialize::(&bytes_innr).ok(); diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index c63c48390..b38b47f20 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -42,20 +42,28 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta let from_block = match filter.event_filter.from_block { Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, - Some(block_id) => starknet.get_block_n(&block_id)?, + Some(block_id) => { + let block_n = starknet.get_block_n(&block_id)?; + if block_n > latest_block { + return Err(StarknetRpcApiError::BlockNotFound); + } + block_n + } None => 0, }; let to_block = match filter.event_filter.to_block { Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, - Some(block_id) => starknet.get_block_n(&block_id)?, + Some(block_id) => { + let block_n = starknet.get_block_n(&block_id)?; + if block_n > latest_block { + return Err(StarknetRpcApiError::BlockNotFound); + } + block_n + } None => latest_block, }; - if from_block > latest_block || to_block > latest_block { - return Err(StarknetRpcApiError::BlockNotFound); - } - if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); } @@ -184,6 +192,16 @@ mod test { }) } + fn block_info_pending(n: u64) -> mp_block::MadaraMaybePendingBlockInfo { + mp_block::MadaraMaybePendingBlockInfo::Pending(mp_block::MadaraPendingBlockInfo { + header: mp_block::header::PendingHeader { + parent_block_hash: starknet_core::types::Felt::from(n), + ..Default::default() + }, + tx_hashes: vec![], + }) + } + fn block_events(n: u64) -> Vec { vec![ mp_receipt::Event { @@ -196,7 +214,7 @@ mod test { data: vec![], }, mp_receipt::Event { - from_address: starknet_core::types::Felt::from(n), + from_address: starknet_core::types::Felt::from(n.saturating_add(1)), keys: vec![ starknet_core::types::Felt::ZERO, starknet_core::types::Felt::TWO, @@ -204,7 +222,11 @@ mod test { ], data: vec![], }, - mp_receipt::Event { from_address: starknet_core::types::Felt::from(n), keys: vec![], data: vec![] }, + mp_receipt::Event { + from_address: starknet_core::types::Felt::from(n.saturating_add(1)), + keys: vec![], + data: vec![], + }, ] } @@ -219,7 +241,7 @@ mod test { }), mp_receipt::TransactionReceipt::Invoke(mp_receipt::InvokeTransactionReceipt { events: block_events(n), - transaction_hash: starknet_core::types::Felt::from(n + 1), + transaction_hash: starknet_core::types::Felt::from(n), ..Default::default() }), ], @@ -261,6 +283,37 @@ mod test { }) } + fn generator_events_pending(backend: &mc_db::MadaraBackend) -> Vec { + let info = block_info_pending(u64::MAX); + let inner = block_inner(u64::MAX); + + backend + .store_block( + mp_block::MadaraMaybePendingBlock { info: info.clone(), inner: inner.clone() }, + mp_state_update::StateDiff::default(), + vec![], + ) + .expect("Storing block"); + + inner + .receipts + .into_iter() + .flat_map(move |receipt| { + let block_hash = info.block_hash(); + let block_number = info.block_n(); + let transaction_hash = receipt.transaction_hash(); + receipt.events_owned().into_iter().map(move |event| starknet_core::types::EmittedEvent { + from_address: event.from_address, + keys: event.keys, + data: event.data, + block_hash, + block_number, + transaction_hash, + }) + }) + .collect() + } + fn generator_blocks(backend: &mc_db::MadaraBackend) -> impl Iterator + '_ { (0..).map(|n| { let block = mp_block::MadaraMaybePendingBlock { info: block_info(n), inner: block_inner(n) }; @@ -338,7 +391,177 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = generator_events(&backend); + let mut expected = Vec::default(); + + expected.extend(generator.next().expect("Retrieving event from backend")); + expected.extend(generator_events_pending(&backend)); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_address(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = generator_events(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| event.from_address == starknet_core::types::Felt::TWO) + .collect_into(&mut expected); + } + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: None, + address: Some(starknet_core::types::Felt::TWO), + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_address_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = generator_events(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| event.from_address == starknet_core::types::Felt::from(u64::MAX)) + .collect_into(&mut expected); + } + + generator_events_pending(&backend) + .into_iter() + .filter(|event| event.from_address == starknet_core::types::Felt::from(u64::MAX)) + .collect_into(&mut expected); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + address: Some(starknet_core::types::Felt::from(u64::MAX)), + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -382,7 +605,6 @@ mod test { .expect("Retrieving event from backend") .into_iter() .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) - .take(10 - expected.len()) .collect_into(&mut expected); } @@ -396,7 +618,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -440,7 +662,6 @@ mod test { .expect("Retrieving event from backend") .into_iter() .filter(|event| event.keys.len() > 1 && event.keys[1] == starknet_core::types::Felt::ONE) - .take(10 - expected.len()) .collect_into(&mut expected); } @@ -454,7 +675,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -498,7 +719,6 @@ mod test { .expect("Retrieving event from backend") .into_iter() .filter(|event| event.keys.len() > 2 && event.keys[2] == starknet_core::types::Felt::TWO) - .take(10 - expected.len()) .collect_into(&mut expected); } @@ -512,7 +732,69 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!( + "actual: {}\nexpected:{}", + serde_json::to_string_pretty(&events).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ) + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_with_keys_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut generator = generator_events(&backend); + let mut expected = Vec::default(); + + for _ in 0..3 { + generator + .next() + .expect("Retrieving event from backend") + .into_iter() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) + .collect_into(&mut expected); + } + + generator_events_pending(&backend) + .into_iter() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) + .collect_into(&mut expected); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + address: None, + keys: Some(vec![vec![starknet_core::types::Felt::ZERO]]), + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -648,7 +930,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -689,7 +971,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -730,7 +1012,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -814,7 +1096,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: Some("".to_string()), - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -857,7 +1139,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: Some("0-100".to_string()), - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await @@ -895,7 +1177,7 @@ mod test { }, result_page_request: starknet_core::types::ResultPageRequest { continuation_token: None, - chunk_size: 10, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, }, }) .await From 47b1d781e76e30f76afa5285d9a3fac1709638f9 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 15:19:54 +0100 Subject: [PATCH 09/10] refactor(getEvents): block number retrieval --- .../v0_7_1/methods/read/get_events.rs | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index b38b47f20..b21e8c640 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -38,31 +38,9 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta return Err(StarknetRpcApiError::PageSizeTooBig); } - let latest_block = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; - - let from_block = match filter.event_filter.from_block { - Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, - Some(block_id) => { - let block_n = starknet.get_block_n(&block_id)?; - if block_n > latest_block { - return Err(StarknetRpcApiError::BlockNotFound); - } - block_n - } - None => 0, - }; - - let to_block = match filter.event_filter.to_block { - Some(BlockId::Tag(BlockTag::Pending)) => latest_block + 1, - Some(block_id) => { - let block_n = starknet.get_block_n(&block_id)?; - if block_n > latest_block { - return Err(StarknetRpcApiError::BlockNotFound); - } - block_n - } - None => latest_block, - }; + let block_latest = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; + let from_block = to_block_n(filter.event_filter.from_block, starknet, 0)?; + let to_block = to_block_n(filter.event_filter.to_block, starknet, block_latest)?; if from_block > to_block { return Ok(EventsPage { events: vec![], continuation_token: None }); @@ -79,7 +57,7 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta let mut filtered_events: Vec = Vec::with_capacity(16); for block_n in from_block..=to_block { // PERF: this check can probably be hoisted out of this loop - let block = if block_n <= latest_block { + let block = if block_n <= block_latest { // PERF: This is probably the main bottleneck: we should be able to // mitigate this by implementing a db iterator starknet.get_block(&BlockId::Number(block_n))? @@ -120,6 +98,24 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta Ok(EventsPage { events: filtered_events, continuation_token: None }) } +fn to_block_n( + id: Option, + starknet: &Starknet, + default: u64, +) -> Result { + match id { + Some(BlockId::Tag(BlockTag::Pending)) => Ok(default + 1), + Some(block_id) => starknet.get_block_n(&block_id).and_then(|block_n| { + if block_n > default { + Err(StarknetRpcApiError::BlockNotFound) + } else { + Ok(block_n) + } + }), + None => Ok(default), + } +} + fn get_block_events( block: MadaraMaybePendingBlock, address: Option, From ec20242dbcb96126b31ecc62dcf1862215758220 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Wed, 6 Nov 2024 18:09:27 +0100 Subject: [PATCH 10/10] perf(getEvents): got updated version of `getEvents` to work + more tests --- crates/client/db/src/block_db.rs | 2 +- crates/client/rpc/src/lib.rs | 2 +- .../v0_7_1/methods/read/get_events.rs | 715 +++++++++++------- 3 files changed, 451 insertions(+), 268 deletions(-) diff --git a/crates/client/db/src/block_db.rs b/crates/client/db/src/block_db.rs index 913d9eaa6..554b71277 100644 --- a/crates/client/db/src/block_db.rs +++ b/crates/client/db/src/block_db.rs @@ -371,7 +371,7 @@ impl MadaraBackend { } #[tracing::instrument(skip(self), fields(module = "BlockDB"))] - pub fn get_block_stream(&self, block_n: usize) -> Result + '_> { + pub fn get_block_stream(&self, block_n: u64) -> Result + '_> { let handle_block_info = self.db.get_column(Column::BlockNToBlockInfo); let handle_block_innr = self.db.get_column(Column::BlockNToBlockInner); diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index ab27bc936..74e697346 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -71,7 +71,7 @@ impl Starknet { .ok_or(StarknetRpcApiError::BlockNotFound) } - pub fn get_block_stream(&self, block_n: usize) -> StarknetRpcResult + '_> { + pub fn get_block_stream(&self, block_n: u64) -> StarknetRpcResult + '_> { self.backend.get_block_stream(block_n).map_err(|_| StarknetRpcApiError::BlockNotFound) } diff --git a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs index b21e8c640..0df4d4efd 100644 --- a/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs +++ b/crates/client/rpc/src/versions/v0_7_1/methods/read/get_events.rs @@ -1,4 +1,3 @@ -use mp_block::{MadaraMaybePendingBlock, MadaraMaybePendingBlockInfo}; use starknet_core::types::{BlockId, BlockTag, EmittedEvent, EventFilterWithPage, EventsPage, Felt}; use crate::constants::{MAX_EVENTS_CHUNK_SIZE, MAX_EVENTS_KEYS}; @@ -27,6 +26,10 @@ use crate::Starknet; /// errors, such as `PAGE_SIZE_TOO_BIG`, `INVALID_CONTINUATION_TOKEN`, `BLOCK_NOT_FOUND`, or /// `TOO_MANY_KEYS_IN_FILTER`, returns a `StarknetRpcApiError` indicating the specific issue. pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> StarknetRpcResult { + // ===================================================================== // + // Pre-search validation + // ===================================================================== // + let from_address = filter.event_filter.address; let keys = filter.event_filter.keys.unwrap_or_default(); let chunk_size = filter.result_page_request.chunk_size; @@ -39,132 +42,156 @@ pub async fn get_events(starknet: &Starknet, filter: EventFilterWithPage) -> Sta } let block_latest = starknet.get_block_n(&BlockId::Tag(BlockTag::Latest))?; - let from_block = to_block_n(filter.event_filter.from_block, starknet, 0)?; - let to_block = to_block_n(filter.event_filter.to_block, starknet, block_latest)?; + let (block_from, pending_from) = to_block_n(filter.event_filter.from_block, starknet, block_latest, 0)?; + + let (block_to, pending_to) = to_block_n(filter.event_filter.to_block, starknet, block_latest, block_latest)?; - if from_block > to_block { + if block_from > block_to || (pending_from && !pending_to) { return Ok(EventsPage { events: vec![], continuation_token: None }); } + // ===================================================================== // + // Continuation Token + // ===================================================================== // + let continuation_token = match filter.result_page_request.continuation_token { Some(token) => ContinuationToken::parse(token).map_err(|_| StarknetRpcApiError::InvalidContinuationToken)?, - None => ContinuationToken { block_n: from_block, event_n: 0 }, + None => ContinuationToken { block_n: block_from, event_n: 0 }, }; - let from_block = continuation_token.block_n; - - // PERF: we should truncate from_block to the creation block of the contract - // if it is less than that - let mut filtered_events: Vec = Vec::with_capacity(16); - for block_n in from_block..=to_block { - // PERF: this check can probably be hoisted out of this loop - let block = if block_n <= block_latest { - // PERF: This is probably the main bottleneck: we should be able to - // mitigate this by implementing a db iterator - starknet.get_block(&BlockId::Number(block_n))? - } else { - starknet.get_block(&BlockId::Tag(BlockTag::Pending))? - }; + let block_from = continuation_token.block_n; + + let mut events_filtered = Vec::with_capacity(16); + + if !pending_from { + // ================================================================= // + // Initial loop + // ================================================================= // - // PERF: collection needs to be more efficient - let block_filtered_events: Vec = get_block_events(block, from_address, &keys); + let mut block_stream = starknet.get_block_stream(block_from)?; + let block = block_stream.next().ok_or(StarknetRpcApiError::BlockNotFound)?; + let block_hash = block.info.block_hash; + let block_n = block.info.header.block_number; + let block_receipts = block.inner.receipts; - // PERF: this condition needs to be moved out the loop as it needs to happen only once - if block_n == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { + let events_filtered_block = + get_block_events(Some(block_hash), Some(block_n), block_receipts, from_address, &keys).collect::>(); + let event_n = events_filtered_block.len() as u64; + + if event_n < continuation_token.event_n { return Err(StarknetRpcApiError::InvalidContinuationToken); } - // PERF: same here, hoist this out of the loop - #[allow(clippy::iter_skip_zero)] - let block_filtered_reduced_events: Vec = block_filtered_events + events_filtered_block .into_iter() - .skip(if block_n == from_block { continuation_token.event_n as usize } else { 0 }) - .take(chunk_size as usize - filtered_events.len()) - .collect(); + .skip(continuation_token.event_n as usize) + .take(chunk_size as usize) + .collect_into(&mut events_filtered); + + if events_filtered.len() == chunk_size as usize { + let event_n = continuation_token.event_n + chunk_size; + let token = Some(ContinuationToken { block_n, event_n }.to_string()); - let num_events = block_filtered_reduced_events.len(); + return Ok(EventsPage { events: events_filtered, continuation_token: token }); + } - // PERF: any better way to do this? Pre-allocation should reduce some - // of the allocations already - filtered_events.extend(block_filtered_reduced_events); + // ================================================================= // + // Main loop + // ================================================================= // - if filtered_events.len() == chunk_size as usize { - let event_n = - if block_n == from_block { continuation_token.event_n + chunk_size } else { num_events as u64 }; - let token = Some(ContinuationToken { block_n, event_n }.to_string()); + // FIXME: block stream should return a `Result` so we can + // detect db errors and they are not silenced! + for block in block_stream.take((block_to - block_from) as usize) { + let block_hash = block.info.block_hash; + let block_n = block.info.header.block_number; + + let event_n = events_filtered.len(); + get_block_events(Some(block_hash), Some(block_n), block.inner.receipts, from_address, &keys) + .take(chunk_size as usize - events_filtered.len()) + .collect_into(&mut events_filtered); + let event_n = (events_filtered.len() - event_n) as u64; - return Ok(EventsPage { events: filtered_events, continuation_token: token }); + if events_filtered.len() == chunk_size as usize { + let token = Some(ContinuationToken { block_n, event_n }.to_string()); + + return Ok(EventsPage { events: events_filtered, continuation_token: token }); + } } } - Ok(EventsPage { events: filtered_events, continuation_token: None }) + + if pending_to { + // ================================================================= // + // Pending loop + // ================================================================= // + + let block_id = &starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending); + let block = starknet.get_block(block_id)?; + + get_block_events(None, None, block.inner.receipts, from_address, &keys) + .take(chunk_size as usize - events_filtered.len()) + .collect_into(&mut events_filtered); + } + + Ok(EventsPage { events: events_filtered, continuation_token: None }) } fn to_block_n( id: Option, starknet: &Starknet, + latest: u64, default: u64, -) -> Result { +) -> Result<(u64, bool), StarknetRpcApiError> { match id { - Some(BlockId::Tag(BlockTag::Pending)) => Ok(default + 1), + Some(BlockId::Tag(BlockTag::Pending)) => Ok((latest, true)), Some(block_id) => starknet.get_block_n(&block_id).and_then(|block_n| { - if block_n > default { + if block_n > latest { Err(StarknetRpcApiError::BlockNotFound) } else { - Ok(block_n) + Ok((block_n, false)) } }), - None => Ok(default), + None => Ok((default, false)), } } fn get_block_events( - block: MadaraMaybePendingBlock, + block_hash: Option, + block_number: Option, + receipts: Vec, address: Option, keys: &[Vec], -) -> Vec { - // PERF:: this check can probably be removed by handling pending blocks - // separatly - let (block_hash, block_number) = match &block.info { - MadaraMaybePendingBlockInfo::Pending(_) => (None, None), - MadaraMaybePendingBlockInfo::NotPending(block) => (Some(block.block_hash), Some(block.header.block_number)), - }; +) -> impl Iterator + '_ { + receipts.into_iter().flat_map(move |receipt| { + let transaction_hash = receipt.transaction_hash(); - block - .inner - .receipts - .into_iter() - .flat_map(move |receipt| { - let transaction_hash = receipt.transaction_hash(); - - receipt.events_owned().into_iter().filter_map(move |event| { - if address.is_some() && address.unwrap() != event.from_address { - return None; - } - - // Keys are matched as follows: - // - // - `keys` is an array of Felt - // - `keys[n]` represents allowed value for event key at index n - // - so `event.keys[n]` needs to match any value in `keys[n]` - let match_keys = keys - .iter() - .enumerate() - .all(|(i, keys)| event.keys.len() > i && (keys.is_empty() || keys.contains(&event.keys[i]))); - - if !match_keys { - None - } else { - Some(EmittedEvent { - from_address: event.from_address, - keys: event.keys, - data: event.data, - block_hash, - block_number, - transaction_hash, - }) - } - }) + receipt.events_owned().into_iter().filter_map(move |event| { + if address.is_some() && address.unwrap() != event.from_address { + return None; + } + + // Keys are matched as follows: + // + // - `keys` is an array of Felt + // - `keys[n]` represents allowed value for event key at index n + // - so `event.keys[n]` needs to match any value in `keys[n]` + let match_keys = keys + .iter() + .enumerate() + .all(|(i, keys)| event.keys.len() > i && (keys.is_empty() || keys.contains(&event.keys[i]))); + + if !match_keys { + None + } else { + Some(EmittedEvent { + from_address: event.from_address, + keys: event.keys, + data: event.data, + block_hash, + block_number, + transaction_hash, + }) + } }) - .collect() + }) } #[cfg(test)] @@ -340,11 +367,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&blocks).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -403,17 +426,13 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } #[tokio::test] #[rstest::rstest] - async fn get_events_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + async fn get_events_from_block(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { let (backend, starknet) = rpc_test_setup; let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); @@ -422,10 +441,253 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); + let blocks = + generator_events(&backend).take(3).flatten().filter(|event| event.block_number.is_some_and(|n| n >= 1)); + let expected = Vec::from_iter(blocks); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Number(1)), + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!("Events do not match (see test output for more details)"); + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_block_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let _ = generator_events(&backend).take(3).last(); + let expected = generator_events_pending(&backend); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + to_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!("Events do not match (see test output for more details)"); + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_block_invalid(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let _ = generator_events(&backend).next().expect("Retrieving event from backend"); + let expected = crate::StarknetRpcApiError::BlockNotFound; + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Number(1)), + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_block_out_of_order( + rpc_test_setup: (std::sync::Arc, crate::Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let _ = generator_events(&backend).take(3).last(); + let _ = generator_events_pending(&backend); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Number(1)), + to_block: Some(starknet_core::types::BlockId::Number(0)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + assert_eq!(events, []); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_block_out_of_order_pending( + rpc_test_setup: (std::sync::Arc, crate::Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); - expected.extend(generator.next().expect("Retrieving event from backend")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let _ = generator_events(&backend).take(3).last(); + let _ = generator_events_pending(&backend); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: Some(starknet_core::types::BlockId::Tag(starknet_core::types::BlockTag::Pending)), + to_block: None, + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + assert_eq!(events, []); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_to_block(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let blocks = + generator_events(&backend).take(3).flatten().filter(|event| event.block_number.is_some_and(|n| n <= 1)); + let expected = Vec::from_iter(blocks); + + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Number(1)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .expect("starknet_getEvents") + .events; + + if events != expected { + let file_events = std::fs::File::create("./test_output_actual.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_events); + serde_json::to_writer_pretty(writter, &events).unwrap_or_default(); + + let file_expected = std::fs::File::create("./test_output_events.json").expect("Opening file"); + let writter = std::io::BufWriter::new(file_expected); + serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); + + panic!("Events do not match (see test output for more details)"); + } + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_to_block_pending(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let mut expected = Vec::from_iter(generator_events(&backend).take(3).flatten()); expected.extend(generator_events_pending(&backend)); let events = client @@ -454,17 +716,13 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } #[tokio::test] #[rstest::rstest] - async fn get_events_from_address(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + async fn get_events_to_block_invalid(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { let (backend, starknet) = rpc_test_setup; let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); @@ -473,17 +731,51 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); + let _ = generator_events(&backend).take(3).last(); + let expected = crate::StarknetRpcApiError::BlockNotFound; - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| event.from_address == starknet_core::types::Felt::TWO) - .collect_into(&mut expected); - } + let events = client + .get_events(starknet_core::types::EventFilterWithPage { + event_filter: starknet_core::types::EventFilter { + from_block: None, + to_block: Some(starknet_core::types::BlockId::Number(3)), + address: None, + keys: None, + }, + result_page_request: starknet_core::types::ResultPageRequest { + continuation_token: None, + chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, + }, + }) + .await + .err() + .expect("starknet_getEvents"); + + let jsonrpsee::core::client::Error::Call(error_object) = events else { + panic!("starknet_getEvents"); + }; + + assert_eq!(error_object.code(), Into::::into(&expected)); + assert_eq!(error_object.message(), expected.to_string()); + assert!(error_object.data().is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn get_events_from_address(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); + + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); + let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); + + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| event.from_address == starknet_core::types::Felt::TWO); + let expected = Vec::from_iter(blocks); let events = client .get_events(starknet_core::types::EventFilterWithPage { @@ -511,11 +803,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -530,17 +818,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| event.from_address == starknet_core::types::Felt::from(u64::MAX)) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| event.from_address == starknet_core::types::Felt::from(u64::MAX)); + let mut expected = Vec::from_iter(blocks); generator_events_pending(&backend) .into_iter() @@ -573,11 +855,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -592,17 +870,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO); + let expected = Vec::from_iter(blocks); let events = client .get_events(starknet_core::types::EventFilterWithPage { @@ -630,17 +902,13 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } #[tokio::test] #[rstest::rstest] - async fn get_events_with_keys_hard(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { + async fn get_events_with_keys2(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { let (backend, starknet) = rpc_test_setup; let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); @@ -649,17 +917,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| event.keys.len() > 1 && event.keys[1] == starknet_core::types::Felt::ONE) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| event.keys.len() > 1 && event.keys[1] == starknet_core::types::Felt::ONE); + let expected = Vec::from_iter(blocks); let events = client .get_events(starknet_core::types::EventFilterWithPage { @@ -687,11 +949,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -706,17 +964,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| event.keys.len() > 2 && event.keys[2] == starknet_core::types::Felt::TWO) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| event.keys.len() > 2 && event.keys[2] == starknet_core::types::Felt::TWO); + let expected = Vec::from_iter(blocks); let events = client .get_events(starknet_core::types::EventFilterWithPage { @@ -744,11 +996,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -763,17 +1011,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO); + let mut expected = Vec::from_iter(blocks); generator_events_pending(&backend) .into_iter() @@ -806,11 +1048,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -827,17 +1065,11 @@ mod test { let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - let mut generator = generator_events(&backend); - let mut expected = Vec::default(); - - for _ in 0..3 { - generator - .next() - .expect("Retrieving event from backend") - .into_iter() - .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO) - .collect_into(&mut expected); - } + let blocks = generator_events(&backend) + .take(3) + .flatten() + .filter(|event| !event.keys.is_empty() && event.keys[0] == starknet_core::types::Felt::ZERO); + let expected = Vec::from_iter(blocks); let events = client .get_events(starknet_core::types::EventFilterWithPage { @@ -864,11 +1096,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } let events = client @@ -896,11 +1124,7 @@ mod test { let writter = std::io::BufWriter::new(file_expected); serde_json::to_writer_pretty(writter, &expected).unwrap_or_default(); - panic!( - "actual: {}\nexpected:{}", - serde_json::to_string_pretty(&events).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ) + panic!("Events do not match (see test output for more details)"); } } @@ -942,47 +1166,6 @@ mod test { assert!(error_object.data().is_none()); } - #[tokio::test] - #[rstest::rstest] - async fn get_events_block_invalid_from(rpc_test_setup: (std::sync::Arc, crate::Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("http://{}", server.local_addr().expect("Retrieving server local address")); - - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetReadRpcApiV0_7_1Server::into_rpc(starknet)); - let client = HttpClientBuilder::default().build(&server_url).expect("Building client"); - - let mut generator = generator_events(&backend); - let _ = generator.next().expect("Retrieving event from backend"); - let expected = crate::StarknetRpcApiError::BlockNotFound; - - let events = client - .get_events(starknet_core::types::EventFilterWithPage { - event_filter: starknet_core::types::EventFilter { - from_block: Some(starknet_core::types::BlockId::Number(1)), - to_block: None, - address: None, - keys: None, - }, - result_page_request: starknet_core::types::ResultPageRequest { - continuation_token: None, - chunk_size: crate::constants::MAX_EVENTS_CHUNK_SIZE, - }, - }) - .await - .err() - .expect("starknet_getEvents"); - - let jsonrpsee::core::client::Error::Call(error_object) = events else { - panic!("starknet_getEvents"); - }; - - assert_eq!(error_object.code(), Into::::into(&expected)); - assert_eq!(error_object.message(), expected.to_string()); - assert!(error_object.data().is_none()); - } - #[tokio::test] #[rstest::rstest] async fn get_events_block_invalid_to(rpc_test_setup: (std::sync::Arc, crate::Starknet)) {