Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: modify store API and rename some things #390

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl OrderingState {
// once we find the first event that's deliverable, we can go back through and find the rest
continue;
} else {
let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?;
let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if delivered {
trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list");
deliverable.push_back(ev_cid);
Expand Down
39 changes: 30 additions & 9 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ceramic_event::unvalidated;
use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use cid::Cid;
use ipld_core::ipld::Ipld;
use recon::{InsertResult, ReconItem};
use recon::ReconItem;
use tracing::{trace, warn};

use super::ordering_task::{
Expand Down Expand Up @@ -126,10 +126,10 @@ impl CeramicEventService {
/// This is likely used in API contexts when a user is trying to insert events. Events discovered from
/// peers can come in any order and we will discover the prev chain over time. Use
/// `insert_events_from_carfiles_remote_history` for that case.
pub(crate) async fn insert_events_from_carfiles_local_history<'a>(
pub(crate) async fn insert_events_from_carfiles_local_api<'a>(
&self,
items: &[recon::ReconItem<'a, EventId>],
) -> Result<recon::InsertResult> {
) -> Result<InsertResult> {
if items.is_empty() {
return Ok(InsertResult::default());
}
Expand All @@ -144,19 +144,31 @@ impl CeramicEventService {
/// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and
/// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use
/// `insert_events_from_carfiles_local_history`.
pub(crate) async fn insert_events_from_carfiles_remote_history<'a>(
pub(crate) async fn insert_events_from_carfiles_recon<'a>(
&self,
items: &[recon::ReconItem<'a, EventId>],
) -> Result<recon::InsertResult> {
if items.is_empty() {
return Ok(InsertResult::default());
return Ok(recon::InsertResult::default());
}

let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?;
self.process_events(ordering).await
let res = self.process_events(ordering).await?;
// we need to put things back in the right order that the recon trait expects, even though we don't really care about the result
let mut keys = vec![false; items.len()];
for (i, item) in items.iter().enumerate() {
let new_key = res
.store_result
.inserted
.iter()
.find(|e| e.order_key == *item.key)
.map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set
keys[i] = new_key;
}
Ok(recon::InsertResult::new(keys))
}

async fn process_events(&self, ordering: InsertEventOrdering) -> Result<recon::InsertResult> {
async fn process_events(&self, ordering: InsertEventOrdering) -> Result<InsertResult> {
let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?;

for ev in ordering.background_task_deliverable {
Expand Down Expand Up @@ -194,7 +206,10 @@ impl CeramicEventService {
}
}
}
Ok(res)
Ok(InsertResult {
store_result: res,
missing_history: vec![],
})
}
}

Expand Down Expand Up @@ -285,7 +300,7 @@ impl InsertEventOrdering {
}

fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) {
ev.deliverable(true);
ev.set_deliverable(true);
self.notify_task_new
.push(DeliveredEvent::new(ev.body.cid, init_cid));
self.insert_now.push(ev);
Expand Down Expand Up @@ -374,3 +389,9 @@ impl InsertEventOrdering {
}
}
}

#[derive(Debug, PartialEq, Eq, Default)]
pub struct InsertResult {
pub(crate) store_result: ceramic_store::InsertResult,
pub(crate) missing_history: Vec<EventId>,
}
22 changes: 14 additions & 8 deletions service/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ceramic_core::EventId;
use ceramic_store::{CeramicOneBlock, CeramicOneEvent};
use cid::Cid;
use iroh_bitswap::Block;
use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a};
use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a};

use crate::event::CeramicEventService;

Expand All @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService {

async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult<bool> {
let res = self
.insert_events_from_carfiles_remote_history(&[item.to_owned()])
.insert_events_from_carfiles_recon(&[item.to_owned()])
.await?;

Ok(res.keys.first().copied().unwrap_or(false))
Expand All @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService {
/// Insert new keys into the key space.
/// Returns true for each key if it did not previously exist, in the
/// same order as the input iterator.
async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult<InsertResult> {
let res = self
.insert_events_from_carfiles_remote_history(items)
.await?;
async fn insert_many(
&self,
items: &[ReconItem<'_, Self::Key>],
) -> ReconResult<recon::InsertResult> {
let res = self.insert_events_from_carfiles_recon(items).await?;
Ok(res)
}

Expand Down Expand Up @@ -112,9 +113,14 @@ impl ceramic_api::EventStore for CeramicEventService {
.map(|(key, val)| ReconItem::new(key, val.as_slice()))
.collect::<Vec<_>>();
let res = self
.insert_events_from_carfiles_local_history(&items[..])
.insert_events_from_carfiles_local_api(&items[..])
.await?;
Ok(res.keys)
Ok(res
.store_result
.inserted
.iter()
.map(|r| r.new_key)
.collect())
}

async fn range_with_values(
Expand Down
2 changes: 1 addition & 1 deletion service/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub(crate) async fn check_deliverable(
cid: &Cid,
deliverable: bool,
) {
let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid)
let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid)
.await
.unwrap();
assert!(exists);
Expand Down
12 changes: 6 additions & 6 deletions service/src/tests/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn setup_service() -> CeramicEventService {

async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
let new = store
.insert_events_from_carfiles_remote_history(&[item])
.insert_events_from_carfiles_recon(&[item])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
Expand All @@ -28,10 +28,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon

async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
let new = store
.insert_events_from_carfiles_local_history(&[item])
.insert_events_from_carfiles_local_api(&[item])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
let new = new.store_result.count_new_keys();
assert_eq!(1, new);
}

Expand All @@ -51,7 +51,7 @@ async fn test_missing_prev_error_history_required() {
let data = &events[1];

let new = store
.insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)])
.insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)])
.await;
match new {
Ok(v) => panic!("should have errored: {:?}", v),
Expand Down Expand Up @@ -100,13 +100,13 @@ async fn test_prev_in_same_write_history_required() {
let init: &(EventId, Vec<u8>) = &events[0];
let data = &events[1];
let new = store
.insert_events_from_carfiles_local_history(&[
.insert_events_from_carfiles_local_api(&[
ReconItem::new(&data.0, &data.1),
ReconItem::new(&init.0, &init.1),
])
.await
.unwrap();
let new = new.keys.into_iter().filter(|k| *k).count();
let new = new.store_result.count_new_keys();
assert_eq!(2, new);
check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await;
check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await;
Expand Down
4 changes: 2 additions & 2 deletions store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent,
CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore,
SqliteTransaction,
CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool,
SqliteRootStore, SqliteTransaction,
};

pub(crate) type Result<T> = std::result::Result<T, Error>;
4 changes: 2 additions & 2 deletions store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use prometheus_client::{
},
registry::Registry,
};
use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult};
use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult};
use tokio::time::Instant;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -253,7 +253,7 @@ where
Ok(new)
}

async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<InsertResult> {
async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<recon::InsertResult> {
let res = StoreMetricsMiddleware::<S>::record(
&self.metrics,
"insert_many",
Expand Down
Loading
Loading