diff --git a/api/src/server.rs b/api/src/server.rs
index 2b912c837..1c760ce84 100644
--- a/api/src/server.rs
+++ b/api/src/server.rs
@@ -2,6 +2,7 @@
 #![allow(unused_imports)]

+use std::time::Duration;
 use std::{future::Future, ops::Range};
 use std::{marker::PhantomData, ops::RangeBounds};
 use std::{net::SocketAddr, ops::Bound};
@@ -44,6 +45,20 @@ use tikv_jemalloc_ctl::{epoch, stats};

 use crate::ResumeToken;

+/// When the incoming events queue has at least this many items, we'll store them.
+/// This applies when we're getting writes faster than the flush interval.
+const EVENT_INSERT_QUEUE_SIZE: usize = 3;
+/// How often we should flush the queue of events to the store. This applies when we have fewer than `EVENT_INSERT_QUEUE_SIZE` events,
+/// in order to avoid stalling a single write for too long, while still reducing contention when we have a lot of writes.
+/// This is quite low, but in my benchmarking adding a longer interval just slowed ingest down, without changing contention noticeably.
+const FLUSH_INTERVAL_MS: u64 = 10;
+
+/// How long we're willing to wait to enqueue an insert to the database service loop before we tell the caller it was full.
+const INSERT_ENQUEUE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
+/// How long we're willing to wait for the database service to respond to an insert request before we tell the caller it was too slow.
+/// Aborting and returning an error doesn't mean that the write won't be processed, only that the caller will get an error indicating it timed out.
+const INSERT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
 // Helper to build responses consistently, as we can't implement for the api_server::models directly
 pub struct BuildResponse {}
 impl BuildResponse {
@@ -149,7 +164,8 @@ impl<S: AccessInterestStore> AccessInterestStore for Arc<S> {
 #[async_trait]
 pub trait AccessModelStore: Send + Sync {
     /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
+    async fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)])
+        -> Result<(Vec<bool>, usize)>;
     async fn range_with_values(
         &self,
         start: &EventId,
@@ -171,8 +187,11 @@ pub trait AccessModelStore: Send + Sync {
 #[async_trait::async_trait]
 impl<S: AccessModelStore> AccessModelStore for Arc<S> {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
-        self.as_ref().insert(key, value).await
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> Result<(Vec<bool>, usize)> {
+        self.as_ref().insert_many(items).await
     }

     async fn range_with_values(
@@ -202,30 +221,109 @@ impl<S: AccessModelStore> AccessModelStore for Arc<S> {
     }
 }

+struct EventInsert {
+    id: EventId,
+    data: Vec<u8>,
+    tx: tokio::sync::oneshot::Sender<Result<bool>>,
+}
+
+struct InsertTask {
+    _handle: tokio::task::JoinHandle<()>,
+    tx: tokio::sync::mpsc::Sender<EventInsert>,
+}
+
 #[derive(Clone)]
 pub struct Server<C, I, M> {
     peer_id: PeerId,
     network: Network,
     interest: I,
-    model: M,
+    model: Arc<M>,
+    // If we ever need to restart this, we'll need a mutex. For now we want to avoid locking the channel,
+    // so we just keep track of it to shut down gracefully; if the task dies, the server is in a fatal error state.
+    insert_task: Arc<InsertTask>,
     marker: PhantomData<C>,
 }

 impl<C, I, M> Server<C, I, M>
 where
     I: AccessInterestStore,
-    M: AccessModelStore,
+    M: AccessModelStore + 'static,
 {
-    pub fn new(peer_id: PeerId, network: Network, interest: I, model: M) -> Self {
+    pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc<M>) -> Self {
+        let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
+        let event_store = model.clone();
+
+        let handle = Self::start_insert_task(event_store, event_rx);
+        let insert_task = Arc::new(InsertTask {
+            _handle: handle,
+            tx,
+        });
         Server {
             peer_id,
             network,
             interest,
             model,
+            insert_task,
             marker: PhantomData,
         }
     }

+    fn start_insert_task(
+        event_store: Arc<M>,
+        mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
+    ) -> tokio::task::JoinHandle<()> {
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
+            let mut events = vec![];
+            // We could bias towards processing the queue of events over accepting more, but we'll
+            // rely on the channel depth for backpressure. The goal is to keep the queue close to empty
+            // without processing one at a time. When we stop parsing the carfile in the store
+            // (i.e. validate before sending here so this is just an insert), we may want to process more at once.
+            loop {
+                tokio::select! {
+                    _ = interval.tick(), if !events.is_empty() => {
+                        Self::process_events(&mut events, &event_store).await;
+                    }
+                    Some(req) = event_rx.recv() => {
+                        events.push(req);
+                    }
+                }
+                // make sure the events queue doesn't get too deep when we're under heavy load
+                if events.len() >= EVENT_INSERT_QUEUE_SIZE {
+                    Self::process_events(&mut events, &event_store).await;
+                }
+            }
+        })
+    }
+
+    async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>) {
+        let mut oneshots = Vec::with_capacity(events.len());
+        let mut items = Vec::with_capacity(events.len());
+        events.drain(..).for_each(|req: EventInsert| {
+            oneshots.push(req.tx);
+            items.push((req.id, Some(req.data)));
+        });
+        tracing::trace!("calling insert many with {} items.", items.len());
+        match event_store.insert_many(&items).await {
+            Ok((results, _)) => {
+                tracing::debug!("insert many returned {} results.", results.len());
+                for (tx, result) in oneshots.into_iter().zip(results.into_iter()) {
+                    if let Err(e) = tx.send(Ok(result)) {
+                        tracing::warn!("failed to send success response to api listener: {:?}", e);
+                    }
+                }
+            }
+            Err(e) => {
+                tracing::warn!("failed to insert events: {e}");
+                for tx in oneshots.into_iter() {
+                    if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
+                        tracing::warn!("failed to send failed response to api listener: {:?}", e);
+                    }
+                }
+            }
+        };
+    }
+
     pub async fn get_event_feed(
         &self,
         resume_at: Option<ResumeToken>,
@@ -298,10 +396,28 @@ where
     pub async fn post_events(&self, event: Event) -> Result<EventsPostResponse> {
         let event_id = decode_event_id(&event.id)?;
         let event_data = decode_event_data(&event.data)?;
-        self.model
-            .insert(event_id, Some(event_data))
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        tokio::time::timeout(
+            INSERT_ENQUEUE_TIMEOUT,
+            self.insert_task.tx.send(EventInsert {
+                id: event_id,
+                data: event_data,
+                tx,
+            }),
+        )
+        .map_err(|_| {
+            ErrorResponse::new("Database service queue is too full to accept requests".to_owned())
+        })
+        .await?
+        .map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?;
+
+        let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
             .await
-            .map_err(|err| ErrorResponse::new(format!("failed to insert key: {err}")))?;
+            .map_err(|_| {
+                ErrorResponse::new("Timeout waiting for database service response".to_owned())
+            })?
+            .map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))?
+            .map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?;

         Ok(EventsPostResponse::Success)
     }
@@ -432,7 +548,7 @@ impl<C, I, M> Api<C> for Server<C, I, M>
 where
     C: Send + Sync,
     I: AccessInterestStore + Sync,
-    M: AccessModelStore + Sync,
+    M: AccessModelStore + Sync + 'static,
 {
     #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
     async fn liveness_get(
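The write path above funnels every `post_events` call through one background task that batches inserts: requests arrive on an `mpsc` channel carrying a `oneshot` ack, a timer flushes small queues every `FLUSH_INTERVAL_MS`, and the `EVENT_INSERT_QUEUE_SIZE` threshold flushes large ones immediately. A minimal, self-contained sketch of that loop; the `Insert` type and `flush` body are toy stand-ins for `EventInsert` and the batched `insert_many` call:

```rust
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

const QUEUE_SIZE: usize = 3;
const FLUSH_INTERVAL: Duration = Duration::from_millis(10);

struct Insert {
    data: u64,
    tx: oneshot::Sender<bool>,
}

async fn run_writer(mut rx: mpsc::Receiver<Insert>) {
    let mut interval = tokio::time::interval(FLUSH_INTERVAL);
    let mut pending: Vec<Insert> = Vec::new();
    loop {
        tokio::select! {
            // Only arm the timer branch when there is something to flush,
            // so an idle task doesn't spin on ticks.
            _ = interval.tick(), if !pending.is_empty() => flush(&mut pending).await,
            Some(req) = rx.recv() => pending.push(req),
            // All senders dropped and the queue is drained: shut down.
            else => break,
        }
        // Under heavy load, flush by size instead of waiting for the timer.
        if pending.len() >= QUEUE_SIZE {
            flush(&mut pending).await;
        }
    }
}

async fn flush(pending: &mut Vec<Insert>) {
    // Stand-in for a single batched `insert_many` database call.
    for req in pending.drain(..) {
        println!("inserted event {}", req.data);
        // The caller may have timed out and dropped its receiver; ignore that.
        let _ = req.tx.send(true);
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1024);
    let writer = tokio::spawn(run_writer(rx));
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(Insert { data: 7, tx: ack_tx }).await.unwrap();
    assert!(ack_rx.await.unwrap());
    drop(tx);
    writer.await.unwrap();
}
```

The guard on the timer arm is the detail worth copying: without it the interval fires forever on an empty queue, and with it the select blocks on `recv` until there is work.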
diff --git a/api/src/tests.rs b/api/src/tests.rs
index 4e541cc3e..b7fc5b973 100644
--- a/api/src/tests.rs
+++ b/api/src/tests.rs
@@ -18,6 +18,7 @@ use mockall::{mock, predicate};
 use multibase::Base;
 use recon::Key;
 use std::str::FromStr;
+use std::sync::Arc;
 use tracing_test::traced_test;

 struct Context;
@@ -54,7 +55,7 @@ impl AccessInterestStore for MockReconInterestTest {
 }
 mock! {
     pub ReconModelTest {
-        fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
+        fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)]) -> Result<(Vec<bool>, usize)>;
         fn range_with_values(
             &self,
             start: &EventId,
@@ -70,8 +71,11 @@ mock! {
 }
 #[async_trait]
 impl AccessModelStore for MockReconModelTest {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
-        self.insert(key, value)
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> Result<(Vec<bool>, usize)> {
+        self.insert_many(items)
     }
     async fn range_with_values(
         &self,
@@ -111,15 +115,14 @@ async fn create_event() {
     let event_data = "f".to_string();
     let mock_interest = MockReconInterestTest::new();
     let mut mock_model = MockReconModelTest::new();
+    let event_data_arg = Some(decode_event_data(event_data.as_str()).unwrap());
+    let args = vec![(event_id.clone(), event_data_arg)];
     mock_model
-        .expect_insert()
-        .with(
-            predicate::eq(event_id),
-            predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())),
-        )
+        .expect_insert_many()
+        .with(predicate::eq(args))
         .times(1)
-        .returning(|_, _| Ok((true, true)));
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+        .returning(|_| Ok((vec![true], 1)));
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .events_post(
             models::Event {
@@ -173,7 +176,7 @@ async fn register_interest_sort_value() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let interest = models::Interest {
         sep: "model".to_string(),
         sep_value: model.to_owned(),
@@ -195,7 +198,7 @@ async fn register_interest_sort_value_bad_request() {
     // Setup mock expectations
     let mock_interest = MockReconInterestTest::new();
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let interest = models::Interest {
         sep: "model".to_string(),
         sep_value: model.to_owned(),
@@ -246,7 +249,7 @@ async fn register_interest_sort_value_controller() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .interests_sort_key_sort_value_post(
             "model".to_string(),
@@ -301,7 +304,7 @@ async fn register_interest_value_controller_stream() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .interests_sort_key_sort_value_post(
             "model".to_string(),
@@ -355,7 +358,7 @@ async fn get_events_for_interest_range() {
         )
         .times(1)
         .returning(|s, _, _, _| Ok(vec![(s.clone(), vec![])]));
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .events_sort_key_sort_value_get(
             "model".to_string(),
@@ -401,7 +404,7 @@ async fn test_events_event_id_get_success() {
         .times(1)
         .returning(move |_| Ok(Some(event_data.clone())));
     let mock_interest = MockReconInterestTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let result = server.events_event_id_get(event_id_str, &Context).await;
     let EventsEventIdGetResponse::Success(event) = result.unwrap() else {
         panic!("Expected EventsEventIdGetResponse::Success but got another variant");
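A note on why every test now wraps the mock in `Arc::new(...)`: `Server::new` takes `Arc<M>` with `M: 'static` because the insert task is handed to `tokio::spawn`, which cannot borrow the model from the server. A compilable sketch of just that ownership constraint, with a hypothetical `Store` trait standing in for `AccessModelStore`:

```rust
use std::sync::Arc;

trait Store: Send + Sync {
    fn insert(&self, key: u64);
}

struct MemStore;
impl Store for MemStore {
    fn insert(&self, key: u64) {
        println!("inserted {key}");
    }
}

// `tokio::spawn` requires a `'static` future, so the task must own the store.
// Cloning an `Arc<M>` into the task satisfies the bound while the caller
// keeps its own handle; this is the same reason `Server` holds `model: Arc<M>`.
fn start_task<M: Store + 'static>(store: Arc<M>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move { store.insert(42) })
}

#[tokio::main]
async fn main() {
    start_task(Arc::new(MemStore)).await.unwrap();
}
```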
diff --git a/one/src/events.rs b/one/src/events.rs
index 80ebdbb06..4116f9965 100644
--- a/one/src/events.rs
+++ b/one/src/events.rs
@@ -369,7 +369,7 @@ async fn migrate_from_filesystem(input_ipfs_path: PathBuf, store: SqliteEventStore
             );
         }

-        let result = store.put_block_tx(cid.hash(), &blob.into(), &mut tx).await;
+        let result = store.put_block_tx(cid.hash(), &blob, &mut tx).await;
         if result.is_err() {
             info!(
                 "{} err: {} {:?}",
@@ -547,7 +547,7 @@ mod tests {
             // Create the CID and store the block.
             let hash = Code::Sha2_256.digest(block.as_slice());
             block_store
-                .put_block_tx(&hash, &block.into(), &mut tx)
+                .put_block_tx(&hash, &block, &mut tx)
                 .await
                 .unwrap();
         }
diff --git a/one/src/lib.rs b/one/src/lib.rs
index aa18aca2c..71918b051 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -491,8 +491,12 @@ impl Daemon {

         // Build HTTP server
         let network = network.clone();
-        let ceramic_server =
-            ceramic_api::Server::new(peer_id, network, interest_api_store, model_api_store);
+        let ceramic_server = ceramic_api::Server::new(
+            peer_id,
+            network,
+            interest_api_store,
+            Arc::new(model_api_store),
+        );
         let ceramic_metrics = MetricsHandle::register(ceramic_api::Metrics::register);
         // Wrap server in metrics middleware
         let ceramic_server = ceramic_api::MetricsMiddleware::new(ceramic_server, ceramic_metrics);
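A related detail in the `events.rs` call sites above: `put_block_tx` now takes `&[u8]` instead of `&Bytes` (the store-side signature change appears later in this patch), so callers pass `&blob` directly rather than converting with `.into()`. A small sketch of why the slice signature is the more flexible one, assuming the `bytes` crate:

```rust
use bytes::Bytes;

// Accepting `&[u8]` lets callers hand over any contiguous byte source
// without conversions (and without the `blob.to_vec()` copy the old
// `&Bytes` binding forced on the insert path).
fn put_block(blob: &[u8]) -> usize {
    blob.len()
}

fn main() {
    let from_bytes = Bytes::from_static(b"car-block");
    let from_vec: Vec<u8> = vec![1, 2, 3];
    // Both `Bytes` and `Vec<u8>` deref to `&[u8]`.
    assert_eq!(put_block(&from_bytes), 9);
    assert_eq!(put_block(&from_vec), 3);
}
```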
diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs
index 695443a94..71ced9622 100644
--- a/recon/src/protocol.rs
+++ b/recon/src/protocol.rs
@@ -39,8 +39,6 @@ use crate::{
 const WANT_VALUES_BUFFER: usize = 10000;
 // Number of sync ranges to buffer.
 const SYNC_RANGES_BUFFER: usize = 1000;
-// Number of messages to buffer on the sink before flushing.
-const SINK_BUFFER_COUNT: usize = 100;
 // Limit to the number of pending range requests.
 // Range requests grow logistically, meaning they grow
 // exponentially while splitting and then decay
@@ -438,27 +436,24 @@ where
                 .await?;
         } else {
             // Send as many as fit under the limit and buffer the rest
-            let mut fed = false;
+            let mut to_send = Vec::with_capacity(PENDING_RANGES_LIMIT);
             for range in ranges {
                 if self.pending_ranges < PENDING_RANGES_LIMIT {
-                    fed = true;
                     self.pending_ranges += 1;
-                    self.common
-                        .stream
-                        .feed(
-                            self.common
-                                .create_message(InitiatorMessage::RangeRequest(range)),
-                        )
-                        .await?;
+                    to_send.push(
+                        self.common
+                            .create_message(InitiatorMessage::RangeRequest(range)),
+                    );
                 } else if self.ranges_stack.len() < self.ranges_stack.capacity() {
                     self.ranges_stack.push(range);
                     self.metrics.record(&RangeEnqueued);
                 } else {
+                    // blocked due to channel back pressure
                    self.metrics.record(&RangeEnqueueFailed);
                 }
             }
-            if fed {
-                self.common.stream.flush().await?;
+            if !to_send.is_empty() {
+                self.common.stream.send_all(to_send).await?;
             }
         };
         Ok(())
@@ -810,7 +805,7 @@ where
             .context("value for key")?;
         if let Some(value) = value {
             self.stream
-                .feed((self.value_resp_fn)(
+                .send((self.value_resp_fn)(
                     self.sync_id.clone(),
                     ValueResponse { key, value },
                 ))
@@ -840,19 +835,18 @@ where
         for key in keys {
             if let Some(value) = self.recon.value_for_key(key.clone()).await? {
                 self.stream
-                    .feed((self.value_resp_fn)(
+                    .send((self.value_resp_fn)(
                         self.sync_id.clone(),
                         ValueResponse { key, value },
                     ))
                     .await?;
             }
         }
-        self.stream.flush().await?;
         Ok(())
     }

     async fn feed_want_value(&mut self, message: Out) -> Result<()> {
         self.stream
-            .feed(message)
+            .send(message)
             .await
             .context("feeding value request")?;
         Ok(())
     }
@@ -931,33 +925,12 @@ impl<S> SinkFlusher<S> {
         self.feed_count = 0;
         Ok(())
     }
-    async fn feed<T, E>(&mut self, message: T) -> Result<()>
-    where
-        S: Sink<T, Error = E>,
-        E: std::error::Error + Send + Sync + 'static,
-        MessageLabels: for<'a> From<&'a T>,
-    {
-        self.feed_count += 1;
-        self.metrics.record(&MessageSent(&message));
-        self.inner.feed(message).await?;
-        if self.feed_count > SINK_BUFFER_COUNT {
-            self.feed_count = 0;
-            self.flush().await?;
-        }
-        Ok(())
-    }
-    async fn flush<T, E>(&mut self) -> Result<()>
-    where
-        S: Sink<T, Error = E>,
-        E: std::error::Error + Send + Sync + 'static,
-    {
-        self.inner.flush().await.context("flushing")
-    }
     async fn close<T, E>(&mut self) -> Result<()>
     where
         S: Sink<T, Error = E>,
         E: std::error::Error + Send + Sync + 'static,
     {
+        // sink `poll_close()` will flush
         self.inner.close().await.context("closing")
     }
 }
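With the hand-rolled `feed`/`flush` counting removed, the wrapper relies on `send`/`send_all` (their definitions on `SinkFlusher` are not shown in this hunk). The sketch below shows the equivalent `futures::SinkExt` behavior the change leans on: `send_all` drives a whole batch into the sink and flushes once at the end, which is what the deleted `SINK_BUFFER_COUNT` bookkeeping approximated by hand. A `futures` mpsc channel stands in for the framed protocol stream:

```rust
use futures::{channel::mpsc, stream, SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    // The unbounded sender implements `Sink`, keeping the example self-contained.
    let (mut tx, rx) = mpsc::unbounded::<u32>();

    // `send_all` feeds every item, then flushes once: batch semantics
    // without per-message flush bookkeeping.
    let mut batch = stream::iter(vec![1u32, 2, 3].into_iter().map(Ok));
    tx.send_all(&mut batch).await.unwrap();
    drop(tx);

    let received: Vec<u32> = rx.collect().await;
    assert_eq!(received, vec![1, 2, 3]);
}
```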
diff --git a/store/benches/sqlite_store.rs b/store/benches/sqlite_store.rs
index d1c4e072d..53ce6f91e 100644
--- a/store/benches/sqlite_store.rs
+++ b/store/benches/sqlite_store.rs
@@ -1,14 +1,13 @@
 use ceramic_core::{EventId, Network};
-use ceramic_store::{ModelStore, SqlitePool};
+use ceramic_store::{SqliteEventStore, SqlitePool};
 use cid::Cid;
 use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
 use multihash::{Code, MultihashDigest};
 use rand::RngCore;
-use recon::{ReconItem, Sha256a, Store};
-use std::path::PathBuf;
+use recon::{ReconItem, Store};

 struct ModelSetup {
-    store: ModelStore<Sha256a>,
+    store: SqliteEventStore,
     events: Vec<(EventId, Vec<u8>)>,
 }
@@ -20,7 +19,7 @@ enum ModelType {
 fn generate_event_id(data: &[u8]) -> EventId {
     let cid = Cid::new_v1(
         0x55, //RAW
-        Code::Sha2_256.digest(&data),
+        Code::Sha2_256.digest(data),
     );
     EventId::new(
         &Network::Mainnet,
@@ -35,7 +34,7 @@

 const INSERTION_COUNT: usize = 10_000;

-async fn model_setup(dir: PathBuf, tpe: ModelType, cnt: usize) -> ModelSetup {
+async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
     let mut events = Vec::with_capacity(cnt);
     for _ in 0..cnt {
         let mut data = match tpe {
@@ -49,7 +48,7 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
         rand::thread_rng().fill_bytes(&mut data);
         let event_id = generate_event_id(&data);
         let cid = event_id.cid().unwrap();
-        let header = iroh_car::CarHeader::V1(iroh_car::CarHeaderV1::from(vec![cid.clone()]));
+        let header = iroh_car::CarHeader::V1(iroh_car::CarHeaderV1::from(vec![cid]));
         let writer = tokio::io::BufWriter::new(Vec::with_capacity(1024 * 1024));
         let mut writer = iroh_car::CarWriter::new(header, writer);
         writer.write(cid, data.as_slice()).await.unwrap();
@@ -57,20 +56,17 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
         events.push((event_id, data));
     }

-    let path = dir.join(format!("{}.db", uuid::Uuid::new_v4()));
-    let pool = SqlitePool::connect(format!("sqlite://{}", path.display()))
-        .await
-        .unwrap();
-    let store = ModelStore::new(pool).await.unwrap();
+    let pool = SqlitePool::connect_in_memory().await.unwrap();
+    let store = SqliteEventStore::new(pool).await.unwrap();

     ModelSetup { store, events }
 }

 async fn model_routine(input: ModelSetup) {
     let futs = input.events.into_iter().map(|(event_id, data)| {
-        let mut store = input.store.clone();
+        let store = input.store.clone();
         async move {
             let event = ReconItem::new(&event_id, Some(data.as_slice()));
-            store.insert(event).await
+            store.insert(&event).await
         }
     });
     futures::future::join_all(futs).await;
@@ -80,15 +76,14 @@ fn small_model_inserts(c: &mut Criterion) {
     let exec = tokio::runtime::Runtime::new().unwrap();
     let dir = exec.block_on(async move { tmpdir::TmpDir::new("ceramic_store").await.unwrap() });
     let mut group = c.benchmark_group("small model inserts");
-    let path = dir.to_path_buf();
     group.bench_function("sqlite store", move |b| {
         b.to_async(&exec).iter_batched_async_setup(
             // setup
-            || async { model_setup(path.clone(), ModelType::Small, INSERTION_COUNT).await },
+            || async { model_setup(ModelType::Small, INSERTION_COUNT).await },
             // routine
             |input| async { model_routine(input).await },
             // batch size
-            BatchSize::PerIteration,
+            BatchSize::SmallInput,
         )
     });
     group.finish();
@@ -106,11 +101,11 @@ fn large_model_inserts(c: &mut Criterion) {
     group.bench_function("sqlite store", |b| {
         b.to_async(&exec).iter_batched_async_setup(
             // setup
-            || async { model_setup(dir.to_path_buf(), ModelType::Large, INSERTION_COUNT).await },
+            || async { model_setup(ModelType::Large, INSERTION_COUNT).await },
             // routine
             |input| async { model_routine(input).await },
             // batch size
-            BatchSize::PerIteration,
+            BatchSize::SmallInput,
         )
     });
     group.finish();
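The benchmark setup drops the tempdir plumbing in favor of `SqlitePool::connect_in_memory`. A guess at the equivalent raw `sqlx` setup (the wrapper's actual options may differ): an in-memory database keeps filesystem I/O out of the measurement and needs no cleanup:

```rust
use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = SqlitePoolOptions::new()
        // Each plain ":memory:" connection gets its own database, so a
        // single connection keeps all queries on the same data.
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;

    let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(&pool).await?;
    assert_eq!(row.0, 1);
    Ok(())
}
```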
diff --git a/store/src/metrics.rs b/store/src/metrics.rs
index d8c63a84a..23910062c 100644
--- a/store/src/metrics.rs
+++ b/store/src/metrics.rs
@@ -161,7 +161,7 @@ where
     async fn insert(&self, key: Interest) -> anyhow::Result<bool> {
         let new = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "interest_insert",
+            "api_interest_insert",
             self.store.insert(key),
         )
         .await?;
@@ -177,7 +177,7 @@ where
     ) -> anyhow::Result<Vec<Interest>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "interest_range",
+            "api_interest_range",
             self.store.range(start, end, offset, limit),
         )
         .await
@@ -189,16 +189,28 @@ impl<S> ceramic_api::AccessModelStore for StoreMetricsMiddleware<S>
 where
     S: ceramic_api::AccessModelStore,
 {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> anyhow::Result<(bool, bool)> {
-        let (new_key, new_val) = StoreMetricsMiddleware::<S>::record(
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> anyhow::Result<(Vec<bool>, usize)> {
+        let (new_keys, new_val) = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_insert",
-            self.store.insert(key, value),
+            "api_insert_many",
+            self.store.insert_many(items),
         )
         .await?;

-        self.record_key_insert(new_key, new_val);
-        Ok((new_key, new_val))
+        let key_cnt = new_keys.iter().filter(|k| **k).count();
+
+        self.metrics.record(&InsertEvent {
+            type_: InsertEventType::Key,
+            cnt: key_cnt as u64,
+        });
+        self.metrics.record(&InsertEvent {
+            type_: InsertEventType::Value,
+            cnt: new_val as u64,
+        });
+        Ok((new_keys, new_val))
     }
     async fn range_with_values(
         &self,
@@ -209,7 +221,7 @@ where
     ) -> anyhow::Result<Vec<(EventId, Vec<u8>)>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_range_with_values",
+            "api_range_with_values",
             self.store.range_with_values(start, end, offset, limit),
         )
         .await
@@ -218,7 +230,7 @@ where

     async fn value_for_key(&self, key: &EventId) -> anyhow::Result<Option<Vec<u8>>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_value_for_key",
+            "api_value_for_key",
             self.store.value_for_key(key),
         )
         .await
@@ -231,7 +243,7 @@ where
     ) -> anyhow::Result<(i64, Vec<EventId>)> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_keys_since_highwater_mark",
+            "api_keys_since_highwater_mark",
             self.store.keys_since_highwater_mark(highwater, limit),
         )
         .await
diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs
index 379b9454a..9f5aca353 100644
--- a/store/src/sql/entities/event.rs
+++ b/store/src/sql/entities/event.rs
@@ -2,13 +2,14 @@ use std::num::TryFromIntError;

 use ceramic_core::EventId;
 use cid::Cid;
-use iroh_car::{CarHeader, CarWriter};
+use iroh_car::{CarHeader, CarReader, CarWriter};
 use itertools::{process_results, Itertools};
 use multihash::Multihash;
+
 use sqlx::{sqlite::SqliteRow, Row as _};
+use std::collections::BTreeSet;

-use super::BlockRow;
-use crate::{Error, Result};
+use crate::{sql::BlockRow, Error, Result};

 pub async fn rebuild_car(blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
     if blocks.is_empty() {
@@ -87,20 +88,129 @@ impl ReconHash {
     }
 }

-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct EventValueRaw {
+use anyhow::anyhow;
+use multihash_codetable::{Code, MultihashDigest};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)]
+pub struct BlockHash(Multihash<64>);
+
+impl BlockHash {
+    pub fn try_from_vec(data: &[u8]) -> Result<Self> {
+        Ok(Self(Multihash::from_bytes(data).map_err(Error::new_app)?))
+    }
+
+    pub fn to_bytes(&self) -> Vec<u8> {
+        self.0.to_bytes()
+    }
+
+    pub fn inner(&self) -> &Multihash<64> {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct EventRaw {
+    pub order_key: EventId,
+    pub blocks: Vec<EventBlockRaw>,
+}
+
+impl EventRaw {
+    pub fn new(key: EventId, blocks: Vec<EventBlockRaw>) -> Self {
+        Self {
+            order_key: key,
+            blocks,
+        }
+    }
+
+    pub async fn try_build(key: EventId, val: &[u8]) -> Result<Self> {
+        let mut reader = CarReader::new(val)
+            .await
+            .map_err(|e| Error::new_app(anyhow!(e)))?;
+        let roots: BTreeSet<Cid> = reader.header().roots().iter().cloned().collect();
+        let mut idx = 0;
+        let mut blocks = vec![];
+        while let Some((cid, data)) = reader.next_block().await.map_err(Error::new_app)? {
+            let ebr = EventBlockRaw::try_new(&key, idx, roots.contains(&cid), cid, data)
+                .map_err(Error::from)?;
+            blocks.push(ebr);
+            idx += 1;
+        }
+        Ok(Self::new(key, blocks))
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct EventBlockRaw {
     pub order_key: Vec<u8>,
     pub codec: i64,
     pub root: bool,
     pub idx: i32,
-    pub multihash: Vec<u8>,
+    pub multihash: BlockHash,
     pub bytes: Vec<u8>,
 }

-impl EventValueRaw {
+impl sqlx::FromRow<'_, SqliteRow> for EventBlockRaw {
+    fn from_row(row: &SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+        let multihash: Vec<u8> = row.try_get("multihash")?;
+        let multihash =
+            BlockHash::try_from_vec(&multihash).map_err(|e| sqlx::Error::Decode(Box::new(e)))?;
+        Ok(Self {
+            order_key: row.try_get("order_key")?,
+            codec: row.try_get("codec")?,
+            root: row.try_get("root")?,
+            idx: row.try_get("idx")?,
+            multihash,
+            bytes: row.try_get("bytes")?,
+        })
+    }
+}
+
+impl EventBlockRaw {
+    pub fn try_new(key: &EventId, idx: i32, root: bool, cid: Cid, bytes: Vec<u8>) -> Result<Self> {
+        let multihash = match cid.hash().code() {
+            0x12 => Code::Sha2_256.digest(&bytes),
+            0x1b => Code::Keccak256.digest(&bytes),
+            0x11 => return Err(Error::new_app(anyhow!("Sha1 not supported"))),
+            code => {
+                return Err(Error::new_app(anyhow!(
+                    "multihash type {:#x} not Sha2_256, Keccak256",
+                    code,
+                )))
+            }
+        };
+
+        if cid.hash().to_bytes() != multihash.to_bytes() {
+            return Err(Error::new_app(anyhow!(
+                "cid did not match blob {} != {}",
+                hex::encode(cid.hash().to_bytes()),
+                hex::encode(multihash.to_bytes())
+            )));
+        }
+
+        let codec: i64 = cid.codec().try_into().map_err(|e: TryFromIntError| {
+            Error::new_app(anyhow!(e).context(format!(
+                "Invalid codec could not fit into an i64: {}",
+                cid.codec()
+            )))
+        })?;
+        let order_key = key
+            .cid()
+            .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?
+            .to_bytes();
+
+        Ok(Self {
+            order_key,
+            codec,
+            root,
+            idx,
+            multihash: BlockHash(multihash),
+            bytes,
+        })
+    }
+
     pub fn into_block_row(self) -> Result<(EventId, BlockRow)> {
         let id = EventId::try_from(self.order_key).map_err(Error::new_app)?;
-        let hash = Multihash::from_bytes(&self.multihash[..]).map_err(Error::new_app)?;
+        let hash = self.multihash.inner().to_owned();
         let code = self.codec.try_into().map_err(|e: TryFromIntError| {
             let er = anyhow::anyhow!(e).context(format!("Invalid codec: {}", self.codec));
             Error::new_app(er)
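`EventBlockRaw::try_new` pulls the block-integrity check out of the insert path: recompute the digest the block's CID claims, and reject the block before it is ever queued for the database. A standalone sketch of that check, assuming compatible versions of the `cid` and `multihash_codetable` crates:

```rust
use cid::Cid;
use multihash_codetable::{Code, MultihashDigest};

// Recompute the multihash named by the CID and compare digests, mirroring
// the match on `cid.hash().code()` in `try_new` above.
fn verify_block(cid: &Cid, bytes: &[u8]) -> Result<(), String> {
    let recomputed = match cid.hash().code() {
        0x12 => Code::Sha2_256.digest(bytes),
        0x1b => Code::Keccak256.digest(bytes),
        code => return Err(format!("unsupported multihash code {code:#x}")),
    };
    if cid.hash().to_bytes() != recomputed.to_bytes() {
        return Err("block bytes do not match the CID".to_string());
    }
    Ok(())
}

fn main() {
    let data = b"hello";
    let cid = Cid::new_v1(0x55 /* RAW */, Code::Sha2_256.digest(data));
    assert!(verify_block(&cid, data).is_ok());
    assert!(verify_block(&cid, b"tampered").is_err());
}
```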
diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs
index d230759ed..12f0fbea4 100644
--- a/store/src/sql/entities/mod.rs
+++ b/store/src/sql/entities/mod.rs
@@ -3,7 +3,9 @@ mod event;
 mod query;

 pub use block::{BlockBytes, BlockRow};
-pub use event::{rebuild_car, CountRow, DeliveredEvent, EventValueRaw, OrderKey, ReconHash};
+pub use event::{
+    rebuild_car, CountRow, DeliveredEvent, EventBlockRaw, EventRaw, OrderKey, ReconHash,
+};
 pub use query::{BlockQuery, EventBlockQuery, EventQuery, ReconQuery, ReconType, SqlBackend};

 #[derive(Debug, Clone, sqlx::FromRow)]
diff --git a/store/src/sql/entities/query.rs b/store/src/sql/entities/query.rs
index 7863cd176..5fc6a860c 100644
--- a/store/src/sql/entities/query.rs
+++ b/store/src/sql/entities/query.rs
@@ -23,7 +23,7 @@ impl BlockQuery {
 pub struct EventQuery;

 impl EventQuery {
-    /// Requires binding 1 parameter. Finds the `EventValueRaw` values needed to rebuild the event
+    /// Requires binding 1 parameter. Finds the `EventBlockRaw` values needed to rebuild the event
     pub fn value_blocks_one() -> &'static str {
         r#"SELECT
                 e.order_key, eb.codec, eb.root, b.multihash, b.bytes
@@ -34,7 +34,7 @@ impl EventQuery {
             ORDER BY eb.idx;"#
     }

-    /// Requires binding 4 parameters. Finds the `EventValueRaw` values needed to rebuild the event
+    /// Requires binding 4 parameters. Finds the `EventBlockRaw` values needed to rebuild the event
     pub fn value_blocks_many() -> &'static str {
         r#"SELECT
                 key.order_key, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
diff --git a/store/src/sql/event.rs b/store/src/sql/event.rs
index b03d8e897..de53b2f6d 100644
--- a/store/src/sql/event.rs
+++ b/store/src/sql/event.rs
@@ -1,4 +1,4 @@
-use std::{collections::BTreeSet, num::TryFromIntError};
+use std::num::TryFromIntError;

 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -7,8 +7,7 @@ use ceramic_core::{event_id::InvalidEventId, EventId, RangeOpen};
 use cid::Cid;
 use iroh_bitswap::Block;
-use iroh_car::CarReader;
-use multihash_codetable::{Code, Multihash, MultihashDigest};
+use multihash_codetable::Multihash;
 use recon::{
     AssociativeHash, Error as ReconError, HashCount, InsertResult, Key, ReconItem,
     Result as ReconResult, Sha256a,
@@ -19,8 +18,8 @@ use tracing::instrument;
 use crate::{
     sql::{
         rebuild_car, BlockBytes, BlockQuery, BlockRow, CountRow, DeliveredEvent, EventBlockQuery,
-        EventQuery, EventValueRaw, FirstAndLast, OrderKey, ReconHash, ReconQuery, ReconType,
-        SqlBackend, GLOBAL_COUNTER,
+        EventBlockRaw, EventQuery, EventRaw, FirstAndLast, OrderKey, ReconHash, ReconQuery,
+        ReconType, SqlBackend, GLOBAL_COUNTER,
     },
     DbTxSqlite, Error, Result, SqlitePool,
 };
@@ -60,7 +59,7 @@ impl SqliteEventStore {

     /// Begin a database transaction.
     pub async fn begin_tx(&self) -> Result<DbTxSqlite<'_>> {
-        self.pool.tx().await
+        Ok(self.pool.writer().begin().await?)
     }

     /// Commit the database transaction.
@@ -68,13 +67,6 @@ impl SqliteEventStore {
         Ok(tx.commit().await?)
     }

-    async fn insert_item(&self, item: &ReconItem<'_, EventId>) -> Result<(bool, bool)> {
-        let mut tx = self.pool.writer().begin().await?;
-        let (new_key, new_val) = self.insert_item_int(item, &mut tx).await?;
-        tx.commit().await?;
-        Ok((new_key, new_val))
-    }
-
     async fn range_with_values_int(
         &self,
         left_fencepost: &EventId,
@@ -84,7 +76,7 @@ impl SqliteEventStore {
     ) -> Result<Box<dyn Iterator<Item = (EventId, Vec<u8>)> + Send + 'static>> {
         let offset = offset.try_into().unwrap_or(i64::MAX);
         let limit: i64 = limit.try_into().unwrap_or(i64::MAX);
-        let all_blocks: Vec<EventValueRaw> = sqlx::query_as(EventQuery::value_blocks_many())
+        let all_blocks: Vec<EventBlockRaw> = sqlx::query_as(EventQuery::value_blocks_many())
             .bind(left_fencepost.as_bytes())
             .bind(right_fencepost.as_bytes())
             .bind(limit)
@@ -92,7 +84,7 @@ impl SqliteEventStore {
             .fetch_all(self.pool.reader())
             .await?;

-        let values = EventValueRaw::into_carfiles(all_blocks).await?;
+        let values = EventBlockRaw::into_carfiles(all_blocks).await?;
         Ok(Box::new(values.into_iter()))
     }
@@ -127,47 +119,59 @@ impl SqliteEventStore {
         rebuild_car(blocks).await
     }

-    /// returns (new_key, new_val) tuple
-    async fn insert_item_int(
-        &self,
-        item: &ReconItem<'_, EventId>,
-        conn: &mut DbTxSqlite<'_>,
-    ) -> Result<(bool, bool)> {
-        // We make sure the key exists as we require it as an FK to add the event_block record.
-        let new_key = self.insert_key_int(item.key, conn).await?;
-
-        if let Some(val) = item.value {
-            // Put each block from the car file. Should we check if value already existed and skip this?
-            // It will no-op but will still try to insert the blocks again
-            let mut reader = CarReader::new(val)
-                .await
-                .map_err(|e| Error::new_app(anyhow!(e)))?;
-            let roots: BTreeSet<Cid> = reader.header().roots().iter().cloned().collect();
-            let mut idx = 0;
-            while let Some((cid, data)) = reader
-                .next_block()
-                .await
-                .map_err(|e| Error::new_app(anyhow!(e)))?
-            {
-                self.insert_event_block_int(
-                    item.key,
-                    idx,
-                    roots.contains(&cid),
-                    cid,
-                    &data.into(),
-                    conn,
-                )
-                .await?;
-                idx += 1;
+    async fn insert_item_int(&self, item: ReconItem<'_, EventId>) -> Result<(bool, bool)> {
+        let new_val = item.value.is_some();
+        let res = self.insert_items_int(&[item]).await?;
+        let new_key = res.keys.first().cloned().unwrap_or(false);
+        Ok((new_key, new_val))
+    }
+
+    /// Insert many items into the store (internal to the store)
+    async fn insert_items_int(&self, items: &[ReconItem<'_, EventId>]) -> Result<InsertResult> {
+        if items.is_empty() {
+            return Ok(InsertResult::new(vec![], 0));
+        }
+        let mut to_add = vec![];
+        for item in items {
+            if let Some(val) = item.value {
+                match EventRaw::try_build(item.key.to_owned(), val).await {
+                    Ok(parsed) => to_add.push(parsed),
+                    Err(error) => {
+                        tracing::warn!(%error, order_key=%item.key, "Error parsing event into carfile");
+                        continue;
+                    }
+                }
+            } else {
+                to_add.push(EventRaw::new(item.key.clone(), vec![]));
+            }
+        }
+        if to_add.is_empty() {
+            return Ok(InsertResult::new(vec![], 0));
+        }
+        let mut new_keys = vec![false; to_add.len()];
+        let mut new_val_cnt = 0;
+        let mut tx = self.pool.writer().begin().await.map_err(Error::from)?;
+
+        for (idx, item) in to_add.into_iter().enumerate() {
+            let new_key = self.insert_key_int(&item.order_key, &mut tx).await?;
+            for block in item.blocks.iter() {
+                self.insert_event_block_int(block, &mut tx).await?;
+                self.mark_ready_to_deliver(&item.order_key, &mut tx).await?;
+            }
+            new_keys[idx] = new_key;
+            if !item.blocks.is_empty() {
+                new_val_cnt += 1;
             }
-            self.mark_ready_to_deliver(item.key, conn).await?;
         }
-        Ok((new_key, item.value.is_some()))
+        tx.commit().await.map_err(Error::from)?;
+        let res = InsertResult::new(new_keys, new_val_cnt);
+
+        Ok(res)
     }

     /// Add a block, returns true if the block is new
     pub async fn put_block(&self, hash: &Multihash, blob: &Bytes) -> Result<bool> {
-        let mut tx = self.pool.tx().await?;
+        let mut tx = self.pool.writer().begin().await?;
         let res = self.put_block_tx(hash, blob, &mut tx).await?;
         tx.commit().await?;
         Ok(res)
@@ -177,12 +181,12 @@ impl SqliteEventStore {
     pub async fn put_block_tx(
         &self,
         hash: &Multihash,
-        blob: &Bytes,
+        blob: &[u8],
         conn: &mut DbTxSqlite<'_>,
     ) -> Result<bool> {
         let resp = sqlx::query(BlockQuery::put())
             .bind(hash.to_bytes())
-            .bind(blob.to_vec())
+            .bind(blob)
             .execute(&mut **conn)
             .await;
@@ -202,51 +206,19 @@ impl SqliteEventStore {

     // store a block in the db.
     async fn insert_event_block_int(
         &self,
-        key: &EventId,
-        idx: i32,
-        root: bool,
-        cid: Cid,
-        blob: &Bytes,
+        ev_block: &EventBlockRaw,
         conn: &mut DbTxSqlite<'_>,
     ) -> Result<()> {
-        let hash = match cid.hash().code() {
-            0x12 => Code::Sha2_256.digest(blob),
-            0x1b => Code::Keccak256.digest(blob),
-            0x11 => return Err(Error::new_app(anyhow!("Sha1 not supported"))),
-            code => {
-                return Err(Error::new_app(anyhow!(
-                    "multihash type {:#x} not Sha2_256, Keccak256",
-                    code,
-                )))
-            }
-        };
-        if cid.hash().to_bytes() != hash.to_bytes() {
-            return Err(Error::new_app(anyhow!(
-                "cid did not match blob {} != {}",
-                hex::encode(cid.hash().to_bytes()),
-                hex::encode(hash.to_bytes())
-            )));
-        }
-
-        let _new = self.put_block_tx(&hash, blob, conn).await?;
+        let _new = self
+            .put_block_tx(ev_block.multihash.inner(), &ev_block.bytes, conn)
+            .await?;

-        let code: i64 = cid.codec().try_into().map_err(|e: TryFromIntError| {
-            Error::new_app(anyhow!(e).context(format!(
-                "Invalid codec could not fit into an i64: {}",
-                cid.codec()
-            )))
-        })?;
-        let id = key
-            .cid()
-            .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?
-            .to_bytes();
-        let multihash = hash.to_bytes();
         sqlx::query(EventBlockQuery::upsert())
-            .bind(id)
-            .bind(idx)
-            .bind(root)
-            .bind(multihash)
-            .bind(code)
+            .bind(&ev_block.order_key)
+            .bind(ev_block.idx)
+            .bind(ev_block.root)
+            .bind(ev_block.multihash.to_bytes())
+            .bind(ev_block.codec)
             .execute(&mut **conn)
             .await?;
         Ok(())
@@ -347,8 +319,8 @@ impl recon::Store for SqliteEventStore {

     /// Returns true if the key was new. The value is always updated if included
     async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult<bool> {
-        let (new, _new_val) = self.insert_item(item).await?;
-        Ok(new)
+        let (res, _new_val) = self.insert_item_int(item.to_owned()).await?;
+        Ok(res)
     }

     /// Insert new keys into the key space.
@@ -357,19 +329,8 @@ impl recon::Store for SqliteEventStore {
         match items.len() {
             0 => Ok(InsertResult::new(vec![], 0)),
             _ => {
-                let mut results = vec![false; items.len()];
-                let mut new_val_cnt = 0;
-                let mut tx = self.pool.writer().begin().await.map_err(Error::from)?;
-
-                for (idx, item) in items.iter().enumerate() {
-                    let (new_key, new_val) = self.insert_item_int(item, &mut tx).await?;
-                    results[idx] = new_key;
-                    if new_val {
-                        new_val_cnt += 1;
-                    }
-                }
-                tx.commit().await.map_err(Error::from)?;
-                Ok(InsertResult::new(results, new_val_cnt))
+                let res = self.insert_items_int(items).await?;
+                Ok(res)
             }
         }
     }
@@ -564,10 +525,16 @@ impl iroh_bitswap::Store for SqliteEventStore {
 /// This guarantees that regardless of entry point (api or recon), the data is stored and retrieved in the same way.
 #[async_trait::async_trait]
 impl ceramic_api::AccessModelStore for SqliteEventStore {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> anyhow::Result<(bool, bool)> {
-        Ok(self
-            .insert_item(&ReconItem::new(&key, value.as_deref()))
-            .await?)
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> anyhow::Result<(Vec<bool>, usize)> {
+        let items = items
+            .iter()
+            .map(|(key, value)| ReconItem::new(key, value.as_deref()))
+            .collect::<Vec<ReconItem<'_, EventId>>>();
+        let res = self.insert_items_int(&items).await?;
+        Ok((res.keys, res.value_count))
     }

     async fn range_with_values(
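The heart of the refactor is `insert_items_int`: parse all CAR files first, then write every key and block inside one transaction, so a batch pays for one write-lock acquisition and one commit rather than one per event. A minimal sketch of that shape against a hypothetical `event` table (the table name and column types are illustrative, not the crate's schema):

```rust
use sqlx::{Connection, SqliteConnection};

async fn insert_batch(
    conn: &mut SqliteConnection,
    items: &[(i64, Vec<u8>)],
) -> Result<(), sqlx::Error> {
    // One transaction for the whole batch.
    let mut tx = conn.begin().await?;
    for (key, value) in items {
        sqlx::query("INSERT OR IGNORE INTO event (key, value) VALUES (?, ?)")
            .bind(*key)
            .bind(value.as_slice())
            .execute(&mut *tx)
            .await?;
    }
    // A single commit makes the whole batch durable at once.
    tx.commit().await
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let mut conn = SqliteConnection::connect("sqlite::memory:").await?;
    sqlx::query("CREATE TABLE event (key INTEGER PRIMARY KEY, value BLOB)")
        .execute(&mut conn)
        .await?;
    insert_batch(&mut conn, &[(1, vec![0xde]), (2, vec![0xad])]).await?;
    Ok(())
}
```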
diff --git a/store/src/sql/sqlite.rs b/store/src/sql/sqlite.rs
index 1eaebce00..eea3d99fa 100644
--- a/store/src/sql/sqlite.rs
+++ b/store/src/sql/sqlite.rs
@@ -68,12 +68,6 @@ impl SqlitePool {
         &self.writer
     }

-    /// Get a writer transaction. The writer pool has only one connection so this is an exclusive lock.
-    /// Use this method to perform simultaneous writes to the database, calling `commit` when you are done.
-    pub async fn tx(&self) -> Result<DbTxSqlite<'_>> {
-        Ok(self.writer.begin().await?)
-    }
-
     /// Get a reference to the reader database pool. The reader pool has many connections.
     pub fn reader(&self) -> &sqlx::SqlitePool {
         &self.reader
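Context for the deleted `tx()` helper: the `SqlitePool` wrapper keeps a single-connection writer pool next to a multi-connection reader pool, and transactions now come from `writer().begin()` explicitly. SQLite permits one writer at a time, so a one-connection write pool queues writers in the pool rather than surfacing `SQLITE_BUSY`, while WAL mode lets readers proceed concurrently. A sketch of that split in plain `sqlx` (the file name and pool sizes are illustrative):

```rust
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use std::str::FromStr;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let opts = SqliteConnectOptions::from_str("sqlite://data.db")?
        .create_if_missing(true)
        .journal_mode(SqliteJournalMode::Wal);

    // One connection: the pool itself serializes writers.
    let writer = SqlitePoolOptions::new()
        .max_connections(1)
        .connect_with(opts.clone())
        .await?;
    // Many connections: WAL readers don't block on the writer.
    let reader = SqlitePoolOptions::new()
        .max_connections(8)
        .connect_with(opts.read_only(true))
        .await?;

    sqlx::query("CREATE TABLE IF NOT EXISTS t (x INTEGER)")
        .execute(&writer)
        .await?;
    let n: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM t")
        .fetch_one(&reader)
        .await?;
    assert_eq!(n.0, 0);
    Ok(())
}
```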
diff --git a/store/src/tests/event.rs b/store/src/tests/event.rs
index df91737be..e5df11b87 100644
--- a/store/src/tests/event.rs
+++ b/store/src/tests/event.rs
@@ -640,7 +640,7 @@ async fn prep_highwater_tests(store: &dyn AccessModelStore) -> (EventId, EventId, EventId) {
         let (_blocks, store_value) = build_car_file(x).await;
         assert_eq!(_blocks.len(), x);
         store
-            .insert(key.to_owned(), Some(store_value))
+            .insert_many(&[(key.to_owned(), Some(store_value))])
             .await
             .unwrap();
     }
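Finally, a caller-side view of the migrated trait: anything that previously relied on single-event `insert` can express it as a one-element batch, as the test above does. This helper is illustrative, not part of the patch:

```rust
use anyhow::Result;
use ceramic_core::EventId;

async fn insert_one<S: ceramic_api::AccessModelStore>(
    store: &S,
    key: EventId,
    value: Option<Vec<u8>>,
) -> Result<bool> {
    // `insert_many` returns one "was the key new" flag per input item,
    // plus a count of how many values were newly stored.
    let (new_keys, _new_value_count) = store.insert_many(&[(key, value)]).await?;
    Ok(new_keys.first().copied().unwrap_or(false))
}
```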