From 9d1ee07c7059c94da046833bfd355fcadef0555c Mon Sep 17 00:00:00 2001 From: David Estes <5317198+dav1do@users.noreply.github.com> Date: Tue, 18 Jun 2024 12:58:29 -0600 Subject: [PATCH] refactor: modify store API and rename some things (#390) * refactor: rename entity DeliveredEventRow * refactor: rename some things, fix don't update deliverable flag on deliverable events * refactor: change the insert event response returns more info than simply whether the event was new --- service/src/event/ordering_task.rs | 2 +- service/src/event/service.rs | 39 +++++++--- service/src/event/store.rs | 22 ++++-- service/src/tests/mod.rs | 2 +- service/src/tests/ordering.rs | 12 +-- store/src/lib.rs | 4 +- store/src/metrics.rs | 4 +- store/src/sql/access/event.rs | 104 +++++++++++++++++++++----- store/src/sql/access/mod.rs | 2 +- store/src/sql/entities/event.rs | 26 +++++-- store/src/sql/entities/event_block.rs | 4 + store/src/sql/entities/hash.rs | 2 +- store/src/sql/entities/mod.rs | 2 +- store/src/sql/entities/utils.rs | 4 +- store/src/sql/mod.rs | 5 +- store/src/sql/query.rs | 8 +- store/src/sql/test.rs | 12 +-- 17 files changed, 185 insertions(+), 69 deletions(-) diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index dfe611684..9956e44f9 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -312,7 +312,7 @@ impl OrderingState { // once we find the first event that's deliverable, we can go back through and find the rest continue; } else { - let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?; + let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?; if delivered { trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list"); deliverable.push_back(ev_cid); diff --git a/service/src/event/service.rs b/service/src/event/service.rs index daddd3531..c5ffb6b57 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -5,7 +5,7 @@ use ceramic_event::unvalidated; use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; use cid::Cid; use ipld_core::ipld::Ipld; -use recon::{InsertResult, ReconItem}; +use recon::ReconItem; use tracing::{trace, warn}; use super::ordering_task::{ @@ -126,10 +126,10 @@ impl CeramicEventService { /// This is likely used in API contexts when a user is trying to insert events. Events discovered from /// peers can come in any order and we will discover the prev chain over time. Use /// `insert_events_from_carfiles_remote_history` for that case. - pub(crate) async fn insert_events_from_carfiles_local_history<'a>( + pub(crate) async fn insert_events_from_carfiles_local_api<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - ) -> Result { + ) -> Result { if items.is_empty() { return Ok(InsertResult::default()); } @@ -144,19 +144,31 @@ impl CeramicEventService { /// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and /// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use /// `insert_events_from_carfiles_local_history`. - pub(crate) async fn insert_events_from_carfiles_remote_history<'a>( + pub(crate) async fn insert_events_from_carfiles_recon<'a>( &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { if items.is_empty() { - return Ok(InsertResult::default()); + return Ok(recon::InsertResult::default()); } let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?; - self.process_events(ordering).await + let res = self.process_events(ordering).await?; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + let mut keys = vec![false; items.len()]; + for (i, item) in items.iter().enumerate() { + let new_key = res + .store_result + .inserted + .iter() + .find(|e| e.order_key == *item.key) + .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set + keys[i] = new_key; + } + Ok(recon::InsertResult::new(keys)) } - async fn process_events(&self, ordering: InsertEventOrdering) -> Result { + async fn process_events(&self, ordering: InsertEventOrdering) -> Result { let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?; for ev in ordering.background_task_deliverable { @@ -194,7 +206,10 @@ impl CeramicEventService { } } } - Ok(res) + Ok(InsertResult { + store_result: res, + missing_history: vec![], + }) } } @@ -285,7 +300,7 @@ impl InsertEventOrdering { } fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.deliverable(true); + ev.set_deliverable(true); self.notify_task_new .push(DeliveredEvent::new(ev.body.cid, init_cid)); self.insert_now.push(ev); @@ -374,3 +389,9 @@ impl InsertEventOrdering { } } } + +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, +} diff --git a/service/src/event/store.rs b/service/src/event/store.rs index e61506bcf..51ae5ac22 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService { async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { let res = self - .insert_events_from_carfiles_remote_history(&[item.to_owned()]) + .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; Ok(res.keys.first().copied().unwrap_or(false)) @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { - let res = self - .insert_events_from_carfiles_remote_history(items) - .await?; + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { + let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -112,9 +113,14 @@ impl ceramic_api::EventStore for CeramicEventService { .map(|(key, val)| ReconItem::new(key, val.as_slice())) .collect::>(); let res = self - .insert_events_from_carfiles_local_history(&items[..]) + .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + Ok(res + .store_result + .inserted + .iter() + .map(|r| r.new_key) + .collect()) } async fn range_with_values( diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index de36fb97a..644d3081b 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -129,7 +129,7 @@ pub(crate) async fn check_deliverable( cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid) + let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index 8e103173f..976e9b1cd 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -19,7 +19,7 @@ async fn setup_service() -> CeramicEventService { async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_remote_history(&[item]) + .insert_events_from_carfiles_recon(&[item]) .await .unwrap(); let new = new.keys.into_iter().filter(|k| *k).count(); @@ -28,10 +28,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_local_history(&[item]) + .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -51,7 +51,7 @@ async fn test_missing_prev_error_history_required() { let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)]) + .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) .await; match new { Ok(v) => panic!("should have errored: {:?}", v), @@ -100,13 +100,13 @@ async fn test_prev_in_same_write_history_required() { let init: &(EventId, Vec) = &events[0]; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ + .insert_events_from_carfiles_local_api(&[ ReconItem::new(&data.0, &data.1), ReconItem::new(&init.0, &init.1), ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; diff --git a/store/src/lib.rs b/store/src/lib.rs index 217c8751d..39747ffbd 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -10,8 +10,8 @@ pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use sql::{ entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, - CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore, - SqliteTransaction, + CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool, + SqliteRootStore, SqliteTransaction, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 855d4e960..be8442e72 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -13,7 +13,7 @@ use prometheus_client::{ }, registry::Registry, }; -use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; use tokio::time::Instant; #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -253,7 +253,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 88014e1c9..eaa0a9e33 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -7,12 +7,12 @@ use std::{ use anyhow::anyhow; use ceramic_core::{event_id::InvalidEventId, EventId}; use cid::Cid; -use recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a}; +use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a}; use crate::{ sql::{ entities::{ - rebuild_car, BlockRow, CountRow, DeliveredEvent, EventInsertable, OrderKey, + rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventInsertable, OrderKey, ReconEventBlockRaw, ReconHash, }, query::{EventQuery, ReconQuery, ReconType, SqlBackend}, @@ -23,17 +23,73 @@ use crate::{ static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event that was inserted into the database +pub struct InsertedEvent { + /// The event order key that was inserted + pub order_key: EventId, + /// Whether the event was marked as deliverable + pub deliverable: bool, + /// Whether the event was a new key + pub new_key: bool, +} + +impl InsertedEvent { + /// Create a new delivered event + fn new(order_key: EventId, new_key: bool, deliverable: bool) -> Self { + Self { + order_key, + deliverable, + new_key, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +/// The result of inserting events into the database +pub struct InsertResult { + /// The events that were marked as delivered in this batch + pub inserted: Vec, +} + +impl InsertResult { + /// The count of new keys added in this batch + pub fn count_new_keys(&self) -> usize { + self.inserted.iter().filter(|e| e.new_key).count() + } +} + +impl InsertResult { + fn new(inserted: Vec) -> Self { + Self { inserted } + } +} + /// Access to the ceramic event table and related logic pub struct CeramicOneEvent {} impl CeramicOneEvent { - async fn insert_key(tx: &mut SqliteTransaction<'_>, key: &EventId) -> Result { + fn next_deliverable() -> i64 { + GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst) + } + + /// Insert the event and its hash into the ceramic_one_event table + async fn insert_event( + tx: &mut SqliteTransaction<'_>, + key: &EventId, + deliverable: bool, + ) -> Result { let id = key.as_bytes(); let cid = key .cid() .map(|cid| cid.to_bytes()) .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?; let hash = Sha256a::digest(key); + let delivered: Option = if deliverable { + Some(Self::next_deliverable()) + } else { + None + }; let resp = sqlx::query(ReconQuery::insert_event()) .bind(id) @@ -46,6 +102,7 @@ impl CeramicOneEvent { .bind(hash.as_u32s()[5]) .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) + .bind(delivered) .execute(&mut **tx.inner()) .await; @@ -87,7 +144,7 @@ impl CeramicOneEvent { pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> { // Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value sqlx::query(EventQuery::mark_ready_to_deliver()) - .bind(GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)) + .bind(Self::next_deliverable()) .bind(&key.to_bytes()) .execute(&mut **conn.inner()) .await?; @@ -95,29 +152,41 @@ impl CeramicOneEvent { Ok(()) } - /// Insert many events into the database. This is the main function to use when storing events. + /// Insert many events into the database. The events and their blocks and metadata are inserted in a single + /// transaction and either all successful or rolled back. + /// + /// IMPORTANT: + /// It is the caller's responsibility to order events marked deliverable correctly. + /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering + /// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events + /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. pub async fn insert_many( pool: &SqlitePool, to_add: &[EventInsertable], ) -> Result { - let mut new_keys = vec![false; to_add.len()]; + let mut inserted = Vec::with_capacity(to_add.len()); let mut tx = pool.begin_tx().await.map_err(Error::from)?; - for (idx, item) in to_add.iter().enumerate() { - let new_key = Self::insert_key(&mut tx, &item.order_key).await?; + for item in to_add { + let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?; + inserted.push(InsertedEvent::new( + item.order_key.clone(), + new_key, + item.deliverable(), + )); if new_key { - for block in item.body.blocks.iter() { + for block in item.blocks().iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } } - if item.body.deliverable { - Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?; + // the item already existed so we didn't mark it as deliverable on insert + if !new_key && item.deliverable() { + Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?; } - new_keys[idx] = new_key; } tx.commit().await.map_err(Error::from)?; - let res = InsertResult::new(new_keys); + let res = InsertResult::new(inserted); Ok(res) } @@ -220,13 +289,13 @@ impl CeramicOneEvent { delivered: i64, limit: i64, ) -> Result<(i64, Vec)> { - let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) + let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) .bind(delivered) .bind(limit) .fetch_all(pool.reader()) .await?; - DeliveredEvent::parse_query_results(delivered, rows) + DeliveredEventRow::parse_query_results(delivered, rows) } /// Finds the event data by a given EventId i.e. "order key". @@ -248,8 +317,9 @@ impl CeramicOneEvent { } /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. - /// (bool, bool) = (exists, delivered) - pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { + /// returns (bool, bool) = (exists, deliverable) + /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. + pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { #[derive(sqlx::FromRow)] struct CidExists { exists: bool, diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs index 912a462ac..84a9131ff 100644 --- a/store/src/sql/access/mod.rs +++ b/store/src/sql/access/mod.rs @@ -4,6 +4,6 @@ mod event_block; mod interest; pub use block::CeramicOneBlock; -pub use event::CeramicOneEvent; +pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; pub use interest::CeramicOneInterest; diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 32610eb12..fb8ae1692 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -62,8 +62,24 @@ impl EventInsertable { Ok(Self { order_key, body }) } - /// change the deliverable status of the event - pub fn deliverable(&mut self, deliverable: bool) { + /// Get the CID of the event + pub fn cid(&self) -> Cid { + self.body.cid + } + + /// Whether this event is deliverable currently + pub fn deliverable(&self) -> bool { + self.body.deliverable + } + + /// Whether this event is deliverable currently + pub fn blocks(&self) -> &Vec { + &self.body.blocks + } + + /// Mark the event as deliverable. + /// This will be used when inserting the event to make sure the field is updated accordingly. + pub fn set_deliverable(&mut self, deliverable: bool) { self.body.deliverable = deliverable; } } @@ -82,10 +98,10 @@ pub struct EventInsertableBody { impl EventInsertableBody { /// Create a new EventInsertRaw struct. Deliverable is set to false by default. - pub fn new(cid: Cid, blocks: Vec) -> Self { + pub fn new(cid: Cid, blocks: Vec, deliverable: bool) -> Self { Self { cid, - deliverable: false, + deliverable, blocks, } } @@ -136,6 +152,6 @@ impl EventInsertableBody { blocks.push(ebr); idx += 1; } - Ok(Self::new(event_cid, blocks)) + Ok(Self::new(event_cid, blocks, false)) } } diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index e40698a69..58b80f2f6 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -134,4 +134,8 @@ impl EventBlockRaw { bytes, }) } + + pub fn cid(&self) -> Cid { + Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner()) + } } diff --git a/store/src/sql/entities/hash.rs b/store/src/sql/entities/hash.rs index 926e6af1e..a914b808f 100644 --- a/store/src/sql/entities/hash.rs +++ b/store/src/sql/entities/hash.rs @@ -4,7 +4,7 @@ use sqlx::{sqlite::SqliteRow, Row as _}; use crate::{Error, Result}; -#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] pub struct BlockHash(Multihash<64>); impl BlockHash { diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs index 8103296af..9b8219104 100644 --- a/store/src/sql/entities/mod.rs +++ b/store/src/sql/entities/mod.rs @@ -9,4 +9,4 @@ pub use event::{rebuild_car, EventInsertable, EventInsertableBody}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; pub use hash::{BlockHash, ReconHash}; -pub use utils::{CountRow, DeliveredEvent, OrderKey}; +pub use utils::{CountRow, DeliveredEventRow, OrderKey}; diff --git a/store/src/sql/entities/utils.rs b/store/src/sql/entities/utils.rs index d8efc5f5a..3b6555607 100644 --- a/store/src/sql/entities/utils.rs +++ b/store/src/sql/entities/utils.rs @@ -24,12 +24,12 @@ impl TryFrom for EventId { } #[derive(sqlx::FromRow)] -pub struct DeliveredEvent { +pub struct DeliveredEventRow { pub cid: Vec, pub new_highwater_mark: i64, } -impl DeliveredEvent { +impl DeliveredEventRow { /// assumes rows are sorted by `delivered` ascending pub fn parse_query_results(current: i64, rows: Vec) -> Result<(i64, Vec)> { let max: i64 = rows.last().map_or(current, |r| r.new_highwater_mark + 1); diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs index 884b04b9b..437de753a 100644 --- a/store/src/sql/mod.rs +++ b/store/src/sql/mod.rs @@ -6,7 +6,10 @@ mod sqlite; #[cfg(test)] mod test; -pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest}; +pub use access::{ + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult, + InsertedEvent, +}; pub use root::SqliteRootStore; pub use sqlite::{SqlitePool, SqliteTransaction}; diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs index 9282e079c..fc652dfcf 100644 --- a/store/src/sql/query.rs +++ b/store/src/sql/query.rs @@ -121,7 +121,7 @@ impl EventQuery { /// Updates the delivered column in the event table so it can be set to the client pub fn mark_ready_to_deliver() -> &'static str { - "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;" + "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;" } } @@ -167,11 +167,13 @@ impl ReconQuery { "INSERT INTO ceramic_one_event ( order_key, cid, ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7 + ahash_4, ahash_5, ahash_6, ahash_7, + delivered ) VALUES ( $1, $2, $3, $4, $5, $6, - $7, $8, $9, $10 + $7, $8, $9, $10, + $11 );" } diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index c246d29ce..c8beaeaf1 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -31,11 +31,7 @@ fn random_event(cid: &str) -> EventInsertable { let cid = order_key.cid().unwrap(); EventInsertable { order_key, - body: EventInsertableBody { - cid, - deliverable: false, - blocks: vec![], - }, + body: EventInsertableBody::new(cid, vec![], true), } } @@ -49,8 +45,7 @@ async fn hash_range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let hash = CeramicOneEvent::hash_range( &pool, @@ -71,8 +66,7 @@ async fn range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let ids = CeramicOneEvent::range( &pool,