Skip to content

Commit

Permalink
chore: hack to try reusing a sqlite connection for undelivered proces…
Browse files Browse the repository at this point in the history
…sing
  • Loading branch information
dav1do committed Dec 4, 2024
1 parent 0722214 commit d8b7d8f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 10 deletions.
30 changes: 21 additions & 9 deletions event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use ceramic_event::unvalidated;
use ceramic_sql::sqlite::SqliteConnection;
use cid::Cid;
use ipld_core::ipld::Ipld;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -169,16 +170,25 @@ impl StreamEvent {
}

/// Builds a stream event from the database if it exists.
async fn load_by_cid(event_access: Arc<EventAccess>, cid: EventCid) -> Result<Option<Self>> {
async fn load_by_cid(
event_access: &Arc<EventAccess>,
conn: &mut SqliteConnection,
cid: EventCid,
) -> Result<Option<Self>> {
// TODO: Condense the multiple DB queries happening here into a single query
let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?;
let (exists, deliverable) = event_access
.deliverable_by_cid_with_conn(&cid, conn)
.await?;
if exists {
let data = event_access.value_by_cid(&cid).await?.ok_or_else(|| {
Error::new_app(anyhow!(
"Missing event data for event that must exist: CID={}",
cid
))
})?;
let data = event_access
.value_by_cid_with_conn(&cid, conn)
.await?
.ok_or_else(|| {
Error::new_app(anyhow!(
"Missing event data for event that must exist: CID={}",
cid
))
})?;
let (_cid, parsed) = unvalidated::Event::<Ipld>::decode_car(data.as_slice(), false)
.map_err(Error::new_app)?;

Expand Down Expand Up @@ -395,6 +405,8 @@ impl StreamEvents {

trace!(count=%undelivered_q.len(), "undelivered events to process");

// use one connection for all reads in the loop. will close when dropped
let mut conn = event_access.detach_ro_connection().await?;
while let Some(StreamEventMetadata {
cid: undelivered_cid,
prev: desired_prev,
Expand All @@ -417,7 +429,7 @@ impl StreamEvents {
// nothing to do until it arrives on the channel
}
} else if let Some(discovered_prev) =
StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await?
StreamEvent::load_by_cid(&event_access, &mut conn, desired_prev).await?
{
match &discovered_prev {
// we found our prev in the database and it's deliverable, so we're deliverable now
Expand Down
51 changes: 50 additions & 1 deletion event-svc/src/store/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::anyhow;
use ceramic_anchor_service::AnchorRequest;
use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId};
use ceramic_event::unvalidated;
use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction};
use ceramic_sql::sqlite::{SqliteConnection, SqlitePool, SqliteTransaction};
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use recon::{AssociativeHash, HashCount, Key, Sha256a};
Expand Down Expand Up @@ -166,6 +166,12 @@ impl EventAccess {
self.pool.begin_tx().await
}

/// Remove a readonly connection from the pool (permanently).
/// It may exceed max connections setting in the meantime.
pub async fn detach_ro_connection(&self) -> Result<SqliteConnection> {
self.pool.detach_ro_connection().await
}

/// Get the current highwater mark for delivered events.
pub async fn get_highwater_mark(&self) -> Result<i64> {
Ok(self.delivered_counter.load(Ordering::Relaxed))
Expand Down Expand Up @@ -477,6 +483,18 @@ impl EventAccess {
rebuild_car(blocks).await
}

/// Finds the event data for the given CIDs i.e. the root CID in the carfile of the event.
pub async fn value_by_cid_conn(
conn: &mut sqlx::SqliteConnection,
key: &Cid,
) -> Result<Option<Vec<u8>>> {
let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
.bind(key.to_bytes())
.fetch_all(conn)
.await?;
rebuild_car(blocks).await
}

/// Finds the event data by a given CID i.e. the root CID in the carfile of the event.
pub async fn value_by_cid(&self, key: &Cid) -> Result<Option<Vec<u8>>> {
let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
Expand All @@ -486,6 +504,18 @@ impl EventAccess {
rebuild_car(blocks).await
}

/// Finds the event data by a given CID i.e. the root CID in the carfile of the event using a provided connection
pub async fn value_by_cid_with_conn(
key: &Cid,
conn: &mut SqliteConnection,
) -> Result<Option<Vec<u8>>> {
let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
.bind(key.to_bytes())
.fetch_all(conn.inner())
.await?;
rebuild_car(blocks).await
}

/// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
/// returns (bool, bool) = (exists, deliverable)
/// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
Expand All @@ -502,6 +532,25 @@ impl EventAccess {
Ok(exist.map_or((false, false), |row| (row.exists, row.delivered)))
}

/// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
/// returns (bool, bool) = (exists, deliverable)
/// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
pub async fn deliverable_by_cid_with_conn(
key: &Cid,
conn: &mut SqliteConnection,
) -> Result<(bool, bool)> {
#[derive(sqlx::FromRow)]
struct CidExists {
exists: bool,
delivered: bool,
}
let exist: Option<CidExists> = sqlx::query_as(EventQuery::value_delivered_by_cid())
.bind(key.to_bytes())
.fetch_optional(conn.inner())
.await?;
Ok(exist.map_or((false, false), |row| (row.exists, row.delivered)))
}

/// Fetch data event CIDs from a specified source that are above the current anchoring high water mark
pub async fn data_events_by_informant(
&self,
Expand Down
20 changes: 20 additions & 0 deletions sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ impl SqlitePool {
Ok(SqliteTransaction { tx })
}

/// Remove a connection from the pool (permanently). It may exceed max connections setting in the meantime.
pub async fn detach_ro_connection(&self) -> Result<SqliteConnection> {
let conn = self.reader.acquire().await?.detach();
Ok(SqliteConnection { conn })
}

/// Get a reference to the writer database pool. The writer pool has only one connection.
/// If you are going to do multiple writes in a row, instead use `tx` and `commit`.
pub fn writer(&self) -> &sqlx::SqlitePool {
Expand All @@ -212,6 +218,19 @@ impl SqlitePool {
}
}

#[derive(Debug)]
/// A wrapper around a sqlx Sqlite connection
pub struct SqliteConnection {
conn: sqlx::SqliteConnection,
}

impl SqliteConnection {
/// Access to the `sqlx::SqliteConnection` directly
pub fn inner(&mut self) -> &mut sqlx::SqliteConnection {
&mut self.conn
}
}

#[derive(Debug)]
/// A wrapper around a sqlx Sqlite transaction
pub struct SqliteTransaction<'a> {
Expand All @@ -231,6 +250,7 @@ impl<'a> SqliteTransaction<'a> {
Ok(())
}

/// Access to the `sqlx::Transaction` directly
pub fn inner(&mut self) -> &mut Transaction<'a, Sqlite> {
&mut self.tx
}
Expand Down

0 comments on commit d8b7d8f

Please sign in to comment.