Skip to content

Commit

Permalink
fix: stream era1 block tuples (#1246)
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita authored Apr 9, 2024
1 parent 50e254a commit 778415d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
51 changes: 29 additions & 22 deletions portal-bridge/src/bridge/era1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,12 @@ impl Era1Bridge {
let era1_file = era1_files
.next()
.expect("to be able to get first era1 file");
let epoch = era1_file
.split('-')
.nth(1)
.expect("to be able to get epoch from era1 file")
.parse::<u64>()
.expect("to be able to parse epoch from era1 file");
match floor {
Some(floor) => {
let epoch = match get_epoch_from_era1_path(&era1_file) {
Ok(epoch) => epoch,
Err(e) => panic!("Failed to get epoch from era1 file: {e}"),
};
if epoch >= floor {
break era1_file;
}
Expand Down Expand Up @@ -161,9 +159,12 @@ impl Era1Bridge {
.recv_bytes()
.await
.unwrap_or_else(|_| panic!("unable to read era1 file at path: {era1_path:?}"));
let era1 = Era1::deserialize(&raw_era1).expect("to be able to deserialize era1 file");
let epoch_index = era1.block_tuples[0].header.header.number / EPOCH_SIZE as u64;
info!("Era1 file read successfully, gossiping block tuples for epoch: {epoch_index}");
let epoch_index = match get_epoch_from_era1_path(&era1_path) {
Ok(epoch) => epoch,
Err(e) => {
panic!("Failed to get epoch from era1 file: {e}");
}
};
let epoch_acc = match self.get_epoch_acc(epoch_index).await {
Ok(epoch_acc) => epoch_acc,
Err(e) => {
Expand All @@ -172,20 +173,15 @@ impl Era1Bridge {
}
};
let master_acc = Arc::new(self.header_oracle.master_acc.clone());
info!("Era1 file read successfully, gossiping block tuples for epoch: {epoch_index}");
let mut serve_block_tuple_handles = vec![];
let block_tuples = match gossip_range {
Some(range) => era1
.block_tuples
.clone()
.into_iter()
.filter(|block_tuple| {
let block_number = block_tuple.header.header.number;
block_number >= range.start && block_number <= range.end
})
.collect(),
None => era1.block_tuples,
};
for block_tuple in block_tuples {
for block_tuple in Era1::iter_tuples(raw_era1) {
let block_number = block_tuple.header.header.number;
if let Some(range) = gossip_range.clone() {
if !range.contains(&block_number) {
continue;
}
}
let permit = gossip_send_semaphore
.clone()
.acquire_owned()
Expand Down Expand Up @@ -484,3 +480,14 @@ async fn get_shuffled_era1_files(http_client: &Client) -> anyhow::Result<Vec<Str
era1_files.shuffle(&mut thread_rng());
Ok(era1_files)
}

fn get_epoch_from_era1_path(era1_path: &str) -> anyhow::Result<u64> {
let era1_path = era1_path.to_string();
ensure!(era1_path.contains("mainnet-"), "invalid era1 path");
let epoch_str = era1_path
.split('-')
.nth(1)
.ok_or_else(|| anyhow!("invalid era1 path"))?;
let epoch = epoch_str.parse::<u64>()?;
Ok(epoch)
}
17 changes: 17 additions & 0 deletions portal-bridge/src/types/era1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ impl Era1 {
Self::deserialize(&buf)
}

/// Function to iterate over block tuples in an era1 file
/// this is useful for processing large era1 files without storing the entire
/// deserialized era1 object in memory.
pub fn iter_tuples(raw_era1: Vec<u8>) -> impl Iterator<Item = BlockTuple> {
let file = E2StoreFile::deserialize(&raw_era1).expect("invalid era1 file");
let block_index = BlockIndexEntry::try_from(&file.entries[32770])
.expect("invalid block index entry")
.block_index;
(0..block_index.count).map(move |i| {
let mut entries: [Entry; 4] = Default::default();
for (j, entry) in entries.iter_mut().enumerate() {
*entry = file.entries[i as usize * 4 + j + 1].to_owned();
}
BlockTuple::try_from(&entries).expect("invalid block tuple")
})
}

pub fn deserialize(buf: &[u8]) -> anyhow::Result<Self> {
let file = E2StoreFile::deserialize(buf)?;
ensure!(
Expand Down

0 comments on commit 778415d

Please sign in to comment.