From 3e7f25312f8bc8fa02e75fff0217e48a04658a91 Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 17 Jun 2024 15:34:26 -0600 Subject: [PATCH 1/3] refactor: rename entity DeliveredEventRow --- store/src/sql/access/event.rs | 6 +++--- store/src/sql/entities/mod.rs | 2 +- store/src/sql/entities/utils.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 88014e1c9..fd7909db9 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -12,7 +12,7 @@ use recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult use crate::{ sql::{ entities::{ - rebuild_car, BlockRow, CountRow, DeliveredEvent, EventInsertable, OrderKey, + rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventInsertable, OrderKey, ReconEventBlockRaw, ReconHash, }, query::{EventQuery, ReconQuery, ReconType, SqlBackend}, @@ -220,13 +220,13 @@ impl CeramicOneEvent { delivered: i64, limit: i64, ) -> Result<(i64, Vec)> { - let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) + let rows: Vec = sqlx::query_as(EventQuery::new_delivered_events()) .bind(delivered) .bind(limit) .fetch_all(pool.reader()) .await?; - DeliveredEvent::parse_query_results(delivered, rows) + DeliveredEventRow::parse_query_results(delivered, rows) } /// Finds the event data by a given EventId i.e. "order key". diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs index 8103296af..9b8219104 100644 --- a/store/src/sql/entities/mod.rs +++ b/store/src/sql/entities/mod.rs @@ -9,4 +9,4 @@ pub use event::{rebuild_car, EventInsertable, EventInsertableBody}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; pub use hash::{BlockHash, ReconHash}; -pub use utils::{CountRow, DeliveredEvent, OrderKey}; +pub use utils::{CountRow, DeliveredEventRow, OrderKey}; diff --git a/store/src/sql/entities/utils.rs b/store/src/sql/entities/utils.rs index d8efc5f5a..3b6555607 100644 --- a/store/src/sql/entities/utils.rs +++ b/store/src/sql/entities/utils.rs @@ -24,12 +24,12 @@ impl TryFrom for EventId { } #[derive(sqlx::FromRow)] -pub struct DeliveredEvent { +pub struct DeliveredEventRow { pub cid: Vec, pub new_highwater_mark: i64, } -impl DeliveredEvent { +impl DeliveredEventRow { /// assumes rows are sorted by `delivered` ascending pub fn parse_query_results(current: i64, rows: Vec) -> Result<(i64, Vec)> { let max: i64 = rows.last().map_or(current, |r| r.new_highwater_mark + 1); From 554eec04f4904e10a41458adc357cc244f12f919 Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 17 Jun 2024 17:43:41 -0600 Subject: [PATCH 2/3] refactor: rename some things, fix don't update deliverable flag on deliverable events --- service/src/event/ordering_task.rs | 2 +- service/src/event/service.rs | 2 +- service/src/tests/mod.rs | 2 +- store/src/sql/access/event.rs | 29 +++++++++++++++++++++------ store/src/sql/entities/event.rs | 2 +- store/src/sql/entities/event_block.rs | 4 ++++ store/src/sql/entities/hash.rs | 2 +- store/src/sql/query.rs | 8 +++++--- 8 files changed, 37 insertions(+), 14 deletions(-) diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index a1e13f204..9c7d4a0a7 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -312,7 +312,7 @@ impl OrderingState { // once we find the first event that's deliverable, we can go back through and find the rest continue; } else { - let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?; + let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?; if delivered { trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list"); deliverable.push_back(ev_cid); diff --git a/service/src/event/service.rs b/service/src/event/service.rs index daddd3531..973efd4ed 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -285,7 +285,7 @@ impl InsertEventOrdering { } fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.deliverable(true); + ev.set_deliverable(true); self.notify_task_new .push(DeliveredEvent::new(ev.body.cid, init_cid)); self.insert_now.push(ev); diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index de36fb97a..644d3081b 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -129,7 +129,7 @@ pub(crate) async fn check_deliverable( cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid) + let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index fd7909db9..2e9d055de 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -27,13 +27,27 @@ static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); pub struct CeramicOneEvent {} impl CeramicOneEvent { - async fn insert_key(tx: &mut SqliteTransaction<'_>, key: &EventId) -> Result { + fn next_deliverable() -> i64 { + GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst) + } + + /// Insert the event and its hash into the ceramic_one_event table + async fn insert_event( + tx: &mut SqliteTransaction<'_>, + key: &EventId, + deliverable: bool, + ) -> Result { let id = key.as_bytes(); let cid = key .cid() .map(|cid| cid.to_bytes()) .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?; let hash = Sha256a::digest(key); + let delivered: Option = if deliverable { + Some(Self::next_deliverable()) + } else { + None + }; let resp = sqlx::query(ReconQuery::insert_event()) .bind(id) @@ -46,6 +60,7 @@ impl CeramicOneEvent { .bind(hash.as_u32s()[5]) .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) + .bind(delivered) .execute(&mut **tx.inner()) .await; @@ -87,7 +102,7 @@ impl CeramicOneEvent { pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> { // Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value sqlx::query(EventQuery::mark_ready_to_deliver()) - .bind(GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)) + .bind(Self::next_deliverable()) .bind(&key.to_bytes()) .execute(&mut **conn.inner()) .await?; @@ -104,14 +119,15 @@ impl CeramicOneEvent { let mut tx = pool.begin_tx().await.map_err(Error::from)?; for (idx, item) in to_add.iter().enumerate() { - let new_key = Self::insert_key(&mut tx, &item.order_key).await?; + let new_key = + Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?; if new_key { for block in item.body.blocks.iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } } - if item.body.deliverable { + if !new_key && item.body.deliverable { Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?; } new_keys[idx] = new_key; @@ -248,8 +264,9 @@ impl CeramicOneEvent { } /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. - /// (bool, bool) = (exists, delivered) - pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { + /// returns (bool, bool) = (exists, deliverable) + /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. + pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { #[derive(sqlx::FromRow)] struct CidExists { exists: bool, diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 32610eb12..005e45b1c 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -63,7 +63,7 @@ impl EventInsertable { } /// change the deliverable status of the event - pub fn deliverable(&mut self, deliverable: bool) { + pub fn set_deliverable(&mut self, deliverable: bool) { self.body.deliverable = deliverable; } } diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index e40698a69..58b80f2f6 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -134,4 +134,8 @@ impl EventBlockRaw { bytes, }) } + + pub fn cid(&self) -> Cid { + Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner()) + } } diff --git a/store/src/sql/entities/hash.rs b/store/src/sql/entities/hash.rs index 926e6af1e..a914b808f 100644 --- a/store/src/sql/entities/hash.rs +++ b/store/src/sql/entities/hash.rs @@ -4,7 +4,7 @@ use sqlx::{sqlite::SqliteRow, Row as _}; use crate::{Error, Result}; -#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] pub struct BlockHash(Multihash<64>); impl BlockHash { diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs index 9282e079c..fc652dfcf 100644 --- a/store/src/sql/query.rs +++ b/store/src/sql/query.rs @@ -121,7 +121,7 @@ impl EventQuery { /// Updates the delivered column in the event table so it can be set to the client pub fn mark_ready_to_deliver() -> &'static str { - "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;" + "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;" } } @@ -167,11 +167,13 @@ impl ReconQuery { "INSERT INTO ceramic_one_event ( order_key, cid, ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7 + ahash_4, ahash_5, ahash_6, ahash_7, + delivered ) VALUES ( $1, $2, $3, $4, $5, $6, - $7, $8, $9, $10 + $7, $8, $9, $10, + $11 );" } From 7ef1343563a4f6726d3cdd55a9eac689a5780121 Mon Sep 17 00:00:00 2001 From: David Estes Date: Tue, 18 Jun 2024 09:03:55 -0600 Subject: [PATCH 3/3] refactor: change the insert event response returns more info than simply whether the event was new --- service/src/event/service.rs | 37 ++++++++++++---- service/src/event/store.rs | 22 ++++++---- service/src/tests/ordering.rs | 12 +++--- store/src/lib.rs | 4 +- store/src/metrics.rs | 4 +- store/src/sql/access/event.rs | 75 ++++++++++++++++++++++++++++----- store/src/sql/access/mod.rs | 2 +- store/src/sql/entities/event.rs | 24 +++++++++-- store/src/sql/mod.rs | 5 ++- store/src/sql/test.rs | 12 ++---- 10 files changed, 145 insertions(+), 52 deletions(-) diff --git a/service/src/event/service.rs b/service/src/event/service.rs index 973efd4ed..c5ffb6b57 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -5,7 +5,7 @@ use ceramic_event::unvalidated; use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; use cid::Cid; use ipld_core::ipld::Ipld; -use recon::{InsertResult, ReconItem}; +use recon::ReconItem; use tracing::{trace, warn}; use super::ordering_task::{ @@ -126,10 +126,10 @@ impl CeramicEventService { /// This is likely used in API contexts when a user is trying to insert events. Events discovered from /// peers can come in any order and we will discover the prev chain over time. Use /// `insert_events_from_carfiles_remote_history` for that case. - pub(crate) async fn insert_events_from_carfiles_local_history<'a>( + pub(crate) async fn insert_events_from_carfiles_local_api<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - ) -> Result { + ) -> Result { if items.is_empty() { return Ok(InsertResult::default()); } @@ -144,19 +144,31 @@ impl CeramicEventService { /// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and /// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use /// `insert_events_from_carfiles_local_history`. - pub(crate) async fn insert_events_from_carfiles_remote_history<'a>( + pub(crate) async fn insert_events_from_carfiles_recon<'a>( &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { if items.is_empty() { - return Ok(InsertResult::default()); + return Ok(recon::InsertResult::default()); } let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?; - self.process_events(ordering).await + let res = self.process_events(ordering).await?; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + let mut keys = vec![false; items.len()]; + for (i, item) in items.iter().enumerate() { + let new_key = res + .store_result + .inserted + .iter() + .find(|e| e.order_key == *item.key) + .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set + keys[i] = new_key; + } + Ok(recon::InsertResult::new(keys)) } - async fn process_events(&self, ordering: InsertEventOrdering) -> Result { + async fn process_events(&self, ordering: InsertEventOrdering) -> Result { let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?; for ev in ordering.background_task_deliverable { @@ -194,7 +206,10 @@ impl CeramicEventService { } } } - Ok(res) + Ok(InsertResult { + store_result: res, + missing_history: vec![], + }) } } @@ -374,3 +389,9 @@ impl InsertEventOrdering { } } } + +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, +} diff --git a/service/src/event/store.rs b/service/src/event/store.rs index e61506bcf..51ae5ac22 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService { async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { let res = self - .insert_events_from_carfiles_remote_history(&[item.to_owned()]) + .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; Ok(res.keys.first().copied().unwrap_or(false)) @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { - let res = self - .insert_events_from_carfiles_remote_history(items) - .await?; + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { + let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -112,9 +113,14 @@ impl ceramic_api::EventStore for CeramicEventService { .map(|(key, val)| ReconItem::new(key, val.as_slice())) .collect::>(); let res = self - .insert_events_from_carfiles_local_history(&items[..]) + .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + Ok(res + .store_result + .inserted + .iter() + .map(|r| r.new_key) + .collect()) } async fn range_with_values( diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index ddcbc7f23..1c6b2616c 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -19,7 +19,7 @@ async fn setup_service() -> CeramicEventService { async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_remote_history(&[item]) + .insert_events_from_carfiles_recon(&[item]) .await .unwrap(); let new = new.keys.into_iter().filter(|k| *k).count(); @@ -28,10 +28,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_local_history(&[item]) + .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -51,7 +51,7 @@ async fn test_missing_prev_error_history_required() { let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)]) + .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) .await; match new { Ok(v) => panic!("should have errored: {:?}", v), @@ -100,13 +100,13 @@ async fn test_prev_in_same_write_history_required() { let init: &(EventId, Vec) = &events[0]; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ + .insert_events_from_carfiles_local_api(&[ ReconItem::new(&data.0, &data.1), ReconItem::new(&init.0, &init.1), ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; diff --git a/store/src/lib.rs b/store/src/lib.rs index 217c8751d..39747ffbd 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -10,8 +10,8 @@ pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use sql::{ entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, - CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore, - SqliteTransaction, + CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool, + SqliteRootStore, SqliteTransaction, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 855d4e960..be8442e72 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -13,7 +13,7 @@ use prometheus_client::{ }, registry::Registry, }; -use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; use tokio::time::Instant; #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -253,7 +253,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 2e9d055de..eaa0a9e33 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::anyhow; use ceramic_core::{event_id::InvalidEventId, EventId}; use cid::Cid; -use recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a}; +use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a}; use crate::{ sql::{ @@ -23,6 +23,48 @@ use crate::{ static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event that was inserted into the database +pub struct InsertedEvent { + /// The event order key that was inserted + pub order_key: EventId, + /// Whether the event was marked as deliverable + pub deliverable: bool, + /// Whether the event was a new key + pub new_key: bool, +} + +impl InsertedEvent { + /// Create a new delivered event + fn new(order_key: EventId, new_key: bool, deliverable: bool) -> Self { + Self { + order_key, + deliverable, + new_key, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +/// The result of inserting events into the database +pub struct InsertResult { + /// The events that were marked as delivered in this batch + pub inserted: Vec, +} + +impl InsertResult { + /// The count of new keys added in this batch + pub fn count_new_keys(&self) -> usize { + self.inserted.iter().filter(|e| e.new_key).count() + } +} + +impl InsertResult { + fn new(inserted: Vec) -> Self { + Self { inserted } + } +} + /// Access to the ceramic event table and related logic pub struct CeramicOneEvent {} @@ -110,30 +152,41 @@ impl CeramicOneEvent { Ok(()) } - /// Insert many events into the database. This is the main function to use when storing events. + /// Insert many events into the database. The events and their blocks and metadata are inserted in a single + /// transaction and either all successful or rolled back. + /// + /// IMPORTANT: + /// It is the caller's responsibility to order events marked deliverable correctly. + /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering + /// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events + /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. pub async fn insert_many( pool: &SqlitePool, to_add: &[EventInsertable], ) -> Result { - let mut new_keys = vec![false; to_add.len()]; + let mut inserted = Vec::with_capacity(to_add.len()); let mut tx = pool.begin_tx().await.map_err(Error::from)?; - for (idx, item) in to_add.iter().enumerate() { - let new_key = - Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?; + for item in to_add { + let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?; + inserted.push(InsertedEvent::new( + item.order_key.clone(), + new_key, + item.deliverable(), + )); if new_key { - for block in item.body.blocks.iter() { + for block in item.blocks().iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } } - if !new_key && item.body.deliverable { - Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?; + // the item already existed so we didn't mark it as deliverable on insert + if !new_key && item.deliverable() { + Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?; } - new_keys[idx] = new_key; } tx.commit().await.map_err(Error::from)?; - let res = InsertResult::new(new_keys); + let res = InsertResult::new(inserted); Ok(res) } diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs index 912a462ac..84a9131ff 100644 --- a/store/src/sql/access/mod.rs +++ b/store/src/sql/access/mod.rs @@ -4,6 +4,6 @@ mod event_block; mod interest; pub use block::CeramicOneBlock; -pub use event::CeramicOneEvent; +pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; pub use interest::CeramicOneInterest; diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 005e45b1c..fb8ae1692 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -62,7 +62,23 @@ impl EventInsertable { Ok(Self { order_key, body }) } - /// change the deliverable status of the event + /// Get the CID of the event + pub fn cid(&self) -> Cid { + self.body.cid + } + + /// Whether this event is deliverable currently + pub fn deliverable(&self) -> bool { + self.body.deliverable + } + + /// Whether this event is deliverable currently + pub fn blocks(&self) -> &Vec { + &self.body.blocks + } + + /// Mark the event as deliverable. + /// This will be used when inserting the event to make sure the field is updated accordingly. pub fn set_deliverable(&mut self, deliverable: bool) { self.body.deliverable = deliverable; } @@ -82,10 +98,10 @@ pub struct EventInsertableBody { impl EventInsertableBody { /// Create a new EventInsertRaw struct. Deliverable is set to false by default. - pub fn new(cid: Cid, blocks: Vec) -> Self { + pub fn new(cid: Cid, blocks: Vec, deliverable: bool) -> Self { Self { cid, - deliverable: false, + deliverable, blocks, } } @@ -136,6 +152,6 @@ impl EventInsertableBody { blocks.push(ebr); idx += 1; } - Ok(Self::new(event_cid, blocks)) + Ok(Self::new(event_cid, blocks, false)) } } diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs index 884b04b9b..437de753a 100644 --- a/store/src/sql/mod.rs +++ b/store/src/sql/mod.rs @@ -6,7 +6,10 @@ mod sqlite; #[cfg(test)] mod test; -pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest}; +pub use access::{ + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult, + InsertedEvent, +}; pub use root::SqliteRootStore; pub use sqlite::{SqlitePool, SqliteTransaction}; diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index d0e55c777..ff9ad45a6 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -30,11 +30,7 @@ fn random_event(cid: &str) -> EventInsertable { let cid = order_key.cid().unwrap(); EventInsertable { order_key, - body: EventInsertableBody { - cid, - deliverable: false, - blocks: vec![], - }, + body: EventInsertableBody::new(cid, vec![], true), } } @@ -48,8 +44,7 @@ async fn hash_range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let hash = CeramicOneEvent::hash_range( &pool, @@ -70,8 +65,7 @@ async fn range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let ids = CeramicOneEvent::range( &pool,