Skip to content

Commit

Permalink
refactor: rename some things, fix don't update deliverable flag on de…
Browse files Browse the repository at this point in the history
…liverable events
  • Loading branch information
dav1do committed Jun 17, 2024
1 parent 3e7f253 commit 554eec0
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl OrderingState {
// once we find the first event that's deliverable, we can go back through and find the rest
continue;
} else {
let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?;
let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if delivered {
trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list");
deliverable.push_back(ev_cid);
Expand Down
2 changes: 1 addition & 1 deletion service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl InsertEventOrdering {
}

fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) {
ev.deliverable(true);
ev.set_deliverable(true);
self.notify_task_new
.push(DeliveredEvent::new(ev.body.cid, init_cid));
self.insert_now.push(ev);
Expand Down
2 changes: 1 addition & 1 deletion service/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub(crate) async fn check_deliverable(
cid: &Cid,
deliverable: bool,
) {
let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid)
let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid)
.await
.unwrap();
assert!(exists);
Expand Down
29 changes: 23 additions & 6 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,27 @@ static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0);
pub struct CeramicOneEvent {}

impl CeramicOneEvent {
async fn insert_key(tx: &mut SqliteTransaction<'_>, key: &EventId) -> Result<bool> {
fn next_deliverable() -> i64 {
GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)
}

/// Insert the event and its hash into the ceramic_one_event table
async fn insert_event(
tx: &mut SqliteTransaction<'_>,
key: &EventId,
deliverable: bool,
) -> Result<bool> {
let id = key.as_bytes();
let cid = key
.cid()
.map(|cid| cid.to_bytes())
.ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?;
let hash = Sha256a::digest(key);
let delivered: Option<i64> = if deliverable {
Some(Self::next_deliverable())
} else {
None
};

let resp = sqlx::query(ReconQuery::insert_event())
.bind(id)
Expand All @@ -46,6 +60,7 @@ impl CeramicOneEvent {
.bind(hash.as_u32s()[5])
.bind(hash.as_u32s()[6])
.bind(hash.as_u32s()[7])
.bind(delivered)
.execute(&mut **tx.inner())
.await;

Expand Down Expand Up @@ -87,7 +102,7 @@ impl CeramicOneEvent {
pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> {
// Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value
sqlx::query(EventQuery::mark_ready_to_deliver())
.bind(GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst))
.bind(Self::next_deliverable())
.bind(&key.to_bytes())
.execute(&mut **conn.inner())
.await?;
Expand All @@ -104,14 +119,15 @@ impl CeramicOneEvent {
let mut tx = pool.begin_tx().await.map_err(Error::from)?;

for (idx, item) in to_add.iter().enumerate() {
let new_key = Self::insert_key(&mut tx, &item.order_key).await?;
let new_key =
Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?;
if new_key {
for block in item.body.blocks.iter() {
CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
CeramicOneEventBlock::insert(&mut tx, block).await?;
}
}
if item.body.deliverable {
if !new_key && item.body.deliverable {
Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?;
}
new_keys[idx] = new_key;
Expand Down Expand Up @@ -248,8 +264,9 @@ impl CeramicOneEvent {
}

/// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
/// (bool, bool) = (exists, delivered)
pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> {
/// returns (bool, bool) = (exists, deliverable)
/// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> {
#[derive(sqlx::FromRow)]
struct CidExists {
exists: bool,
Expand Down
2 changes: 1 addition & 1 deletion store/src/sql/entities/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl EventInsertable {
}

/// change the deliverable status of the event
pub fn deliverable(&mut self, deliverable: bool) {
pub fn set_deliverable(&mut self, deliverable: bool) {
self.body.deliverable = deliverable;
}
}
Expand Down
4 changes: 4 additions & 0 deletions store/src/sql/entities/event_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,8 @@ impl EventBlockRaw {
bytes,
})
}

pub fn cid(&self) -> Cid {
Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner())
}
}
2 changes: 1 addition & 1 deletion store/src/sql/entities/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use sqlx::{sqlite::SqliteRow, Row as _};

use crate::{Error, Result};

#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)]
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
pub struct BlockHash(Multihash<64>);

impl BlockHash {
Expand Down
8 changes: 5 additions & 3 deletions store/src/sql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl EventQuery {

/// Updates the delivered column in the event table so it can be set to the client
pub fn mark_ready_to_deliver() -> &'static str {
"UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;"
"UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;"
}
}

Expand Down Expand Up @@ -167,11 +167,13 @@ impl ReconQuery {
"INSERT INTO ceramic_one_event (
order_key, cid,
ahash_0, ahash_1, ahash_2, ahash_3,
ahash_4, ahash_5, ahash_6, ahash_7
ahash_4, ahash_5, ahash_6, ahash_7,
delivered
) VALUES (
$1, $2,
$3, $4, $5, $6,
$7, $8, $9, $10
$7, $8, $9, $10,
$11
);"
}

Expand Down

0 comments on commit 554eec0

Please sign in to comment.