Skip to content

Commit

Permalink
refactor: change the insert event response
Browse files Browse the repository at this point in the history
returns more info than simply whether the event was new
  • Loading branch information
dav1do committed Jun 18, 2024
1 parent 554eec0 commit 7ef1343
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 52 deletions.
37 changes: 29 additions & 8 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ceramic_event::unvalidated;
use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use cid::Cid;
use ipld_core::ipld::Ipld;
use recon::{InsertResult, ReconItem};
use recon::ReconItem;
use tracing::{trace, warn};

use super::ordering_task::{
Expand Down Expand Up @@ -126,10 +126,10 @@ impl CeramicEventService {
/// This is likely used in API contexts when a user is trying to insert events. Events discovered from
/// peers can come in any order and we will discover the prev chain over time. Use
/// `insert_events_from_carfiles_remote_history` for that case.
pub(crate) async fn insert_events_from_carfiles_local_history<'a>(
pub(crate) async fn insert_events_from_carfiles_local_api<'a>(
&self,
items: &[recon::ReconItem<'a, EventId>],
) -> Result<recon::InsertResult> {
) -> Result<InsertResult> {
if items.is_empty() {
return Ok(InsertResult::default());
}
Expand All @@ -144,19 +144,31 @@ impl CeramicEventService {
/// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and
/// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use
/// `insert_events_from_carfiles_local_history`.
pub(crate) async fn insert_events_from_carfiles_remote_history<'a>(
pub(crate) async fn insert_events_from_carfiles_recon<'a>(
&self,
items: &[recon::ReconItem<'a, EventId>],
) -> Result<recon::InsertResult> {
if items.is_empty() {
return Ok(InsertResult::default());
return Ok(recon::InsertResult::default());
}

let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?;
self.process_events(ordering).await
let res = self.process_events(ordering).await?;
// we need to put things back in the right order that the recon trait expects, even though we don't really care about the result
let mut keys = vec![false; items.len()];
for (i, item) in items.iter().enumerate() {
let new_key = res
.store_result
.inserted
.iter()
.find(|e| e.order_key == *item.key)
.map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set
keys[i] = new_key;
}
Ok(recon::InsertResult::new(keys))
}

async fn process_events(&self, ordering: InsertEventOrdering) -> Result<recon::InsertResult> {
async fn process_events(&self, ordering: InsertEventOrdering) -> Result<InsertResult> {
let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?;

for ev in ordering.background_task_deliverable {
Expand Down Expand Up @@ -194,7 +206,10 @@ impl CeramicEventService {
}
}
}
Ok(res)
Ok(InsertResult {
store_result: res,
missing_history: vec![],
})
}
}

Expand Down Expand Up @@ -374,3 +389,9 @@ impl InsertEventOrdering {
}
}
}

#[derive(Debug, PartialEq, Eq, Default)]
pub struct InsertResult {
pub(crate) store_result: ceramic_store::InsertResult,
pub(crate) missing_history: Vec<EventId>,
}
22 changes: 14 additions & 8 deletions service/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ceramic_core::EventId;
use ceramic_store::{CeramicOneBlock, CeramicOneEvent};
use cid::Cid;
use iroh_bitswap::Block;
use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a};
use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a};

use crate::event::CeramicEventService;

Expand All @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService {

async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult<bool> {
let res = self
.insert_events_from_carfiles_remote_history(&[item.to_owned()])
.insert_events_from_carfiles_recon(&[item.to_owned()])
.await?;

Ok(res.keys.first().copied().unwrap_or(false))
Expand All @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService {
/// Insert new keys into the key space.
/// Returns true for each key if it did not previously exist, in the
/// same order as the input iterator.
async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult<InsertResult> {
let res = self
.insert_events_from_carfiles_remote_history(items)
.await?;
async fn insert_many(
&self,
items: &[ReconItem<'_, Self::Key>],
) -> ReconResult<recon::InsertResult> {
let res = self.insert_events_from_carfiles_recon(items).await?;
Ok(res)
}

Expand Down Expand Up @@ -112,9 +113,14 @@ impl ceramic_api::EventStore for CeramicEventService {
.map(|(key, val)| ReconItem::new(key, val.as_slice()))
.collect::<Vec<_>>();
let res = self
.insert_events_from_carfiles_local_history(&items[..])
.insert_events_from_carfiles_local_api(&items[..])
.await?;
Ok(res.keys)
Ok(res
.store_result
.inserted
.iter()
.map(|r| r.new_key)
.collect())
}

async fn range_with_values(
Expand Down
12 changes: 6 additions & 6 deletions service/src/tests/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn setup_service() -> CeramicEventService {

async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
let new = store
.insert_events_from_carfiles_remote_history(&[item])
.insert_events_from_carfiles_recon(&[item])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
Expand All @@ -28,10 +28,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon

async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
let new = store
.insert_events_from_carfiles_local_history(&[item])
.insert_events_from_carfiles_local_api(&[item])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
let new = new.store_result.count_new_keys();
assert_eq!(1, new);
}

Expand All @@ -51,7 +51,7 @@ async fn test_missing_prev_error_history_required() {
let data = &events[1];

let new = store
.insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)])
.insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)])
.await;
match new {
Ok(v) => panic!("should have errored: {:?}", v),
Expand Down Expand Up @@ -100,13 +100,13 @@ async fn test_prev_in_same_write_history_required() {
let init: &(EventId, Vec<u8>) = &events[0];
let data = &events[1];
let new = store
.insert_events_from_carfiles_local_history(&[
.insert_events_from_carfiles_local_api(&[
ReconItem::new(&data.0, &data.1),
ReconItem::new(&init.0, &init.1),
])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
let new = new.store_result.count_new_keys();
assert_eq!(2, new);
check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await;
check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await;
Expand Down
4 changes: 2 additions & 2 deletions store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent,
CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore,
SqliteTransaction,
CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool,
SqliteRootStore, SqliteTransaction,
};

pub(crate) type Result<T> = std::result::Result<T, Error>;
4 changes: 2 additions & 2 deletions store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use prometheus_client::{
},
registry::Registry,
};
use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult};
use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult};
use tokio::time::Instant;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -253,7 +253,7 @@ where
Ok(new)
}

