Merge pull request #111 from jbcaron/events
fix: 🐛 use block storage events
antiyro authored Feb 10, 2024
2 parents f0f3373 + 28782f6 commit a9ba8cd
Showing 4 changed files with 73 additions and 101 deletions.
CHANGELOG.md: 2 additions & 0 deletions
@@ -2,8 +2,10 @@ # Madara Changelog

## Next release

- fix: get_events paging with continuation_token
- fux(getStorageAt): #28
- fix(genesis): #107

- fix(class): #32 #33 #34
- fix(class): #116
- feat(class): download classes from sequencer
crates/client/rpc/src/events/mod.rs: 59 additions & 90 deletions
@@ -1,9 +1,6 @@
#[cfg(test)]
mod tests;

use std::iter::Skip;
use std::vec::IntoIter;

use jsonrpsee::core::RpcResult;
use log::error;
use mc_rpc_core::utils::get_block_by_block_hash;
@@ -49,62 +46,49 @@ where
StarknetRpcApiError::BlockNotFound
})?;

let block_extrinsics = self
.client
.block_body(substrate_block_hash)
.map_err(|e| {
error!("Failed to retrieve block body. Substrate block hash: {substrate_block_hash}, error: {e}");
StarknetRpcApiError::InternalServerError
})?
.ok_or(StarknetRpcApiError::BlockNotFound)?;

let chain_id = self.client.runtime_api().chain_id(substrate_block_hash).map_err(|_| {
error!("Failed to retrieve chain id");
StarknetRpcApiError::InternalServerError
let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash).map_err(|e| {
error!("'{e}'");
StarknetRpcApiError::BlockNotFound
})?;

let index_and_events =
self.client.runtime_api().get_starknet_events_and_their_associated_tx_index(substrate_block_hash).map_err(
|e| {
error!(
"Failed to retrieve starknet events and their associated transaction index. Substrate block \
hash: {substrate_block_hash}, chain ID: {chain_id:?}, error: {e}"
);
StarknetRpcApiError::InternalServerError
},
)?;
let block_hash = starknet_block.header().hash::<H>();

let starknet_block = get_block_by_block_hash(self.client.as_ref(), substrate_block_hash).map_err(|e| {
error!("Failed to retrieve starknet block from substrate block hash: error: {e}");
let chain_id = self.client.runtime_api().chain_id(substrate_block_hash).map_err(|_| {
error!("Failed to retrieve chain id");
StarknetRpcApiError::InternalServerError
})?;
let txn_hashes = self.get_cached_transaction_hashes(starknet_block.header().hash::<H>().into());
let block_extrinsics_len = block_extrinsics.len();
let starknet_txs =
self.client.runtime_api().extrinsic_filter(substrate_block_hash, block_extrinsics).map_err(|e| {
error!("Failed to filter extrinsics. Substrate block hash: {substrate_block_hash}, error: {e}");
StarknetRpcApiError::InternalServerError
})?;
let inherent_count = block_extrinsics_len - starknet_txs.len();
let mut tx_hash_and_events: Vec<(Felt252Wrapper, starknet_api::transaction::Event)> = vec![];
for (index, event) in index_and_events {
let tx_index = index as usize - inherent_count;
let tx_hash = self.try_txn_hash_from_cache(
tx_index,
&txn_hashes,
&starknet_txs,
chain_id,
Some(starknet_block.header().block_number),
)?;
tx_hash_and_events.push((tx_hash, event));
}

let starknet_block = match get_block_by_block_hash(self.client.as_ref(), substrate_block_hash) {
Ok(block) => block,
Err(_) => return Err(StarknetRpcApiError::BlockNotFound),
// get txs hashes from cache or compute them
let block_txs_hashes: Vec<_> = if let Some(tx_hashes) = self.get_cached_transaction_hashes(block_hash.into()) {
tx_hashes
.into_iter()
.map(|h| {
Felt252Wrapper::try_from(h)
.map(|f| f.0)
.map_err(|e| {
error!("'{e}'");
StarknetRpcApiError::InternalServerError
})
.unwrap()
})
.collect()
} else {
starknet_block
.transactions_hashes::<H>(chain_id.into(), Some(starknet_block.header().block_number))
.map(FieldElement::from)
.collect()
};

let block_hash = starknet_block.header().hash::<H>();
// get txs hashes and events from block
// the txs hashes are found by the index of the ordered event
let tx_hash_and_events: Vec<(Felt252Wrapper, _)> = starknet_block
.events()
.into_iter()
.flat_map(|ordered_event| {
let tx_hash = block_txs_hashes[ordered_event.index() as usize];
ordered_event.events().into_iter().map(move |events| (tx_hash.into(), events.clone()))
})
.collect();

let emitted_events = tx_hash_and_events
.into_iter()
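
The new code above reads events straight from the stored Starknet block: each ordered event records the index of the transaction that emitted it, so transaction hashes are paired with events by position instead of re-running the extrinsic filter through the runtime API. A minimal sketch of that pairing step, with stand-in types that only model the two accessors the diff relies on (this is not the crate's actual `OrderedEvent`):

```rust
// Hedged sketch: pair every event with the hash of the transaction that
// emitted it, using the ordered event's transaction index as a lookup into
// the block's transaction-hash list.
struct OrderedEvent<E> {
    tx_index: usize, // index of the emitting transaction within the block
    events: Vec<E>,  // events emitted by that transaction
}

fn pair_events_with_tx_hashes<H: Copy, E: Clone>(
    block_tx_hashes: &[H],
    ordered_events: &[OrderedEvent<E>],
) -> Vec<(H, E)> {
    ordered_events
        .iter()
        .flat_map(|oe| {
            let tx_hash = block_tx_hashes[oe.tx_index];
            oe.events.iter().map(move |e| (tx_hash, e.clone()))
        })
        .collect()
}

fn main() {
    let hashes = ["0xaaa", "0xbbb"];
    let ordered = vec![
        OrderedEvent { tx_index: 0, events: vec!["Transfer"] },
        OrderedEvent { tx_index: 1, events: vec!["Approval", "Transfer"] },
    ];
    let paired = pair_events_with_tx_hashes(&hashes, &ordered);
    assert_eq!(paired.len(), 3);
    assert_eq!(paired[2], ("0xbbb", "Transfer"));
}
```

The hash list itself comes from the cache when available and is recomputed from the block otherwise, as the `block_txs_hashes` branch above shows.
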
@@ -134,52 +118,43 @@ where
// get filter values
let continuation_token = filter.continuation_token;
// skip blocks with continuation token block number
let from_block = filter.from_block + continuation_token.block_n;
let from_block = continuation_token.block_n;
let mut current_block = from_block;
let to_block = filter.to_block;
let from_address = filter.from_address;
let keys = filter.keys;
let chunk_size = filter.chunk_size;

let mut filtered_events = Vec::new();
let mut filtered_events: Vec<EmittedEvent> = Vec::new();

// Iterate on block range
while current_block <= to_block {
let emitted_events = self.get_block_events(current_block)?;
let mut unchecked_events = emitted_events.len();
let events = if current_block == from_block {
// check if continuation_token.event_n is not too big
if (unchecked_events as u64) < continuation_token.event_n {
return Err(StarknetRpcApiError::InvalidContinuationToken.into());
}
unchecked_events -= continuation_token.event_n as usize;
emitted_events.into_iter().skip(continuation_token.event_n as usize)
} else {
#[allow(clippy::iter_skip_zero)]
emitted_events.into_iter().skip(0)
};

let mut n_visited = 0;
let block_filtered_events = filter_events_by_params(
events,
from_address,
&keys,
chunk_size as usize - filtered_events.len(),
&mut n_visited,
);

filtered_events.extend(block_filtered_events);

let block_filtered_events: Vec<EmittedEvent> = filter_events_by_params(emitted_events, from_address, &keys);

if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n {
return Err(StarknetRpcApiError::InvalidContinuationToken.into());
}

#[allow(clippy::iter_skip_zero)]
let block_filtered_reduced_events: Vec<EmittedEvent> = block_filtered_events
.into_iter()
.skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 })
.take(chunk_size as usize - filtered_events.len())
.collect();

let num_events = block_filtered_reduced_events.len();

filtered_events.extend(block_filtered_reduced_events);

if filtered_events.len() == chunk_size as usize {
let token = if current_block < to_block || n_visited < unchecked_events {
let mut event_n = n_visited as u64;
if continuation_token.block_n == current_block {
event_n += continuation_token.event_n;
}
Some(ContinuationToken { block_n: current_block - from_block, event_n }.to_string())
let event_n = if current_block == from_block {
continuation_token.event_n + chunk_size
} else {
None
num_events as u64
};
let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string());

return Ok(EventsPage { events: filtered_events, continuation_token: token });
}
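
The rewritten loop also changes what the continuation token encodes: `block_n` is now the absolute block number the previous page stopped in, and `event_n` is the number of matching events already returned from that block, so resuming only has to skip events inside the first block of the range. A sketch of one iteration under those semantics (names and the generic `E` placeholder are illustrative, not the crate's code):

```rust
// Hedged sketch of one iteration of the paging loop. `matching_in_block` is
// assumed to have already passed the address/key filter for `current_block`.
struct ContinuationToken {
    block_n: u64, // absolute block number the previous page stopped in
    event_n: u64, // matching events already returned from that block
}

fn page_step<E>(
    matching_in_block: Vec<E>,
    current_block: u64,
    resume: &ContinuationToken,
    chunk_size: usize,
    out: &mut Vec<E>,
) -> Option<ContinuationToken> {
    // Only the block the previous page stopped in needs events skipped.
    let skip = if current_block == resume.block_n { resume.event_n as usize } else { 0 };
    let take = chunk_size.saturating_sub(out.len());
    let taken: Vec<E> = matching_in_block.into_iter().skip(skip).take(take).collect();
    let n_taken = taken.len();
    out.extend(taken);
    if out.len() == chunk_size {
        // Page is full: record how far into `current_block` we got.
        Some(ContinuationToken { block_n: current_block, event_n: (skip + n_taken) as u64 })
    } else {
        None // caller advances to current_block + 1
    }
}

fn main() {
    let mut page: Vec<&str> = Vec::new();
    let resume = ContinuationToken { block_n: 100, event_n: 2 };
    // Block 100 has 5 matching events, 2 of which were returned by the previous page.
    let token = page_step(vec!["e0", "e1", "e2", "e3", "e4"], 100, &resume, 3, &mut page);
    assert_eq!(page, vec!["e2", "e3", "e4"]);
    assert_eq!(token.map(|t| (t.block_n, t.event_n)), Some((100, 5)));
}
```

Because `event_n` now counts filtered events, the per-event `n_visited` bookkeeping and the `max_results` cut-off could be dropped from `filter_events_by_params` below.
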
@@ -205,17 +180,14 @@ where
/// * `(block_events: Vec<EventWrapper>, continuation_token: usize)` - A tuple of the filtered
/// events and the first index which still hasn't been processed block_id and an instance of Block
pub fn filter_events_by_params<'a, 'b: 'a>(
events: Skip<IntoIter<EmittedEvent>>,
events: Vec<EmittedEvent>,
address: Option<Felt252Wrapper>,
keys: &'a [Vec<FieldElement>],
max_results: usize,
n_visited: &'b mut usize,
) -> Vec<EmittedEvent> {
let mut filtered_events = vec![];

// Iterate on block events.
for event in events {
*n_visited += 1;
let match_from_address = address.map_or(true, |addr| addr.0 == event.from_address);
// Based on https://github.com/starkware-libs/papyrus
let match_keys = keys
@@ -225,9 +197,6 @@ pub fn filter_events_by_params<'a, 'b: 'a>(

if match_from_address && match_keys {
filtered_events.push(event);
if filtered_events.len() >= max_results {
break;
}
}
}
filtered_events
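
With paging handled entirely by the caller, `filter_events_by_params` reduces to a pure filter over a block's events. The key-matching expression is folded out of the diff above, so the sketch below assumes the papyrus-style convention it references: `keys[i]` lists the accepted values for key position `i`, and an empty list acts as a wildcard (stand-in numeric types replace `Felt252Wrapper`/`FieldElement`):

```rust
// Hedged sketch of the simplified filter; the real code operates on
// EmittedEvent values from starknet-rs.
#[derive(Clone)]
struct Event {
    from_address: u64,
    keys: Vec<u64>,
}

fn filter_events_by_params(events: Vec<Event>, address: Option<u64>, keys: &[Vec<u64>]) -> Vec<Event> {
    events
        .into_iter()
        .filter(|event| {
            let match_from_address = address.map_or(true, |addr| addr == event.from_address);
            // Assumed papyrus-style key filter: position i must either be a
            // wildcard (empty list) or contain the event's i-th key.
            let match_keys = keys.iter().enumerate().all(|(i, allowed)| {
                allowed.is_empty() || event.keys.get(i).map_or(false, |k| allowed.contains(k))
            });
            match_from_address && match_keys
        })
        .collect()
}

fn main() {
    let events = vec![
        Event { from_address: 1, keys: vec![10, 20] },
        Event { from_address: 2, keys: vec![10, 30] },
    ];
    // Any address; first key must be 10, second key must be 20.
    let filtered = filter_events_by_params(events, None, &[vec![10], vec![20]]);
    assert_eq!(filtered.len(), 1);
    assert_eq!(filtered[0].from_address, 1);
}
```
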
crates/client/rpc/src/lib.rs: 8 additions & 7 deletions
@@ -1304,13 +1304,6 @@ where
/// errors, such as `PAGE_SIZE_TOO_BIG`, `INVALID_CONTINUATION_TOKEN`, `BLOCK_NOT_FOUND`, or
/// `TOO_MANY_KEYS_IN_FILTER`, returns a `StarknetRpcApiError` indicating the specific issue.
async fn get_events(&self, filter: EventFilterWithPage) -> RpcResult<EventsPage> {
let continuation_token = match filter.result_page_request.continuation_token {
Some(token) => types::ContinuationToken::parse(token).map_err(|e| {
error!("Failed to parse continuation token: {:?}", e);
StarknetRpcApiError::InvalidContinuationToken
})?,
None => types::ContinuationToken::default(),
};
let from_address = filter.event_filter.address.map(Felt252Wrapper::from);
let keys = filter.event_filter.keys.unwrap_or_default();
let chunk_size = filter.result_page_request.chunk_size;
@@ -1343,6 +1336,14 @@ where
StarknetRpcApiError::BlockNotFound
})?;

let continuation_token = match filter.result_page_request.continuation_token {
Some(token) => types::ContinuationToken::parse(token).map_err(|e| {
error!("Failed to parse continuation token: {:?}", e);
StarknetRpcApiError::InvalidContinuationToken
})?,
None => types::ContinuationToken { block_n: from_block.into(), event_n: 0 },
};

// Verify that the requested range is valid
if from_block > to_block {
return Ok(EventsPage { events: vec![], continuation_token: None });
crates/client/rpc/src/types.rs: 4 additions & 4 deletions
@@ -27,18 +27,18 @@ pub enum ParseTokenError

impl fmt::Display for ContinuationToken {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:x},{:x}", self.block_n, self.event_n)
write!(f, "{}-{}", self.block_n, self.event_n)
}
}

impl ContinuationToken {
pub fn parse(token: String) -> Result<Self, ParseTokenError> {
let arr: Vec<&str> = token.split(',').collect();
let arr: Vec<&str> = token.split('-').collect();
if arr.len() != 2 {
return Err(ParseTokenError::WrongToken);
}
let block_n = u64::from_str_radix(arr[0], 16).map_err(ParseTokenError::ParseFailed)?;
let event_n = u64::from_str_radix(arr[1], 16).map_err(ParseTokenError::ParseFailed)?;
let block_n = u64::from_str_radix(arr[0], 10).map_err(ParseTokenError::ParseFailed)?;
let event_n = u64::from_str_radix(arr[1], 10).map_err(ParseTokenError::ParseFailed)?;

Ok(ContinuationToken { block_n, event_n })
}
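
Finally, the token's wire format moves from hex, comma-separated fields to decimal, dash-separated fields, which means tokens issued before this change no longer parse. A standalone sketch of the new round-trip (the derives and the exact shape of `ParseTokenError` are assumptions based on how the diff uses them):

```rust
use std::fmt;
use std::num::ParseIntError;

#[derive(Debug, Default, PartialEq)]
struct ContinuationToken {
    block_n: u64,
    event_n: u64,
}

#[derive(Debug)]
enum ParseTokenError {
    WrongToken,
    ParseFailed(ParseIntError),
}

impl fmt::Display for ContinuationToken {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // New format: decimal values separated by a dash (was "{:x},{:x}").
        write!(f, "{}-{}", self.block_n, self.event_n)
    }
}

impl ContinuationToken {
    fn parse(token: &str) -> Result<Self, ParseTokenError> {
        let arr: Vec<&str> = token.split('-').collect();
        if arr.len() != 2 {
            return Err(ParseTokenError::WrongToken);
        }
        let block_n: u64 = arr[0].parse().map_err(ParseTokenError::ParseFailed)?;
        let event_n: u64 = arr[1].parse().map_err(ParseTokenError::ParseFailed)?;
        Ok(ContinuationToken { block_n, event_n })
    }
}

fn main() {
    let token = ContinuationToken { block_n: 1050, event_n: 3 };
    let encoded = token.to_string();
    assert_eq!(encoded, "1050-3");
    assert_eq!(ContinuationToken::parse(&encoded).unwrap(), token);
    assert!(ContinuationToken::parse("not-a-token").is_err());
}
```
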
