Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use an Iterator for insert_many and other clean up #399

Merged
merged 5 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions recon/src/libp2p/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult<bool> {
async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult<bool> {
self.as_error()?;

self.inner.insert(item).await
}

async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<InsertResult> {
async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> ReconResult<InsertResult> {
self.as_error()?;

self.inner.insert_many(items).await
Expand Down
8 changes: 4 additions & 4 deletions recon/src/recon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,12 @@ pub trait Store {

/// Insert a new key into the key space. Returns true if the key did not exist.
/// The value will be updated if included
async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result<bool>;
async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result<bool>;

/// Insert new keys into the key space.
/// Returns true for each key if it did not previously exist, in the
/// same order as the input iterator.
async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result<InsertResult>;
async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result<InsertResult>;

/// Return the hash of all keys in the range between left_fencepost and right_fencepost.
/// The upper range bound is exclusive.
Expand Down Expand Up @@ -575,11 +575,11 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result<bool> {
async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result<bool> {
self.as_ref().insert(item).await
}

async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result<InsertResult> {
async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result<InsertResult> {
self.as_ref().insert_many(items).await
}

Expand Down
4 changes: 2 additions & 2 deletions recon/src/recon/btreestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result<bool> {
async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result<bool> {
let mut inner = self.inner.lock().await;
let new = inner
.keys
Expand All @@ -143,7 +143,7 @@ where
Ok(new)
}

async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> Result<InsertResult> {
async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> Result<InsertResult> {
let mut new = vec![false; items.len()];
for (idx, item) in items.iter().enumerate() {
new[idx] = self.insert(item).await?;
Expand Down
2 changes: 1 addition & 1 deletion service/src/event/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Migrator {
Ok(())
}
async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> {
CeramicOneEvent::insert_many(sql_pool, &self.batch).await?;
CeramicOneEvent::insert_many(sql_pool, self.batch.iter()).await?;
self.event_count += self.batch.len();
self.batch.truncate(0);
Ok(())
Expand Down
114 changes: 65 additions & 49 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ use crate::Result;
use super::service::EventMetadata;

pub(crate) struct OrderEvents {
pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>,
pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>,
deliverable: Vec<(EventInsertable, EventMetadata)>,
missing_history: Vec<(EventInsertable, EventMetadata)>,
}

impl OrderEvents {
pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] {
&self.deliverable
}

pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] {
&self.missing_history
}
}

impl OrderEvents {
Expand All @@ -28,11 +38,14 @@ impl OrderEvents {
pool: &SqlitePool,
mut candidate_events: Vec<(EventInsertable, EventMetadata)>,
) -> Result<Self> {
let mut new_cids: HashMap<Cid, bool> = HashMap::from_iter(
candidate_events
.iter()
.map(|(e, _)| (e.cid(), e.deliverable())),
);
let mut new_cids: HashMap<Cid, bool> =
HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| {
// all init events are deliverable so we mark them as such before we do anything else
if matches!(meta, EventMetadata::Init { .. }) {
e.body.set_deliverable(true);
}
(e.cid(), e.deliverable())
}));
let mut deliverable = Vec::with_capacity(candidate_events.len());
candidate_events.retain(|(e, h)| {
if e.deliverable() {
Expand Down Expand Up @@ -150,6 +163,43 @@ mod test {
(stream_1, stream_2, to_insert)
}

/// Asserts the events are deliverable and returns IDs for events in stream_1 as the first value and things in stream_2 as the second
fn split_deliverable_order_by_stream(
stream_1: &[(EventId, Vec<u8>)],
stream_2: &[(EventId, Vec<u8>)],
events: &[(EventInsertable, EventMetadata)],
) -> (Vec<EventId>, Vec<EventId>) {
let mut after_1 = Vec::with_capacity(stream_1.len());
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}

(after_1, after_2)
}

async fn get_insertable_events(
events: &[(EventId, Vec<u8>)],
) -> Vec<(EventInsertable, EventMetadata)> {
let mut insertable = Vec::with_capacity(events.len());
for event in events {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}

insertable
}

#[test(tokio::test)]
async fn out_of_order_streams_valid() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
Expand All @@ -164,16 +214,8 @@ mod test {
ordered.missing_history.len(),
ordered.missing_history
);
let mut after_1 = Vec::with_capacity(10);
let mut after_2 = Vec::with_capacity(10);
for (event, _) in ordered.deliverable {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}
let (after_1, after_2) =
split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable());

assert_eq!(
stream_1.into_iter().map(|e| e.0).collect::<Vec<_>>(),
Expand Down Expand Up @@ -201,16 +243,8 @@ mod test {
"Missing history: {:?}",
ordered.missing_history
);
let mut after_1 = Vec::with_capacity(10);
let mut after_2 = Vec::with_capacity(10);
for (event, _) in ordered.deliverable {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.0 == event.order_key) {
after_1.push(event.order_key.clone());
} else {
after_2.push(event.order_key.clone());
}
}
let (after_1, after_2) =
split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable());

assert_eq!(vec![stream_1[0].0.clone()], after_1);
assert_eq!(
Expand All @@ -227,23 +261,14 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let mut insertable = Vec::with_capacity(10);
for event in stream_1.iter() {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}
let insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter()
.take(3)
.map(|(i, _)| i.clone())
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
CeramicOneEvent::insert_many(&pool, &to_insert[..])
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

Expand All @@ -265,16 +290,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let mut insertable = Vec::with_capacity(10);
for event in stream_1.iter() {
let new = CeramicEventService::validate_discovered_event(
event.0.to_owned(),
event.1.as_slice(),
)
.await
.unwrap();
insertable.push(new);
}
let mut insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter_mut()
.take(3)
Expand All @@ -284,7 +300,7 @@ mod test {
})
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
CeramicOneEvent::insert_many(&pool, &to_insert[..])
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

Expand Down
13 changes: 8 additions & 5 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ impl StreamEvents {
/// Returns `false` if we have more work to do and should be retained for future processing
fn processing_completed(&mut self) -> bool {
// if we're done, we don't need to bother cleaning up since we get dropped
if !self
if self
.cid_map
.iter()
.any(|(_, ev)| matches!(ev, StreamEvent::Undelivered(_)))
.all(|(_, ev)| !matches!(ev, StreamEvent::Undelivered(_)))
{
true
} else {
Expand Down Expand Up @@ -636,17 +636,20 @@ mod test {

async fn insert_10_with_9_undelivered(pool: &SqlitePool) {
let insertable = get_n_insertable_events(10).await;
let init = insertable.first().unwrap().to_owned();
let mut init = insertable.first().unwrap().to_owned();
init.body.set_deliverable(true);
let undelivered = insertable.into_iter().skip(1).collect::<Vec<_>>();

let new = CeramicOneEvent::insert_many(pool, &undelivered[..])
let new = CeramicOneEvent::insert_many(pool, undelivered.iter())
.await
.unwrap();

assert_eq!(9, new.inserted.len());
assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count());

let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap();
let new = CeramicOneEvent::insert_many(pool, [&init].into_iter())
.await
.unwrap();
assert_eq!(1, new.inserted.len());
assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count());
}
Expand Down
Loading
Loading