async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<InsertResult> {
async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<recon::InsertResult> {
let res = StoreMetricsMiddleware::<S>::record(
&self.metrics,
"insert_many",
Expand Down
75 changes: 64 additions & 11 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use anyhow::anyhow;
use ceramic_core::{event_id::InvalidEventId, EventId};
use cid::Cid;
use recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a};
use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a};

use crate::{
sql::{
Expand All @@ -23,6 +23,48 @@ use crate::{

static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0);

#[derive(Debug, Clone, PartialEq, Eq)]
/// An event that was inserted into the database
pub struct InsertedEvent {
/// The event order key that was inserted
pub order_key: EventId,
/// Whether the event was marked as deliverable
pub deliverable: bool,
/// Whether the event was a new key
pub new_key: bool,
}

impl InsertedEvent {
/// Create a new delivered event
fn new(order_key: EventId, new_key: bool, deliverable: bool) -> Self {
Self {
order_key,
deliverable,
new_key,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
/// The result of inserting events into the database
pub struct InsertResult {
/// The events that were marked as delivered in this batch
pub inserted: Vec<InsertedEvent>,
}

impl InsertResult {
/// The count of new keys added in this batch
pub fn count_new_keys(&self) -> usize {
self.inserted.iter().filter(|e| e.new_key).count()
}
}

impl InsertResult {
fn new(inserted: Vec<InsertedEvent>) -> Self {
Self { inserted }
}
}

/// Access to the ceramic event table and related logic
pub struct CeramicOneEvent {}

Expand Down Expand Up @@ -110,30 +152,41 @@ impl CeramicOneEvent {
Ok(())
}

/// Insert many events into the database. This is the main function to use when storing events.
/// Insert many events into the database. The events and their blocks and metadata are inserted in a single
/// transaction and either all successful or rolled back.
///
/// IMPORTANT:
/// It is the caller's responsibility to order events marked deliverable correctly.
/// That is, events will be processed in the order they are given so earlier events are given a lower global ordering
/// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events
/// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
pub async fn insert_many(
pool: &SqlitePool,
to_add: &[EventInsertable],
) -> Result<InsertResult> {
let mut new_keys = vec![false; to_add.len()];
let mut inserted = Vec::with_capacity(to_add.len());
let mut tx = pool.begin_tx().await.map_err(Error::from)?;

for (idx, item) in to_add.iter().enumerate() {
let new_key =
Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?;
for item in to_add {
let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?;
inserted.push(InsertedEvent::new(
item.order_key.clone(),
new_key,
item.deliverable(),
));
if new_key {
for block in item.body.blocks.iter() {
for block in item.blocks().iter() {
CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
CeramicOneEventBlock::insert(&mut tx, block).await?;
}
}
if !new_key && item.body.deliverable {
Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?;
// the item already existed so we didn't mark it as deliverable on insert
if !new_key && item.deliverable() {
Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?;
}
new_keys[idx] = new_key;
}
tx.commit().await.map_err(Error::from)?;
let res = InsertResult::new(new_keys);
let res = InsertResult::new(inserted);

Ok(res)
}
Expand Down
2 changes: 1 addition & 1 deletion store/src/sql/access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ mod event_block;
mod interest;

pub use block::CeramicOneBlock;
pub use event::CeramicOneEvent;
pub use event::{CeramicOneEvent, InsertResult, InsertedEvent};
pub use event_block::CeramicOneEventBlock;
pub use interest::CeramicOneInterest;
24 changes: 20 additions & 4 deletions store/src/sql/entities/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,23 @@ impl EventInsertable {
Ok(Self { order_key, body })
}

/// change the deliverable status of the event
/// Get the CID of the event
pub fn cid(&self) -> Cid {
self.body.cid
}

/// Whether this event is deliverable currently
pub fn deliverable(&self) -> bool {
self.body.deliverable
}

/// Whether this event is deliverable currently
pub fn blocks(&self) -> &Vec<EventBlockRaw> {
&self.body.blocks
}

/// Mark the event as deliverable.
/// This will be used when inserting the event to make sure the field is updated accordingly.
pub fn set_deliverable(&mut self, deliverable: bool) {
self.body.deliverable = deliverable;
}
Expand All @@ -82,10 +98,10 @@ pub struct EventInsertableBody {

impl EventInsertableBody {
/// Create a new EventInsertRaw struct. Deliverable is set to false by default.
pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>) -> Self {
pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>, deliverable: bool) -> Self {
Self {
cid,
deliverable: false,
deliverable,
blocks,
}
}
Expand Down Expand Up @@ -136,6 +152,6 @@ impl EventInsertableBody {
blocks.push(ebr);
idx += 1;
}
Ok(Self::new(event_cid, blocks))
Ok(Self::new(event_cid, blocks, false))
}
}
5 changes: 4 additions & 1 deletion store/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ mod sqlite;
#[cfg(test)]
mod test;

pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest};
pub use access::{
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult,
InsertedEvent,
};
pub use root::SqliteRootStore;
pub use sqlite::{SqlitePool, SqliteTransaction};

Expand Down
Loading

0 comments on commit 7ef1343

Please sign in to comment.