Skip to content

Commit

Permalink
Ranged HAMT iteration specified via keys (#1665)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexytsu authored Feb 15, 2023
1 parent c80ad6c commit 31e7c61
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ target
!testing/integration/tests/assets/*
testing/conformance/traces
.idea/
.vscode/
lcov.info
69 changes: 69 additions & 0 deletions ipld/hamt/src/hamt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use multihash::Code;
use serde::de::DeserializeOwned;
use serde::{Serialize, Serializer};

use crate::hash_bits::HashBits;
use crate::node::Node;
use crate::{Config, Error, Hash, HashAlgorithm, Sha256};

Expand Down Expand Up @@ -362,6 +363,74 @@ where
self.root.for_each(self.store.borrow(), &mut f)
}

/// Iterates over each KV in the Hamt and runs a function on the values. If starting key is
/// provided, iteration will start from that key. If max is provided, iteration will stop after
/// max number of items have been traversed. The number of items that were traversed is
/// returned. If there are more items in the Hamt after max items have been traversed, the key
/// of the next item will be returned.
///
/// This function will constrain all values to be of the same type
///
/// # Examples
///
/// ```
/// use fvm_ipld_hamt::Hamt;
///
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
///
/// let mut map: Hamt<_, _, u64> = Hamt::new(store);
/// map.set(1, 1).unwrap();
/// map.set(2, 2).unwrap();
/// map.set(3, 3).unwrap();
/// map.set(4, 4).unwrap();
///
/// let mut numbers = vec![];
///
/// map.for_each_ranged(None, None, |_, v: &u64| {
/// numbers.push(*v);
/// Ok(())
/// }).unwrap();
///
/// let mut subset = vec![];
///
/// let (_, next_key) = map.for_each_ranged(Some(&numbers[0]), Some(2), |_, v: &u64| {
/// subset.push(*v);
/// Ok(())
/// }).unwrap();
///
/// assert_eq!(subset, numbers[..2]);
/// assert_eq!(next_key.unwrap(), numbers[2]);
/// ```
#[inline]
pub fn for_each_ranged<Q: ?Sized, F>(
&self,
starting_key: Option<&Q>,
max: Option<usize>,
mut f: F,
) -> Result<(usize, Option<K>), Error>
where
K: Borrow<Q> + Clone,
Q: Eq + Hash,
V: DeserializeOwned,
F: FnMut(&K, &V) -> anyhow::Result<()>,
{
match starting_key {
Some(key) => {
let hash = H::hash(key);
self.root.for_each_ranged(
self.store.borrow(),
&self.conf,
Some((HashBits::new(&hash), key)),
max,
&mut f,
)
}
None => self
.root
.for_each_ranged(self.store.borrow(), &self.conf, None, max, &mut f),
}
}

/// Consumes this HAMT and returns the Blockstore it owns.
pub fn into_store(self) -> BS {
self.store
Expand Down
103 changes: 103 additions & 0 deletions ipld/hamt/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,109 @@ where
Ok(())
}

pub(crate) fn for_each_ranged<Q: ?Sized, S, F>(
&self,
store: &S,
conf: &Config,
mut starting_cursor: Option<(HashBits, &Q)>,
limit: Option<usize>,
f: &mut F,
) -> Result<(usize, Option<K>), Error>
where
K: Borrow<Q> + Clone,
Q: Eq + Hash,
F: FnMut(&K, &V) -> anyhow::Result<()>,
S: Blockstore,
{
// determine which subtree the starting_cursor is in
let cindex = match starting_cursor {
Some((ref mut bits, _)) => {
let idx = bits.next(conf.bit_width)?;
self.index_for_bit_pos(idx)
}
None => 0,
};

let mut traversed_count = 0;

// skip exploration of subtrees that are before the subtree which contains the cursor
for p in &self.pointers[cindex..] {
match p {
Pointer::Link { cid, cache } => {
if let Some(cached_node) = cache.get() {
let (traversed, key) = cached_node.for_each_ranged(
store,
conf,
starting_cursor.take(),
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
f,
)?;
traversed_count += traversed;
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
return Ok((traversed_count, key));
}
} else {
let node = if let Some(node) = store.get_cbor(cid)? {
node
} else {
#[cfg(not(feature = "ignore-dead-links"))]
return Err(Error::CidNotFound(cid.to_string()));

#[cfg(feature = "ignore-dead-links")]
continue;
};

// Ignore error intentionally, the cache value will always be the same
let cache_node = cache.get_or_init(|| node);
let (traversed, key) = cache_node.for_each_ranged(
store,
conf,
starting_cursor.take(),
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
f,
)?;
traversed_count += traversed;
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
return Ok((traversed_count, key));
}
}
}
Pointer::Dirty(node) => {
let (traversed, key) = node.for_each_ranged(
store,
conf,
starting_cursor.take(),
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
f,
)?;
traversed_count += traversed;
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
return Ok((traversed_count, key));
}
}
Pointer::Values(kvs) => {
for kv in kvs {
if limit.map_or(false, |l| traversed_count == l) {
// we have already found all requested items, return the key of the next item
return Ok((traversed_count, Some(kv.0.clone())));
} else if starting_cursor.map_or(false, |(_, key)| key.eq(kv.0.borrow())) {
// mark that we have arrived at the starting cursor
starting_cursor = None
}

if starting_cursor.is_none() {
// have already passed the start cursor
f(&kv.0, kv.1.borrow())?;
traversed_count += 1;
}
}
}
}
}

Ok((traversed_count, None))
}

/// Search for a key.
fn search<Q: ?Sized, S: Blockstore>(
&self,
Expand Down
173 changes: 173 additions & 0 deletions ipld/hamt/tests/hamt_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,168 @@ fn for_each(factory: HamtFactory, stats: Option<BSStats>, mut cids: CidChecker)
}
}

fn for_each_ranged(factory: HamtFactory, stats: Option<BSStats>, mut cids: CidChecker) {
let mem = MemoryBlockstore::default();
let store = TrackingBlockstore::new(&mem);

let mut hamt: Hamt<_, usize> = factory.new_with_bit_width(&store, 5);

const RANGE: usize = 200;
for i in 0..RANGE {
hamt.set(tstring(i), i).unwrap();
}

// collect all KV paris by iterating through the entire hamt
let mut kvs = Vec::new();
hamt.for_each(|k, v| {
assert_eq!(k, &tstring(v));
kvs.push((k.clone(), *v));
Ok(())
})
.unwrap();

// Iterate through the array, requesting pages of different sizes
for page_size in 0..RANGE {
let mut kvs_variable_page = Vec::new();
let (num_traversed, next_key) = hamt
.for_each_ranged::<BytesKey, _>(None, Some(page_size), |k, v| {
kvs_variable_page.push((k.clone(), *v));
Ok(())
})
.unwrap();

assert_eq!(num_traversed, page_size);
assert_eq!(kvs_variable_page.len(), num_traversed);
assert_eq!(next_key.unwrap(), kvs[page_size].0);

// Items iterated over should match the ordering of for_each
assert_eq!(kvs_variable_page, kvs[..page_size]);
}

// Iterate through the array, requesting more items than are remaining
let (num_traversed, next_key) = hamt
.for_each_ranged::<BytesKey, _>(None, Some(RANGE + 10), |_k, _v| Ok(()))
.unwrap();
assert_eq!(num_traversed, RANGE);
assert_eq!(next_key, None);

// Iterate through it again starting at a certain key
for start_at in 0..RANGE as usize {
let mut kvs_variable_start = Vec::new();
let (num_traversed, next_key) = hamt
.for_each_ranged(Some(&kvs[start_at].0), None, |k, v| {
assert_eq!(k, &tstring(v));
kvs_variable_start.push((k.clone(), *v));

Ok(())
})
.unwrap();

// No limit specified, iteration should be exhaustive
assert_eq!(next_key, None);
assert_eq!(num_traversed, kvs_variable_start.len());
assert_eq!(kvs_variable_start.len(), kvs.len() - start_at,);

// Items iterated over should match the ordering of for_each
assert_eq!(kvs_variable_start, kvs[start_at..]);
}

// Chain paginated requests to iterate over entire HAMT
{
let mut kvs_paginated_requests = Vec::new();
let mut iterations = 0;
let mut cursor: Option<BytesKey> = None;

// Request all items in pages of 20 items each
const PAGE_SIZE: usize = 20;
loop {
let (page_size, next) = match cursor {
Some(ref start) => hamt
.for_each_ranged::<BytesKey, _>(Some(start), Some(PAGE_SIZE), |k, v| {
kvs_paginated_requests.push((k.clone(), *v));
Ok(())
})
.unwrap(),
None => hamt
.for_each_ranged::<BytesKey, _>(None, Some(PAGE_SIZE), |k, v| {
kvs_paginated_requests.push((k.clone(), *v));
Ok(())
})
.unwrap(),
};
iterations += 1;
assert_eq!(page_size, PAGE_SIZE);
assert_eq!(kvs_paginated_requests.len(), iterations * PAGE_SIZE);

if next.is_none() {
break;
} else {
assert_eq!(next.clone().unwrap(), kvs[(iterations * PAGE_SIZE)].0);
cursor = next;
}
}

// should have retrieved all key value pairs in the same order
assert_eq!(kvs_paginated_requests.len(), kvs.len(), "{}", iterations);
assert_eq!(kvs_paginated_requests, kvs);
// should have used the expected number of iterations
assert_eq!(iterations, RANGE / PAGE_SIZE);
}

let c = hamt.flush().unwrap();
cids.check_next(c);

// Chain paginated requests over a HAMT with committed nodes
let mut hamt: Hamt<_, usize> = factory.load_with_bit_width(&c, &store, 5).unwrap();
{
let mut kvs_paginated_requests = Vec::new();
let mut iterations = 0;
let mut cursor: Option<BytesKey> = None;

// Request all items in pages of 20 items each
const PAGE_SIZE: usize = 20;
loop {
let (page_size, next) = match cursor {
Some(ref start) => hamt
.for_each_ranged::<BytesKey, _>(Some(start), Some(PAGE_SIZE), |k, v| {
kvs_paginated_requests.push((k.clone(), *v));
Ok(())
})
.unwrap(),
None => hamt
.for_each_ranged::<BytesKey, _>(None, Some(PAGE_SIZE), |k, v| {
kvs_paginated_requests.push((k.clone(), *v));
Ok(())
})
.unwrap(),
};
iterations += 1;
assert_eq!(page_size, PAGE_SIZE);
assert_eq!(kvs_paginated_requests.len(), iterations * PAGE_SIZE);

if next.is_none() {
break;
} else {
assert_eq!(next.clone().unwrap(), kvs[(iterations * PAGE_SIZE)].0);
cursor = next;
}
}

// should have retrieved all key value pairs in the same order
assert_eq!(kvs_paginated_requests.len(), kvs.len(), "{}", iterations);
assert_eq!(kvs_paginated_requests, kvs);
// should have used the expected number of iterations
assert_eq!(iterations, RANGE / PAGE_SIZE);
}

let c = hamt.flush().unwrap();
cids.check_next(c);

if let Some(stats) = stats {
assert_eq!(*store.stats.borrow(), stats);
}
}

#[cfg(feature = "identity")]
fn add_and_remove_keys(
bit_width: u32,
Expand Down Expand Up @@ -823,6 +985,17 @@ mod test_default {
super::for_each(HamtFactory::default(), Some(stats), cids);
}

#[test]
fn for_each_ranged() {
#[rustfmt::skip]
let stats = BSStats {r: 30, w: 30, br: 2895, bw: 2895};
let cids = CidChecker::new(vec![
"bafy2bzacedy4ypl2vedhdqep3llnwko6vrtfiys5flciz2f3c55pl4whlhlqm",
"bafy2bzacedy4ypl2vedhdqep3llnwko6vrtfiys5flciz2f3c55pl4whlhlqm",
]);
super::for_each_ranged(HamtFactory::default(), Some(stats), cids);
}

#[test]
fn clean_child_ordering() {
#[rustfmt::skip]
Expand Down

0 comments on commit 31e7c61

Please sign in to comment.