diff --git a/api/src/server.rs b/api/src/server.rs index 6c7addf8c..2af1588a6 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -37,7 +37,7 @@ pub trait Recon: Clone + Send + Sync { type Key: Key; type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>; - async fn insert(&self, key: Self::Key) -> Result<()>; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()>; async fn range( &self, start: Self::Key, @@ -47,7 +47,6 @@ pub trait Recon: Clone + Send + Sync { ) -> Result>; async fn value_for_key(&self, key: Self::Key) -> Result>>; - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>; } #[async_trait] @@ -59,8 +58,8 @@ where type Key = K; type Hash = H; - async fn insert(&self, key: Self::Key) -> Result<()> { - let _ = recon::Client::insert(self, key).await?; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + let _ = recon::Client::insert(self, key, value).await?; Ok(()) } @@ -78,9 +77,6 @@ where async fn value_for_key(&self, key: Self::Key) -> Result>> { recon::Client::value_for_key(self, key).await } - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> { - recon::Client::store_value_for_key(self, key, value).await - } } #[derive(Clone)] @@ -146,13 +142,10 @@ where let event_id = decode_event_id(&event.event_id)?; let event_data = decode_event_data(&event.event_data)?; self.model - .insert(event_id.clone()) - .await - .map_err(|err| ApiError(format!("failed to insert key: {err}")))?; - self.model - .store_value_for_key(event_id, &event_data) + .insert(event_id.clone(), Some(event_data)) .await .map_err(|err| ApiError(format!("failed to insert key: {err}")))?; + Ok(EventsPostResponse::Success) } @@ -240,7 +233,7 @@ where .with_not_after(0) .build(); self.interest - .insert(interest) + .insert(interest, None) .await .map_err(|err| ApiError(format!("failed to update interest: {err}")))?; @@ -297,7 +290,7 @@ mod tests { struct Context; mock! { pub ReconInterestTest { - fn insert(&self, key: Interest) -> Result<()>; + fn insert(&self, key: Interest, value: Option>) -> Result<()>; fn range( &self, start: Interest, @@ -316,8 +309,8 @@ mod tests { impl Recon for MockReconInterestTest { type Key = Interest; type Hash = Sha256a; - async fn insert(&self, key: Self::Key) -> Result<()> { - self.insert(key) + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + self.insert(key, value) } async fn range( &self, @@ -331,14 +324,11 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } } mock! 
{ pub ReconModelTest { - fn insert(&self, key: EventId) -> Result<()>; + fn insert(&self, key: EventId, value: Option>) -> Result<()>; fn range( &self, start: EventId, @@ -356,8 +346,8 @@ mod tests { impl Recon for MockReconModelTest { type Key = EventId; type Hash = Sha256a; - async fn insert(&self, key: Self::Key) -> Result<()> { - self.insert(key) + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + self.insert(key, value) } async fn range( &self, @@ -371,9 +361,6 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } } #[tokio::test] @@ -391,19 +378,23 @@ mod tests { &Cid::from_str("baejbeicqtpe5si4qvbffs2s7vtbk5ccbsfg6owmpidfj3zeluqz4hlnz6m").unwrap(), // cspell:disable-line ); let event_id_str = multibase::encode(Base::Base16Lower, event_id.to_bytes()); + let event_data = "f".to_string(); let mock_interest = MockReconInterestTest::new(); let mut mock_model = MockReconModelTest::new(); mock_model .expect_insert() - .with(predicate::eq(event_id)) + .with( + predicate::eq(event_id), + predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let server = Server::new(peer_id, network, mock_interest, mock_model); let resp = server .events_post( models::Event { event_id: event_id_str, - event_data: "f".to_string(), + event_data, }, &Context, ) @@ -461,16 +452,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -524,16 +518,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -587,16 +584,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -650,16 +650,19 @@ mod tests { let mut mock_interest = 
MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() diff --git a/core/src/lib.rs b/core/src/lib.rs index 7069a1342..1e5bd0ade 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -20,7 +20,7 @@ pub use jws::{Jws, JwsSignature}; pub use network::Network; pub use range::RangeOpen; pub use signer::{JwkSigner, Signer}; -pub use sql::SqlitePool; +pub use sql::{DbTx, SqlitePool}; pub use stream_id::{StreamId, StreamIdType}; pub use cid::Cid; diff --git a/core/src/sql.rs b/core/src/sql.rs index d81ef3de2..b97712d48 100644 --- a/core/src/sql.rs +++ b/core/src/sql.rs @@ -1,6 +1,12 @@ use std::{path::Path, str::FromStr}; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; +use sqlx::{ + sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, + Sqlite, Transaction, +}; + +/// A trivial wrapper around a sqlx Sqlite database transaction +pub type DbTx<'a> = Transaction<'a, Sqlite>; #[derive(Clone, Debug)] /// The sqlite pool is split into a writer and a reader pool. @@ -19,16 +25,20 @@ impl SqlitePool { // A few ideas: number of RO connections, synchronize = NORMAL, mmap_size, temp_store = memory let conn_opts = SqliteConnectOptions::from_str(&db_path)? .journal_mode(SqliteJournalMode::Wal) + .synchronous(sqlx::sqlite::SqliteSynchronous::Normal) .create_if_missing(true) .optimize_on_close(true, None); let ro_opts = conn_opts.clone().read_only(true); let writer = SqlitePoolOptions::new() + .min_connections(1) .max_connections(1) + .acquire_timeout(std::time::Duration::from_secs(1)) .connect_with(conn_opts) .await?; let reader = SqlitePoolOptions::new() + .min_connections(1) .max_connections(8) .connect_with(ro_opts) .await?; @@ -37,10 +47,17 @@ impl SqlitePool { } /// Get a reference to the writer database pool. The writer pool has only one connection. + /// If you are going to do multiple writes in a row, instead use `tx` and `commit`. pub fn writer(&self) -> &sqlx::SqlitePool { &self.writer } + /// Get a writer tranaction. The writer pool has only one connection so this is an exclusive lock. + /// Use this method to perform simultaneous writes to the database, calling `commit` when you are done. + pub async fn tx(&self) -> anyhow::Result { + Ok(self.writer.begin().await?) + } + /// Get a reference to the reader database pool. The reader pool has many connections. 
pub fn reader(&self) -> &sqlx::SqlitePool { &self.reader diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 8c685d776..06c08ebf3 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -1247,7 +1247,7 @@ mod tests { type Key = K; type Hash = Sha256a; - async fn insert(&self, _key: Self::Key) -> Result<()> { + async fn insert(&self, _key: Self::Key, _value: Option>) -> Result<()> { unreachable!() } @@ -1268,9 +1268,7 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } + async fn interests(&self) -> Result>> { unreachable!() } diff --git a/recon/src/client.rs b/recon/src/client.rs index e7a17399f..d287203f0 100644 --- a/recon/src/client.rs +++ b/recon/src/client.rs @@ -7,7 +7,7 @@ use tokio::sync::{ use tracing::warn; use crate::{ - recon::{Range, SyncState}, + recon::{Range, ReconItem, SyncState}, AssociativeHash, InterestProvider, Key, Metrics, Recon, Store, }; @@ -24,9 +24,11 @@ where H: AssociativeHash, { /// Sends an insert request to the server and awaits the response. - pub async fn insert(&self, key: K) -> Result { + pub async fn insert(&self, key: K, value: Option>) -> Result { let (ret, rx) = oneshot::channel(); - self.sender.send(Request::Insert { key, ret }).await?; + self.sender + .send(Request::Insert { key, value, ret }) + .await?; rx.await? } @@ -77,19 +79,6 @@ where rx.await? } - /// Store the value associated with a key so we can sync it later. - pub async fn store_value_for_key(&self, key: K, value: &[u8]) -> Result<()> { - let (ret, rx) = oneshot::channel(); - self.sender - .send(Request::StoreValueForKey { - key, - value: value.to_vec(), - ret, - }) - .await?; - rx.await? - } - /// Report the local nodes interests. 
pub async fn interests(&self) -> Result>> { let (ret, rx) = oneshot::channel(); @@ -133,6 +122,7 @@ where enum Request { Insert { key: K, + value: Option>, ret: oneshot::Sender>, }, Len { @@ -152,11 +142,6 @@ enum Request { key: K, ret: oneshot::Sender>>>, }, - StoreValueForKey { - key: K, - value: Vec, - ret: oneshot::Sender>, - }, Interests { ret: oneshot::Sender>>>, }, @@ -227,8 +212,12 @@ where let request = self.requests.recv().await; if let Some(request) = request { match request { - Request::Insert { key, ret } => { - send(ret, self.recon.insert(&key).await); + Request::Insert { key, value, ret } => { + let val = self + .recon + .insert(ReconItem::new(&key, value.as_deref())) + .await; + send(ret, val); } Request::Len { ret } => { send(ret, self.recon.len().await); @@ -254,10 +243,6 @@ where let value = self.recon.value_for_key(key).await; send(ret, value); } - Request::StoreValueForKey { key, value, ret } => { - let ok = self.recon.store_value_for_key(key, value).await; - send(ret, ok); - } Request::Interests { ret } => { let value = self.recon.interests().await; send(ret, value); diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs index c110af591..5bfbfdc27 100644 --- a/recon/src/metrics.rs +++ b/recon/src/metrics.rs @@ -216,10 +216,12 @@ impl Metrics { } } -pub(crate) struct KeyInsertEvent; +pub(crate) struct KeyInsertEvent { + pub(crate) cnt: u64, +} impl Recorder for Metrics { - fn record(&self, _event: &KeyInsertEvent) { - self.key_insert_count.inc(); + fn record(&self, event: &KeyInsertEvent) { + self.key_insert_count.inc_by(event.cnt); } } @@ -237,10 +239,12 @@ impl Recorder for Metrics { } } -pub(crate) struct ValueInsertEvent; +pub(crate) struct ValueInsertEvent { + pub(crate) cnt: u64, +} impl Recorder for Metrics { - fn record(&self, _event: &ValueInsertEvent) { - self.value_insert_count.inc(); + fn record(&self, event: &ValueInsertEvent) { + self.value_insert_count.inc_by(event.cnt); } } diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index c4d43bdb5..f3eb58496 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -678,11 +678,11 @@ where Ok(()) } async fn process_value_response(&mut self, key: R::Key, value: Vec) -> Result<()> { - self.recon.insert(key.clone()).await.context("store key")?; self.recon - .store_value_for_key(key, &value) + .insert(key, Some(value)) .await - .context("store value for key") + .context("process value response")?; + Ok(()) } // The remote is missing all keys in the range send them over. async fn process_remote_missing_range(&mut self, range: &Range) -> Result<()> { @@ -807,7 +807,7 @@ pub trait Recon: Clone + Send + Sync + 'static { type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>; /// Insert a new key into the key space. 
- async fn insert(&self, key: Self::Key) -> Result<()>; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()>; /// Get all keys in the specified range async fn range( @@ -829,9 +829,6 @@ pub trait Recon: Clone + Send + Sync + 'static { /// retrieve a value associated with a recon key async fn value_for_key(&self, key: Self::Key) -> Result>>; - /// associate a value with a recon key - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>; - /// Reports the interests of this recon instance async fn interests(&self) -> Result>>; @@ -866,8 +863,8 @@ where type Key = K; type Hash = H; - async fn insert(&self, key: Self::Key) -> Result<()> { - let _ = Client::insert(self, key).await?; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + let _ = Client::insert(self, key, value).await?; Ok(()) } @@ -892,9 +889,7 @@ where async fn value_for_key(&self, key: Self::Key) -> Result>> { Client::value_for_key(self, key).await } - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> { - Client::store_value_for_key(self, key, value).await - } + async fn interests(&self) -> Result>> { Client::interests(self).await } diff --git a/recon/src/recon.rs b/recon/src/recon.rs index c3cb79359..c48e1b568 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -98,12 +98,31 @@ where /// Reports any new keys and what the range indicates about how the local and remote node are /// synchronized. pub async fn process_range(&mut self, range: Range) -> Result<(SyncState, Vec)> { + let mut should_add = Vec::with_capacity(2); let mut new_keys = Vec::with_capacity(2); - if !range.first.is_fencepost() && self.insert(&range.first).await? { - new_keys.push(range.first.clone()); + + if !range.first.is_fencepost() { + should_add.push(range.first.clone()); + } + + if !range.last.is_fencepost() { + should_add.push(range.last.clone()); } - if !range.last.is_fencepost() && self.insert(&range.last).await? { - new_keys.push(range.last.clone()); + + if !should_add.is_empty() { + let new = self + .insert_many(should_add.iter().map(|key| ReconItem::new_key(key))) + .await?; + debug_assert_eq!( + new.len(), + should_add.len(), + "new and should_add must be same length" + ); + for (idx, key) in should_add.into_iter().enumerate() { + if new[idx] { + new_keys.push(key); + } + } } let calculated_hash = self.store.hash_range(&range.first, &range.last).await?; @@ -224,31 +243,40 @@ where self.store.value_for_key(&key).await } - /// Associate a value with a recon key - pub async fn store_value_for_key(&mut self, key: K, value: Vec) -> Result<()> { - if self.store.store_value_for_key(&key, &value).await? { - self.metrics.record(&ValueInsertEvent); - } - Ok(()) - } + /// Insert key into the key space. Includes an optional value. + /// Returns a boolean (true) indicating if the key was new. + pub async fn insert(&mut self, item: ReconItem<'_, K>) -> Result { + let new_val = item.value.is_some(); + let new = self.store.insert(item).await?; - /// Insert a new key into the key space. - /// Returns true if the key did not previously exist. - pub async fn insert(&mut self, key: &K) -> Result { - let new_key = self.store.insert(key).await?; - if new_key { - self.metrics.record(&KeyInsertEvent); + if new { + self.metrics.record(&KeyInsertEvent { cnt: 1 }); } - Ok(new_key) + if new_val { + self.metrics.record(&ValueInsertEvent { cnt: 1 }); + } + + Ok(new) } - /// Insert many keys into the key space. 
- pub async fn insert_many<'a, IT>(&mut self, keys: IT) -> Result + /// Insert many keys into the key space. Includes an optional value for each key. + /// Returns an array with a boolean for each key indicating if the key was new. + /// The order is the same as the order of the keys. True means new, false means not new. + pub async fn insert_many<'a, IT>(&mut self, items: IT) -> Result> where - IT: Iterator + Send, + IT: ExactSizeIterator> + Send + Sync, { - let new_key = self.store.insert_many(keys).await?; - Ok(new_key) + let result = self.store.insert_many(items).await?; + let key_cnt = result.keys.iter().filter(|k| **k).count(); + + self.metrics.record(&KeyInsertEvent { + cnt: key_cnt as u64, + }); + self.metrics.record(&ValueInsertEvent { + cnt: result.value_count as u64, + }); + + Ok(result.keys) } /// Reports total number of keys @@ -329,6 +357,57 @@ impl From for HashCount { } } +#[derive(Clone, Debug)] +pub struct ReconItem<'a, K> +where + K: Key, +{ + pub key: &'a K, + pub value: Option<&'a [u8]>, +} + +impl<'a, K> ReconItem<'a, K> +where + K: Key, +{ + pub fn new(key: &'a K, value: Option<&'a [u8]>) -> Self { + Self { key, value } + } + + pub fn new_key(key: &'a K) -> Self { + Self { key, value: None } + } + + pub fn new_with_value(key: &'a K, value: &'a [u8]) -> Self { + Self { + key, + value: Some(value), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct InsertResult { + /// A true/false list indicating whether or not the key was new. + /// It is in the same order as the input list of keys. + pub keys: Vec, + pub value_count: usize, +} + +impl InsertResult { + pub fn new(new_keys: Vec, value_count: usize) -> Self { + Self { + keys: new_keys, + value_count, + } + } + + /// true if any key is new, false otherwise + pub fn included_new_key(&self) -> bool { + self.keys.iter().any(|new| *new) + } +} + /// Store defines the API needed to store the Recon set. #[async_trait] pub trait Store: std::fmt::Debug { @@ -337,22 +416,16 @@ pub trait Store: std::fmt::Debug { /// Type of the AssociativeHash to compute over keys. type Hash: AssociativeHash; - /// Insert a new key into the key space. - /// Returns true if the key did not previously exist. - async fn insert(&mut self, key: &Self::Key) -> Result; + /// Insert a new key into the key space. Returns true if the key did not exist. + /// The value will be updated if included + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result; /// Insert new keys into the key space. - /// Returns true if a key did not previously exist. - async fn insert_many<'a, I>(&mut self, keys: I) -> Result + /// Returns true for each key if it did not previously exist, in the + /// same order as the input iterator. + async fn insert_many<'a, I>(&mut self, items: I) -> Result where - I: Iterator + Send, - { - let mut new = false; - for key in keys { - new |= self.insert(key).await?; - } - Ok(new) - } + I: ExactSizeIterator> + Send + Sync; /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. @@ -471,12 +544,6 @@ pub trait Store: std::fmt::Debug { Ok(self.len().await? == 0) } - /// store_value_for_key returns - /// Ok(true) if stored, - /// Ok(false) if already present, and - /// Err(e) if store failed. 
- async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result; - /// value_for_key returns /// Ok(Some(value)) if stored, /// Ok(None) if not stored, and diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index 68243a012..14430cf4e 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -2,9 +2,9 @@ use anyhow::Result; use async_trait::async_trait; use std::{collections::BTreeMap, ops::Bound}; -use crate::recon::{AssociativeHash, Key, MaybeHashedKey, Store}; +use crate::recon::{AssociativeHash, Key, MaybeHashedKey, ReconItem, Store}; -use super::HashCount; +use super::{HashCount, InsertResult}; /// An implementation of a Store that stores keys in an in-memory BTree #[derive(Clone, Debug)] @@ -115,8 +115,31 @@ where type Key = K; type Hash = H; - async fn insert(&mut self, key: &Self::Key) -> Result { - Ok(self.keys.insert(key.to_owned(), H::digest(key)).is_none()) + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + let new = self + .keys + .insert(item.key.clone(), H::digest(item.key)) + .is_none(); + + if let Some(val) = item.value { + self.values.insert(item.key.clone(), val.to_vec()); + } + Ok(new) + } + + async fn insert_many<'a, I>(&mut self, items: I) -> Result + where + I: ExactSizeIterator> + Send + Sync, + { + let mut new = vec![false; items.len()]; + let mut new_val_cnt = 0; + for (idx, item) in items.enumerate() { + if item.value.is_some() { + new_val_cnt += 1; + } + new[idx] = self.insert(item).await?; + } + Ok(InsertResult::new(new, new_val_cnt)) } async fn hash_range( @@ -179,10 +202,6 @@ where } } - /// store_value_for_key returns Some(true) is inserting, Some(false) if present, and Err if store failed. - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - Ok(self.values.insert(key.clone(), value.to_vec()).is_none()) - } /// value_for_key returns an Error is retrieving failed and None if the key is not stored. async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { Ok(self.values.get(key).cloned()) diff --git a/recon/src/recon/sqlitestore.rs b/recon/src/recon/sqlitestore.rs index 6994b8405..d3098366b 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/recon/src/recon/sqlitestore.rs @@ -1,10 +1,10 @@ #![warn(missing_docs, missing_debug_implementations, clippy::all)] -use super::HashCount; +use super::{HashCount, InsertResult, ReconItem}; use crate::{AssociativeHash, Key, Store}; use anyhow::Result; use async_trait::async_trait; -use ceramic_core::SqlitePool; +use ceramic_core::{DbTx, SqlitePool}; use sqlx::Row; use std::marker::PhantomData; use std::result::Result::Ok; @@ -49,6 +49,7 @@ where { /// Initialize the recon table. async fn create_table_if_not_exists(&mut self) -> Result<()> { + // Do we want to remove CID and block_retrieved from the table? const CREATE_RECON_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon ( sort_key TEXT, -- the field in the event header to sort by e.g. 
model key BLOB, -- network_id sort_value controller StreamID height event_cid @@ -61,47 +62,84 @@ where ahash_6 INTEGER, ahash_7 INTEGER, CID TEXT, - value BLOB, block_retrieved BOOL, -- indicates if we still want the block PRIMARY KEY(sort_key, key) )"; - sqlx::query(CREATE_RECON_TABLE) - .execute(self.pool.writer()) + const CREATE_RECON_VALUE_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon_value ( + sort_key TEXT, + key BLOB, + value BLOB, + PRIMARY KEY(sort_key, key) + )"; + + let mut tx = self.pool.tx().await?; + sqlx::query(CREATE_RECON_TABLE).execute(&mut *tx).await?; + sqlx::query(CREATE_RECON_VALUE_TABLE) + .execute(&mut *tx) .await?; + tx.commit().await?; Ok(()) } -} -#[async_trait] -impl Store for SQLiteStore -where - K: Key, - H: AssociativeHash, -{ - type Key = K; - type Hash = H; + /// returns (new_key, new_val) tuple + async fn insert_item_int( + &mut self, + item: &ReconItem<'_, K>, + conn: &mut DbTx<'_>, + ) -> Result<(bool, bool)> { + // we insert the value first as it's possible we already have the key and can skip that step + // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again + if let Some(val) = item.value { + if self.insert_value_int(item.key, val, conn).await? { + return Ok((false, true)); + } + } + let new_key = self.insert_key_int(item.key, conn).await?; + Ok((new_key, item.value.is_some())) + } - // Ok(true): inserted the key - // Ok(false): did not insert the key ConstraintViolation - // Err(e): sql error - #[instrument(skip(self))] - async fn insert(&mut self, key: &Self::Key) -> Result { - let query = sqlx::query( + /// returns true if the key already exists in the recon table + async fn insert_value_int(&mut self, key: &K, val: &[u8], conn: &mut DbTx<'_>) -> Result { + let value_insert = sqlx::query( + r#"INSERT INTO recon_value (value, sort_key, key) + VALUES (?, ?, ?) + ON CONFLICT (sort_key, key) DO UPDATE + SET value=excluded.value + RETURNING + EXISTS(select 1 from recon where sort_key=? and key=?)"#, + ); + + let resp = value_insert + .bind(val) + .bind(&self.sort_key) + .bind(key.as_bytes()) + .bind(&self.sort_key) + .bind(key.as_bytes()) + .fetch_one(&mut **conn) + .await?; + + let v = resp.get::<'_, bool, _>(0); + Ok(v) + } + + async fn insert_key_int(&mut self, key: &K, conn: &mut DbTx<'_>) -> Result { + let key_insert = sqlx::query( "INSERT INTO recon ( - sort_key, key, - ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7, - block_retrieved - ) VALUES ( - ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ? - );", + sort_key, key, + ahash_0, ahash_1, ahash_2, ahash_3, + ahash_4, ahash_5, ahash_6, ahash_7, + block_retrieved + ) VALUES ( + ?, ?, + ?, ?, ?, ?, + ?, ?, ?, ?, + ? + );", ); + let hash = H::digest(key); - let resp = query + let resp = key_insert .bind(&self.sort_key) .bind(key.as_bytes()) .bind(hash.as_u32s()[0]) @@ -113,7 +151,7 @@ where .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) .bind(false) - .fetch_all(self.pool.writer()) + .execute(&mut **conn) .await; match resp { std::result::Result::Ok(_rows) => Ok(true), @@ -127,6 +165,50 @@ where Err(err) => Err(err.into()), } } +} + +#[async_trait] +impl Store for SQLiteStore +where + K: Key, + H: AssociativeHash, +{ + type Key = K; + type Hash = H; + + /// Returns true if the key was new. 
The value is always updated if included + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + let mut tx = self.pool.writer().begin().await?; + let (new_key, _new_val) = self.insert_item_int(&item, &mut tx).await?; + tx.commit().await?; + Ok(new_key) + } + + /// Insert new keys into the key space. + /// Returns true if a key did not previously exist. + async fn insert_many<'a, I>(&mut self, items: I) -> Result + where + I: ExactSizeIterator> + Send + Sync, + { + match items.len() { + 0 => Ok(InsertResult::new(vec![], 0)), + _ => { + let mut results = vec![false; items.len()]; + let mut new_val_cnt = 0; + let mut tx = self.pool.writer().begin().await?; + + for (idx, item) in items.enumerate() { + let (new_key, new_val) = self.insert_item_int(&item, &mut tx).await?; + results[idx] = new_key; + if new_val { + new_val_cnt += 1; + } + } + tx.commit().await?; + Ok(InsertResult::new(results, new_val_cnt)) + } + } + } /// return the hash and count for a range #[instrument(skip(self))] @@ -359,21 +441,9 @@ where } } - #[instrument(skip(self))] - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - let query = sqlx::query("UPDATE recon SET value=? WHERE sort_key=? AND key=?;"); - query - .bind(value) - .bind(&self.sort_key) - .bind(key.as_bytes()) - .fetch_all(self.pool.writer()) - .await?; - Ok(true) - } - #[instrument(skip(self))] async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { - let query = sqlx::query("SELECT value FROM recon WHERE sort_key=? AND key=?;"); + let query = sqlx::query("SELECT value FROM recon_value WHERE sort_key=? AND key=?;"); let row = query .bind(&self.sort_key) .bind(key.as_bytes()) @@ -387,6 +457,7 @@ where mod tests { use super::*; + use crate::recon::ReconItem; use crate::tests::AlphaNumBytes; use crate::Sha256a; @@ -403,8 +474,14 @@ mod tests { #[test(tokio::test)] async fn test_hash_range_query() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); let hash: Sha256a = store .hash_range(&b"a".as_slice().into(), &b"z".as_slice().into()) .await @@ -417,8 +494,14 @@ mod tests { #[test(tokio::test)] async fn test_range_query() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); let ids = store .range( &b"a".as_slice().into(), @@ -453,7 +536,11 @@ mod tests { ) "# ] - .assert_debug_eq(&store.insert(&AlphaNumBytes::from("hello")).await); + .assert_debug_eq( + &store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await, + ); // reject the second insert of same key expect![ @@ -463,14 +550,24 @@ mod tests { ) "# ] - .assert_debug_eq(&store.insert(&AlphaNumBytes::from("hello")).await); + .assert_debug_eq( + &store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await, + ); } #[test(tokio::test)] async fn test_first_and_last() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + 
.insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); // Only one key in range let ret = store @@ -526,9 +623,8 @@ mod tests { let mut store = new_store().await; let key = AlphaNumBytes::from("hello"); let store_value = AlphaNumBytes::from("world"); - store.insert(&key).await.unwrap(); store - .store_value_for_key(&key, store_value.as_slice()) + .insert(ReconItem::new_with_value(&key, store_value.as_slice())) .await .unwrap(); let value = store.value_for_key(&key).await.unwrap().unwrap(); diff --git a/recon/src/recon/store_metrics.rs b/recon/src/recon/store_metrics.rs index caa6c7618..be96b2467 100644 --- a/recon/src/recon/store_metrics.rs +++ b/recon/src/recon/store_metrics.rs @@ -6,6 +6,8 @@ use tokio::time::Instant; use crate::{metrics::StoreQuery, recon::HashCount, AssociativeHash, Key, Metrics, Store}; +use super::{InsertResult, ReconItem}; + /// Implement the Store and record metrics #[derive(Debug)] pub struct StoreMetricsMiddleware { @@ -40,18 +42,19 @@ where type Key = K; type Hash = H; - async fn insert(&mut self, key: &Self::Key) -> Result { - StoreMetricsMiddleware::::record(self.metrics.clone(), "insert", self.store.insert(key)) + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + StoreMetricsMiddleware::::record(self.metrics.clone(), "insert", self.store.insert(item)) .await } - async fn insert_many<'a, I>(&mut self, keys: I) -> Result + + async fn insert_many<'a, I>(&mut self, items: I) -> Result where - I: Iterator + Send, + I: ExactSizeIterator> + Send + Sync, { StoreMetricsMiddleware::::record( self.metrics.clone(), "insert_many", - self.store.insert_many(keys), + self.store.insert_many(items), ) .await } @@ -165,15 +168,6 @@ where .await } - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - StoreMetricsMiddleware::::record( - self.metrics.clone(), - "store_value_for_key", - self.store.store_value_for_key(key, value), - ) - .await - } - async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { StoreMetricsMiddleware::::record( self.metrics.clone(), diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index 7c482e51c..db5ad52e0 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -40,7 +40,7 @@ use pretty::{Arena, DocAllocator, DocBuilder, Pretty}; use crate::{ protocol::{self, InitiatorMessage, ResponderMessage, ValueResponse}, - recon::{FullInterests, HashCount, InterestProvider, Range}, + recon::{FullInterests, HashCount, InterestProvider, Range, ReconItem}, tests::AlphaNumBytes, AssociativeHash, BTreeStore, Client, Key, Metrics, Recon, Server, Sha256a, Store, }; @@ -569,10 +569,12 @@ async fn word_lists() { ); for key in s.split([' ', '\n']).map(|s| s.to_string()) { if !s.is_empty() { - r.insert(&key.as_bytes().into()).await.unwrap(); - r.store_value_for_key(key.as_bytes().into(), key.to_uppercase().as_bytes().into()) - .await - .unwrap(); + r.insert(ReconItem::new( + &key.as_bytes().into(), + key.to_uppercase().as_bytes().into(), + )) + .await + .unwrap(); } } start_recon(r) @@ -1300,10 +1302,10 @@ async fn disjoint() { cat: [a: A, b: B, c: C, e: , f: , g: ] -> value_req(e) cat: [a: A, b: B, c: C, e: , f: , g: ] - <- value_resp(e: E) - dog: [a: A, b: B, c: C, e: E, f: F, g: G] -> value_req(f) cat: [a: A, b: B, c: C, e: , f: , g: ] + <- value_resp(e: E) + dog: [a: A, b: B, c: C, e: E, f: F, g: G] -> value_req(g) cat: [a: A, b: B, c: C, e: E, f: , g: ] <- 
value_resp(f: F) @@ -1480,10 +1482,10 @@ async fn paper() { cat: [ape: APE, bee: BEE, cot: COT, doe: , eel: EEL, fox: FOX, gnu: GNU] <- range_resp({gnu h(hog)#1 𝛀 }) dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] - -> listen_only - cat: [ape: APE, bee: BEE, cot: COT, doe: , eel: EEL, fox: FOX, gnu: GNU, hog: HOG] <- value_resp(doe: DOE) dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] + -> listen_only + cat: [ape: APE, bee: BEE, cot: COT, doe: , eel: EEL, fox: FOX, gnu: GNU, hog: HOG] <- listen_only dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] -> finished @@ -1727,18 +1729,18 @@ async fn alternating() { cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] -> value_req(u) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] + <- value_resp(u: U) + dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> value_req(w) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] - <- value_resp(u: U) + <- value_resp(w: W) dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> value_req(y) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: , x: X, y: , z: Z] - <- value_resp(w: W) + <- value_resp(y: Y) dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> listen_only cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: , z: Z] - <- value_resp(y: Y) - dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] <- listen_only dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> finished @@ -1957,10 +1959,10 @@ async fn subset_interest() { dog: [b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, r: ] -> value_req(i) cat: [b: , c: C, e: , f: F, g: G, i: , m: , n: N, r: R] - -> value_req(m) - cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] <- range_resp({c h(d)#1 e}) dog: [b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, r: ] + -> value_req(m) + cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] -> value_resp(r: R) cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] <- range_resp({e 0 f})
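
The core of this patch is the new ReconItem/InsertResult surface on Store: a key and its optional value are inserted together, and batch inserts report per-key newness plus how many values were stored. A minimal sketch of how a store consumer might drive it; the helper functions, extra bounds, and import paths below are assumptions for illustration, not code from this patch.

use anyhow::Result;
use recon::{InsertResult, ReconItem, Store}; // paths assumed; ReconItem/InsertResult live in the recon crate per this patch

/// Insert one key, optionally storing its value in the same call.
/// Returns true if the key was new; the value (if any) is stored/updated either way.
async fn ingest<S: Store>(store: &mut S, key: &S::Key, value: Option<&[u8]>) -> Result<bool> {
    store.insert(ReconItem::new(key, value)).await
}

/// Insert a batch of keys (no values here) and return only the ones that were new.
async fn ingest_batch<S>(store: &mut S, keys: &[S::Key]) -> Result<Vec<S::Key>>
where
    S: Store,
    S::Key: Clone + Sync,
{
    // insert_many takes an ExactSizeIterator of ReconItems and returns an InsertResult:
    // result.keys has one bool per input key, in input order; result.value_count counts stored values.
    let result: InsertResult = store
        .insert_many(keys.iter().map(|k| ReconItem::new_key(k)))
        .await?;
    Ok(keys
        .iter()
        .zip(result.keys)
        .filter_map(|(k, new)| new.then(|| k.clone()))
        .collect())
}

This is the same pattern process_range now uses internally: batch-insert the non-fencepost range bounds, then keep only the keys the InsertResult flags as new.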
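
At the recon::Client level the same change collapses two round trips into one: callers pass the optional payload alongside the key, as the API handlers now do (Some(event_data) for events, None for interests). A hypothetical helper showing the shape of the call; the generic bounds mirror the Client impl and the return value is ignored since only success matters here.

use anyhow::Result;
use recon::{AssociativeHash, Client, Key};

async fn put_event<K: Key, H: AssociativeHash>(
    client: &Client<K, H>,
    key: K,
    data: Option<Vec<u8>>,
) -> Result<()> {
    // Previously this took two calls: insert(key) followed by store_value_for_key(key, &data).
    // Interests simply pass None for the value.
    let _ = client.insert(key, data).await?;
    Ok(())
}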
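
core/src/sql.rs gains the DbTx alias and SqlitePool::tx so several writes can share the single writer connection inside one transaction, which is how the SQLite store now batches insert_many. A sketch of the intended usage; the table and statement below are illustrative (the recon_value table is the one this patch creates), not lifted from the patch.

use ceramic_core::{DbTx, SqlitePool};

async fn write_batch(pool: &SqlitePool, rows: &[(Vec<u8>, Vec<u8>)]) -> anyhow::Result<()> {
    // Begin a transaction on the writer pool; that pool holds a single connection,
    // so this acts as an exclusive write lock until commit.
    let mut tx: DbTx<'_> = pool.tx().await?;
    for (key, value) in rows {
        sqlx::query("INSERT OR REPLACE INTO recon_value (sort_key, key, value) VALUES (?, ?, ?)")
            .bind("model")
            .bind(key.as_slice())
            .bind(value.as_slice())
            .execute(&mut *tx)
            .await?;
    }
    // Nothing is visible to readers until the transaction commits.
    tx.commit().await?;
    Ok(())
}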