From 6611fb55421f89d63a5a171aecade04c401a0a83 Mon Sep 17 00:00:00 2001
From: David Estes <5317198+dav1do@users.noreply.github.com>
Date: Tue, 30 Apr 2024 14:51:56 -0600
Subject: [PATCH] fix: changes to improve sync and ingest performance (#324)

* fix: flush sink messages more aggressively

* chore: fix benchmark

* fix: parse event value carfile before taking tx lock

* chore: rename api store metrics and remove unnecessary fn

* fix: use send_all instead of send when returning ranges

* fix: add task to api server that batch inserts new events

reduces lock contention and seems to make sql requests about an order
of magnitude faster (overall throughput doesn't change dramatically on
my machine)

* chore: remove sample code that shouldn't have been committed

* chore: PR review feedback

use constants for timeout values and remove traced_test dependency from api

* chore: PR feedback

fix select! branch that's only an if statement and use traced_test
---
 api/src/server.rs               | 136 +++++++++++++++++++--
 api/src/tests.rs                |  35 +++---
 one/src/events.rs               |   4 +-
 one/src/lib.rs                  |   8 +-
 recon/src/protocol.rs           |  51 ++-------
 store/benches/sqlite_store.rs   |  33 +++---
 store/src/metrics.rs            |  34 ++++--
 store/src/sql/entities/event.rs | 126 +++++++++++++++++++--
 store/src/sql/entities/mod.rs   |   4 +-
 store/src/sql/entities/query.rs |   4 +-
 store/src/sql/event.rs          | 191 +++++++++++++------------------
 store/src/sql/sqlite.rs         |   6 -
 store/src/tests/event.rs        |   2 +-
 13 files changed, 405 insertions(+), 229 deletions(-)

diff --git a/api/src/server.rs b/api/src/server.rs
index 2b912c837..1c760ce84 100644
--- a/api/src/server.rs
+++ b/api/src/server.rs
@@ -2,6 +2,7 @@
 
 #![allow(unused_imports)]
 
+use std::time::Duration;
 use std::{future::Future, ops::Range};
 use std::{marker::PhantomData, ops::RangeBounds};
 use std::{net::SocketAddr, ops::Bound};
@@ -44,6 +45,20 @@ use tikv_jemalloc_ctl::{epoch, stats};
 
 use crate::ResumeToken;
 
+/// When the incoming events queue has at least this many items, we'll store them.
+/// This applies when we're getting writes faster than the flush interval.
+const EVENT_INSERT_QUEUE_SIZE: usize = 3;
+/// How often we should flush the queue of events to the store. This applies when we have fewer than `EVENT_INSERT_QUEUE_SIZE` events,
+/// in order to avoid stalling a single write from being processed for too long, while still reducing contention when we have a lot of writes.
+/// This is quite low, but in my benchmarking adding a longer interval just slowed ingest down, without changing contention noticeably.
+const FLUSH_INTERVAL_MS: u64 = 10;
+
+/// How long we are willing to wait to enqueue an insert to the database service loop before we tell the caller the queue was full.
+const INSERT_ENQUEUE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
+/// How long we are willing to wait for the database service to respond to an insert request before we tell the caller it was too slow.
+/// Aborting and returning an error doesn't mean that the write won't be processed, only that the caller will get an error indicating it timed out.
+const INSERT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
 // Helper to build responses consistently, as we can't implement for the api_server::models directly
 pub struct BuildResponse {}
 impl BuildResponse {
@@ -149,7 +164,8 @@ impl<S: AccessInterestStore> AccessInterestStore for Arc<S> {
 #[async_trait]
 pub trait AccessModelStore: Send + Sync {
     /// Returns (new_key, new_value) where true means it was newly inserted, false means it already existed.
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
+    async fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)])
+        -> Result<(Vec<bool>, usize)>;
     async fn range_with_values(
         &self,
         start: &EventId,
@@ -171,8 +187,11 @@ pub trait AccessModelStore: Send + Sync {
 
 #[async_trait::async_trait]
 impl<S: AccessModelStore> AccessModelStore for Arc<S> {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
-        self.as_ref().insert(key, value).await
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> Result<(Vec<bool>, usize)> {
+        self.as_ref().insert_many(items).await
     }
 
     async fn range_with_values(
@@ -202,30 +221,109 @@ impl<S: AccessModelStore> AccessModelStore for Arc<S> {
     }
 }
 
+struct EventInsert {
+    id: EventId,
+    data: Vec<u8>,
+    tx: tokio::sync::oneshot::Sender<Result<bool>>,
+}
+
+struct InsertTask {
+    _handle: tokio::task::JoinHandle<()>,
+    tx: tokio::sync::mpsc::Sender<EventInsert>,
+}
+
 #[derive(Clone)]
 pub struct Server<C, I, M> {
     peer_id: PeerId,
     network: Network,
     interest: I,
-    model: M,
+    model: Arc<M>,
+    // If we ever need to restart this, we'll need a mutex. For now we want to avoid locking the channel,
+    // so we just keep the handle for a graceful shutdown; if the task dies, the server is in a fatal error state.
+    insert_task: Arc<InsertTask>,
     marker: PhantomData<C>,
 }
 
 impl<C, I, M> Server<C, I, M>
 where
     I: AccessInterestStore,
-    M: AccessModelStore,
+    M: AccessModelStore + 'static,
 {
-    pub fn new(peer_id: PeerId, network: Network, interest: I, model: M) -> Self {
+    pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc<M>) -> Self {
+        let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
+        let event_store = model.clone();
+
+        let handle = Self::start_insert_task(event_store, event_rx);
+        let insert_task = Arc::new(InsertTask {
+            _handle: handle,
+            tx,
+        });
         Server {
             peer_id,
             network,
             interest,
             model,
+            insert_task,
             marker: PhantomData,
         }
     }
 
+    fn start_insert_task(
+        event_store: Arc<M>,
+        mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
+    ) -> tokio::task::JoinHandle<()> {
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
+            let mut events = vec![];
+            // We could bias towards processing the queue of events over accepting more, but we'll
+            // rely on the channel depth for backpressure. The goal is to keep the queue close to empty
+            // without processing one event at a time. Once we stop parsing the carfile in the store
+            // (i.e. validate before sending here so this is just an insert), we may want to process more at once.
+            loop {
+                tokio::select! {
+                    _ = interval.tick(), if !events.is_empty() => {
+                        Self::process_events(&mut events, &event_store).await;
+                    }
+                    Some(req) = event_rx.recv() => {
+                        events.push(req);
+                    }
+                }
+                // make sure the events queue doesn't get too deep when we're under heavy load
+                if events.len() >= EVENT_INSERT_QUEUE_SIZE {
+                    Self::process_events(&mut events, &event_store).await;
+                }
+            }
+        })
+    }
+
+    async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>) {
+        let mut oneshots = Vec::with_capacity(events.len());
+        let mut items = Vec::with_capacity(events.len());
+        events.drain(..).for_each(|req: EventInsert| {
+            oneshots.push(req.tx);
+            items.push((req.id, Some(req.data)));
+        });
+        tracing::trace!("calling insert many with {} items.", items.len());
+        match event_store.insert_many(&items).await {
+            Ok((results, _)) => {
+                tracing::debug!("insert many returned {} results.", results.len());
+                for (tx, result) in oneshots.into_iter().zip(results.into_iter()) {
+                    if let Err(e) = tx.send(Ok(result)) {
+                        tracing::warn!("failed to send success response to api listener: {:?}", e);
+                    }
+                }
+            }
+            Err(e) => {
+                tracing::warn!("failed to insert events: {e}");
+                for tx in oneshots.into_iter() {
+                    if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
+                        tracing::warn!("failed to send error response to api listener: {:?}", e);
+                    }
+                }
+            }
+        };
+    }
+
     pub async fn get_event_feed(
         &self,
         resume_at: Option<String>,
@@ -298,10 +396,28 @@ where
     pub async fn post_events(&self, event: Event) -> Result<EventsPostResponse, ErrorResponse> {
         let event_id = decode_event_id(&event.id)?;
         let event_data = decode_event_data(&event.data)?;
-        self.model
-            .insert(event_id, Some(event_data))
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        tokio::time::timeout(
+            INSERT_ENQUEUE_TIMEOUT,
+            self.insert_task.tx.send(EventInsert {
+                id: event_id,
+                data: event_data,
+                tx,
+            }),
+        )
+        .map_err(|_| {
+            ErrorResponse::new("Database service queue is too full to accept requests".to_owned())
+        })
+        .await?
+        .map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?;
+
+        let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
             .await
-            .map_err(|err| ErrorResponse::new(format!("failed to insert key: {err}")))?;
+            .map_err(|_| {
+                ErrorResponse::new("Timeout waiting for database service response".to_owned())
+            })?
+            .map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))?
+            .map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?;
 
         Ok(EventsPostResponse::Success)
     }
@@ -432,7 +548,7 @@ impl<C, I, M> Api<C> for Server<C, I, M>
 where
     C: Send + Sync,
     I: AccessInterestStore + Sync,
-    M: AccessModelStore + Sync,
+    M: AccessModelStore + Sync + 'static,
 {
     #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
     async fn liveness_get(
diff --git a/api/src/tests.rs b/api/src/tests.rs
index 4e541cc3e..b7fc5b973 100644
--- a/api/src/tests.rs
+++ b/api/src/tests.rs
@@ -18,6 +18,7 @@ use mockall::{mock, predicate};
 use multibase::Base;
 use recon::Key;
 use std::str::FromStr;
+use std::sync::Arc;
 use tracing_test::traced_test;
 
 struct Context;
@@ -54,7 +55,7 @@ impl AccessInterestStore for MockReconInterestTest {
     }
 }
 mock! {
     pub ReconModelTest {
-        fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
+        fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)]) -> Result<(Vec<bool>, usize)>;
         fn range_with_values(
             &self,
             start: &EventId,
@@ -70,8 +71,11 @@ mock! {
 }
 
 #[async_trait]
 impl AccessModelStore for MockReconModelTest {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
-        self.insert(key, value)
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> Result<(Vec<bool>, usize)> {
+        self.insert_many(items)
     }
     async fn range_with_values(
         &self,
@@ -111,15 +115,14 @@ async fn create_event() {
     let event_data = "f".to_string();
     let mock_interest = MockReconInterestTest::new();
     let mut mock_model = MockReconModelTest::new();
+    let event_data_arg = Some(decode_event_data(event_data.as_str()).unwrap());
+    let args = vec![(event_id.clone(), event_data_arg)];
     mock_model
-        .expect_insert()
-        .with(
-            predicate::eq(event_id),
-            predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())),
-        )
+        .expect_insert_many()
+        .with(predicate::eq(args))
         .times(1)
-        .returning(|_, _| Ok((true, true)));
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+        .returning(|_| Ok((vec![true], 1)));
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .events_post(
             models::Event {
@@ -173,7 +176,7 @@ async fn register_interest_sort_value() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let interest = models::Interest {
         sep: "model".to_string(),
         sep_value: model.to_owned(),
@@ -195,7 +198,7 @@ async fn register_interest_sort_value_bad_request() {
     // Setup mock expectations
     let mock_interest = MockReconInterestTest::new();
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let interest = models::Interest {
         sep: "model".to_string(),
         sep_value: model.to_owned(),
@@ -246,7 +249,7 @@ async fn register_interest_sort_value_controller() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .interests_sort_key_sort_value_post(
             "model".to_string(),
@@ -301,7 +304,7 @@ async fn register_interest_value_controller_stream() {
         .times(1)
         .returning(|_, _| Ok(true));
     let mock_model = MockReconModelTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .interests_sort_key_sort_value_post(
             "model".to_string(),
@@ -355,7 +358,7 @@ async fn get_events_for_interest_range() {
         )
         .times(1)
         .returning(|s, _, _, _| Ok(vec![(s.clone(), vec![])]));
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let resp = server
         .events_sort_key_sort_value_get(
             "model".to_string(),
@@ -401,7 +404,7 @@ async fn test_events_event_id_get_success() {
         .times(1)
         .returning(move |_| Ok(Some(event_data.clone())));
     let mock_interest = MockReconInterestTest::new();
-    let server = Server::new(peer_id, network, mock_interest, mock_model);
+    let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
     let result = server.events_event_id_get(event_id_str, &Context).await;
     let EventsEventIdGetResponse::Success(event) = result.unwrap() else {
         panic!("Expected EventsEventIdGetResponse::Success but got another variant");
diff --git a/one/src/events.rs b/one/src/events.rs
index 80ebdbb06..4116f9965 100644
--- a/one/src/events.rs
+++ b/one/src/events.rs
@@ -369,7 +369,7 @@ async fn migrate_from_filesystem(input_ipfs_path: PathBuf, store: SqliteEventSto
             );
         }
 
-        let result = store.put_block_tx(cid.hash(), &blob.into(), &mut tx).await;
+        let result = store.put_block_tx(cid.hash(), &blob, &mut tx).await;
         if result.is_err() {
             info!(
                 "{} err: {} {:?}",
@@ -547,7 +547,7 @@ mod tests {
             // Create the CID and store the block.
             let hash = Code::Sha2_256.digest(block.as_slice());
             block_store
-                .put_block_tx(&hash, &block.into(), &mut tx)
+                .put_block_tx(&hash, &block, &mut tx)
                 .await
                 .unwrap();
         }
diff --git a/one/src/lib.rs b/one/src/lib.rs
index aa18aca2c..71918b051 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -491,8 +491,12 @@ impl Daemon {
 
         // Build HTTP server
         let network = network.clone();
-        let ceramic_server =
-            ceramic_api::Server::new(peer_id, network, interest_api_store, model_api_store);
+        let ceramic_server = ceramic_api::Server::new(
+            peer_id,
+            network,
+            interest_api_store,
+            Arc::new(model_api_store),
+        );
         let ceramic_metrics = MetricsHandle::register(ceramic_api::Metrics::register);
         // Wrap server in metrics middleware
         let ceramic_server = ceramic_api::MetricsMiddleware::new(ceramic_server, ceramic_metrics);
diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs
index 695443a94..71ced9622 100644
--- a/recon/src/protocol.rs
+++ b/recon/src/protocol.rs
@@ -39,8 +39,6 @@ use crate::{
 const WANT_VALUES_BUFFER: usize = 10000;
 // Number of sync ranges to buffer.
 const SYNC_RANGES_BUFFER: usize = 1000;
-// Number of message to buffer on the sink before flushing.
-const SINK_BUFFER_COUNT: usize = 100;
 // Limit to the number of pending range requests.
 // Range requests grow logistically, meaning they grow
 // exponentially while splitting and then decay
@@ -438,27 +436,24 @@ where
                 .await?;
         } else {
             // Send as many as fit under the limit and buffer the rest
-            let mut fed = false;
+            let mut to_send = Vec::with_capacity(PENDING_RANGES_LIMIT);
             for range in ranges {
                 if self.pending_ranges < PENDING_RANGES_LIMIT {
-                    fed = true;
                     self.pending_ranges += 1;
-                    self.common
-                        .stream
-                        .feed(
-                            self.common
-                                .create_message(InitiatorMessage::RangeRequest(range)),
-                        )
-                        .await?;
+                    to_send.push(
+                        self.common
+                            .create_message(InitiatorMessage::RangeRequest(range)),
+                    );
                 } else if self.ranges_stack.len() < self.ranges_stack.capacity() {
                     self.ranges_stack.push(range);
                     self.metrics.record(&RangeEnqueued);
                 } else {
+                    // blocked due to channel back pressure
                    self.metrics.record(&RangeEnqueueFailed);
                 }
             }
-            if fed {
-                self.common.stream.flush().await?;
+            if !to_send.is_empty() {
+                self.common.stream.send_all(to_send).await?;
             }
         };
         Ok(())
@@ -810,7 +805,7 @@ where
             .context("value for key")?;
         if let Some(value) = value {
             self.stream
-                .feed((self.value_resp_fn)(
+                .send((self.value_resp_fn)(
                     self.sync_id.clone(),
                     ValueResponse { key, value },
                 ))
                 .await?;
@@ -840,19 +835,18 @@ where
         for key in keys {
             if let Some(value) = self.recon.value_for_key(key.clone()).await? {
                 self.stream
-                    .feed((self.value_resp_fn)(
+                    .send((self.value_resp_fn)(
                         self.sync_id.clone(),
                         ValueResponse { key, value },
                     ))
                     .await?;
             }
         }
-        self.stream.flush().await?;
         Ok(())
     }
 
     async fn feed_want_value(&mut self, message: Out) -> Result<()> {
         self.stream
-            .feed(message)
+            .send(message)
             .await
             .context("feeding value request")?;
         Ok(())
     }
@@ -931,33 +925,12 @@ impl<S> SinkFlusher<S> {
         self.feed_count = 0;
         Ok(())
     }
-    async fn feed<T, E>(&mut self, message: T) -> Result<()>
-    where
-        S: Sink<T, Error = E>,
-        E: std::error::Error + Send + Sync + 'static,
-        MessageLabels: for<'a> From<&'a T>,
-    {
-        self.feed_count += 1;
-        self.metrics.record(&MessageSent(&message));
-        self.inner.feed(message).await?;
-        if self.feed_count > SINK_BUFFER_COUNT {
-            self.feed_count = 0;
-            self.flush().await?;
-        }
-        Ok(())
-    }
-    async fn flush<T, E>(&mut self) -> Result<()>
-    where
-        S: Sink<T, Error = E>,
-        E: std::error::Error + Send + Sync + 'static,
-    {
-        self.inner.flush().await.context("flushing")
-    }
     async fn close<T, E>(&mut self) -> Result<()>
     where
         S: Sink<T, Error = E>,
         E: std::error::Error + Send + Sync + 'static,
     {
+        // sink `poll_close()` will flush
         self.inner.close().await.context("closing")
     }
 }
diff --git a/store/benches/sqlite_store.rs b/store/benches/sqlite_store.rs
index d1c4e072d..53ce6f91e 100644
--- a/store/benches/sqlite_store.rs
+++ b/store/benches/sqlite_store.rs
@@ -1,14 +1,13 @@
 use ceramic_core::{EventId, Network};
-use ceramic_store::{ModelStore, SqlitePool};
+use ceramic_store::{SqliteEventStore, SqlitePool};
 use cid::Cid;
 use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
 use multihash::{Code, MultihashDigest};
 use rand::RngCore;
-use recon::{ReconItem, Sha256a, Store};
-use std::path::PathBuf;
+use recon::{ReconItem, Store};
 
 struct ModelSetup {
-    store: ModelStore<Sha256a>,
+    store: SqliteEventStore,
     events: Vec<(EventId, Vec<u8>)>,
 }
 
@@ -20,7 +19,7 @@ enum ModelType {
 fn generate_event_id(data: &[u8]) -> EventId {
     let cid = Cid::new_v1(
         0x55, //RAW
-        Code::Sha2_256.digest(&data),
+        Code::Sha2_256.digest(data),
     );
     EventId::new(
         &Network::Mainnet,
@@ -35,7 +34,7 @@ fn generate_event_id(data: &[u8]) -> EventId {
 
 const INSERTION_COUNT: usize = 10_000;
 
-async fn model_setup(dir: PathBuf, tpe: ModelType, cnt: usize) -> ModelSetup {
+async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
     let mut events = Vec::with_capacity(cnt);
     for _ in 0..cnt {
         let mut data = match tpe {
@@ -49,7 +48,7 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
         rand::thread_rng().fill_bytes(&mut data);
         let event_id = generate_event_id(&data);
         let cid = event_id.cid().unwrap();
-        let header = iroh_car::CarHeader::V1(iroh_car::CarHeaderV1::from(vec![cid.clone()]));
+        let header = iroh_car::CarHeader::V1(iroh_car::CarHeaderV1::from(vec![cid]));
         let writer = tokio::io::BufWriter::new(Vec::with_capacity(1024 * 1024));
         let mut writer = iroh_car::CarWriter::new(header, writer);
         writer.write(cid, data.as_slice()).await.unwrap();
@@ -57,20 +56,17 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
         events.push((event_id, data));
     }
 
-    let path = dir.join(format!("{}.db", uuid::Uuid::new_v4()));
-    let pool = SqlitePool::connect(format!("sqlite://{}", path.display()))
-        .await
-        .unwrap();
-    let store = ModelStore::new(pool).await.unwrap();
+    let pool = SqlitePool::connect_in_memory().await.unwrap();
+    let store = SqliteEventStore::new(pool).await.unwrap();
 
     ModelSetup { store, events }
 }
 
 async fn model_routine(input: ModelSetup) {
     let futs = input.events.into_iter().map(|(event_id, data)| {
-        let mut store = input.store.clone();
+        let store = input.store.clone();
         async move {
             let event = ReconItem::new(&event_id, Some(data.as_slice()));
-            store.insert(event).await
+            store.insert(&event).await
         }
     });
     futures::future::join_all(futs).await;
 }
 
@@ -80,15 +76,14 @@ fn small_model_inserts(c: &mut Criterion) {
     let exec = tokio::runtime::Runtime::new().unwrap();
     let dir = exec.block_on(async move { tmpdir::TmpDir::new("ceramic_store").await.unwrap() });
     let mut group = c.benchmark_group("small model inserts");
-    let path = dir.to_path_buf();
     group.bench_function("sqlite store", move |b| {
         b.to_async(&exec).iter_batched_async_setup(
             // setup
-            || async { model_setup(path.clone(), ModelType::Small, INSERTION_COUNT).await },
+            || async { model_setup(ModelType::Small, INSERTION_COUNT).await },
             // routine
             |input| async { model_routine(input).await },
             // batch size
-            BatchSize::PerIteration,
+            BatchSize::SmallInput,
         )
     });
     group.finish();
@@ -106,11 +101,11 @@ fn large_model_inserts(c: &mut Criterion) {
     group.bench_function("sqlite store", |b| {
         b.to_async(&exec).iter_batched_async_setup(
             // setup
-            || async { model_setup(dir.to_path_buf(), ModelType::Large, INSERTION_COUNT).await },
+            || async { model_setup(ModelType::Large, INSERTION_COUNT).await },
             // routine
             |input| async { model_routine(input).await },
             // batch size
-            BatchSize::PerIteration,
+            BatchSize::SmallInput,
        )
     });
     group.finish();
diff --git a/store/src/metrics.rs b/store/src/metrics.rs
index d8c63a84a..23910062c 100644
--- a/store/src/metrics.rs
+++ b/store/src/metrics.rs
@@ -161,7 +161,7 @@ where
     async fn insert(&self, key: Interest) -> anyhow::Result<bool> {
         let new = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "interest_insert",
+            "api_interest_insert",
             self.store.insert(key),
         )
         .await?;
@@ -177,7 +177,7 @@ where
     ) -> anyhow::Result<Vec<Interest>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "interest_range",
+            "api_interest_range",
             self.store.range(start, end, offset, limit),
         )
         .await
@@ -189,16 +189,28 @@ impl<S> ceramic_api::AccessModelStore for StoreMetricsMiddleware<S>
 where
     S: ceramic_api::AccessModelStore,
 {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> anyhow::Result<(bool, bool)> {
-        let (new_key, new_val) = StoreMetricsMiddleware::<S>::record(
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> anyhow::Result<(Vec<bool>, usize)> {
+        let (new_keys, new_val) = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_insert",
-            self.store.insert(key, value),
+            "api_insert_many",
+            self.store.insert_many(items),
         )
         .await?;
 
-        self.record_key_insert(new_key, new_val);
-        Ok((new_key, new_val))
+        let key_cnt = new_keys.iter().filter(|k| **k).count();
+
+        self.metrics.record(&InsertEvent {
+            type_: InsertEventType::Key,
+            cnt: key_cnt as u64,
+        });
+        self.metrics.record(&InsertEvent {
+            type_: InsertEventType::Value,
+            cnt: new_val as u64,
+        });
+        Ok((new_keys, new_val))
     }
 
     async fn range_with_values(
         &self,
@@ -209,7 +221,7 @@ where
     ) -> anyhow::Result<Vec<(EventId, Vec<u8>)>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_range_with_values",
+            "api_range_with_values",
             self.store.range_with_values(start, end, offset, limit),
         )
         .await
@@ -218,7 +230,7 @@ where
 
     async fn value_for_key(&self, key: &EventId) -> anyhow::Result<Option<Vec<u8>>> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_value_for_key",
+            "api_value_for_key",
             self.store.value_for_key(key),
         )
         .await
@@ -231,7 +243,7 @@ where
     ) -> anyhow::Result<(i64, Vec<EventId>)> {
         StoreMetricsMiddleware::<S>::record(
             &self.metrics,
-            "model_keys_since_highwater_mark",
+            "api_keys_since_highwater_mark",
             self.store.keys_since_highwater_mark(highwater, limit),
         )
         .await
diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs
index 379b9454a..9f5aca353 100644
--- a/store/src/sql/entities/event.rs
+++ b/store/src/sql/entities/event.rs
@@ -2,13 +2,14 @@ use std::num::TryFromIntError;
 
 use ceramic_core::EventId;
 use cid::Cid;
-use iroh_car::{CarHeader, CarWriter};
+use iroh_car::{CarHeader, CarReader, CarWriter};
 use itertools::{process_results, Itertools};
 use multihash::Multihash;
+
 use sqlx::{sqlite::SqliteRow, Row as _};
+use std::collections::BTreeSet;
 
-use super::BlockRow;
-use crate::{Error, Result};
+use crate::{sql::BlockRow, Error, Result};
 
 pub async fn rebuild_car(blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
     if blocks.is_empty() {
@@ -87,20 +88,129 @@ impl ReconHash {
     }
 }
 
-#[derive(Debug, Clone, sqlx::FromRow)]
-pub struct EventValueRaw {
+use anyhow::anyhow;
+use multihash_codetable::{Code, MultihashDigest};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)]
+pub struct BlockHash(Multihash<64>);
+
+impl BlockHash {
+    pub fn try_from_vec(data: &[u8]) -> Result<Self> {
+        Ok(Self(Multihash::from_bytes(data).map_err(Error::new_app)?))
+    }
+
+    pub fn to_bytes(&self) -> Vec<u8> {
+        self.0.to_bytes()
+    }
+
+    pub fn inner(&self) -> &Multihash<64> {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct EventRaw {
+    pub order_key: EventId,
+    pub blocks: Vec<EventBlockRaw>,
+}
+
+impl EventRaw {
+    pub fn new(key: EventId, blocks: Vec<EventBlockRaw>) -> Self {
+        Self {
+            order_key: key,
+            blocks,
+        }
+    }
+
+    pub async fn try_build(key: EventId, val: &[u8]) -> Result<Self> {
+        let mut reader = CarReader::new(val)
+            .await
+            .map_err(|e| Error::new_app(anyhow!(e)))?;
+        let roots: BTreeSet<Cid> = reader.header().roots().iter().cloned().collect();
+        let mut idx = 0;
+        let mut blocks = vec![];
+        while let Some((cid, data)) = reader.next_block().await.map_err(Error::new_app)? {
+            let ebr = EventBlockRaw::try_new(&key, idx, roots.contains(&cid), cid, data)
+                .map_err(Error::from)?;
+            blocks.push(ebr);
+            idx += 1;
+        }
+        Ok(Self::new(key, blocks))
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct EventBlockRaw {
     pub order_key: Vec<u8>,
     pub codec: i64,
     pub root: bool,
     pub idx: i32,
-    pub multihash: Vec<u8>,
+    pub multihash: BlockHash,
     pub bytes: Vec<u8>,
 }
 
-impl EventValueRaw {
+impl sqlx::FromRow<'_, SqliteRow> for EventBlockRaw {
+    fn from_row(row: &SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+        let multihash: Vec<u8> = row.try_get("multihash")?;
+        let multihash =
+            BlockHash::try_from_vec(&multihash).map_err(|e| sqlx::Error::Decode(Box::new(e)))?;
+        Ok(Self {
+            order_key: row.try_get("order_key")?,
+            codec: row.try_get("codec")?,
+            root: row.try_get("root")?,
+            idx: row.try_get("idx")?,
+            multihash,
+            bytes: row.try_get("bytes")?,
+        })
+    }
+}
+
+impl EventBlockRaw {
+    pub fn try_new(key: &EventId, idx: i32, root: bool, cid: Cid, bytes: Vec<u8>) -> Result<Self> {
+        let multihash = match cid.hash().code() {
+            0x12 => Code::Sha2_256.digest(&bytes),
+            0x1b => Code::Keccak256.digest(&bytes),
+            0x11 => return Err(Error::new_app(anyhow!("Sha1 not supported"))),
+            code => {
+                return Err(Error::new_app(anyhow!(
+                    "multihash type {:#x} not Sha2_256, Keccak256",
+                    code,
+                )))
+            }
+        };
+
+        if cid.hash().to_bytes() != multihash.to_bytes() {
+            return Err(Error::new_app(anyhow!(
+                "cid did not match blob {} != {}",
+                hex::encode(cid.hash().to_bytes()),
+                hex::encode(multihash.to_bytes())
+            )));
+        }
+
+        let codec: i64 = cid.codec().try_into().map_err(|e: TryFromIntError| {
+            Error::new_app(anyhow!(e).context(format!(
+                "Invalid codec could not fit into an i64: {}",
+                cid.codec()
+            )))
+        })?;
+        let order_key = key
+            .cid()
+            .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?
+            .to_bytes();
+
+        Ok(Self {
+            order_key,
+            codec,
+            root,
+            idx,
+            multihash: BlockHash(multihash),
+            bytes,
+        })
+    }
+
     pub fn into_block_row(self) -> Result<(EventId, BlockRow)> {
         let id = EventId::try_from(self.order_key).map_err(Error::new_app)?;
-        let hash = Multihash::from_bytes(&self.multihash[..]).map_err(Error::new_app)?;
+        let hash = self.multihash.inner().to_owned();
         let code = self.codec.try_into().map_err(|e: TryFromIntError| {
             let er = anyhow::anyhow!(e).context(format!("Invalid codec: {}", self.codec));
             Error::new_app(er)
diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs
index d230759ed..12f0fbea4 100644
--- a/store/src/sql/entities/mod.rs
+++ b/store/src/sql/entities/mod.rs
@@ -3,7 +3,9 @@ mod event;
 mod query;
 
 pub use block::{BlockBytes, BlockRow};
-pub use event::{rebuild_car, CountRow, DeliveredEvent, EventValueRaw, OrderKey, ReconHash};
+pub use event::{
+    rebuild_car, CountRow, DeliveredEvent, EventBlockRaw, EventRaw, OrderKey, ReconHash,
+};
 pub use query::{BlockQuery, EventBlockQuery, EventQuery, ReconQuery, ReconType, SqlBackend};
 
 #[derive(Debug, Clone, sqlx::FromRow)]
diff --git a/store/src/sql/entities/query.rs b/store/src/sql/entities/query.rs
index 7863cd176..5fc6a860c 100644
--- a/store/src/sql/entities/query.rs
+++ b/store/src/sql/entities/query.rs
@@ -23,7 +23,7 @@ impl BlockQuery {
 pub struct EventQuery;
 
 impl EventQuery {
-    /// Requires binding 1 parameter. Finds the `EventValueRaw` values needed to rebuild the event
+    /// Requires binding 1 parameter. Finds the `EventBlockRaw` values needed to rebuild the event
     pub fn value_blocks_one() -> &'static str {
         r#"SELECT
                 e.order_key, eb.codec, eb.root, b.multihash, b.bytes
@@ -34,7 +34,7 @@ impl EventQuery {
         ORDER BY eb.idx;"#
     }
 
-    /// Requires binding 4 parameters. Finds the `EventValueRaw` values needed to rebuild the event
+    /// Requires binding 4 parameters. Finds the `EventBlockRaw` values needed to rebuild the event
     pub fn value_blocks_many() -> &'static str {
         r#"SELECT
                 key.order_key, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
diff --git a/store/src/sql/event.rs b/store/src/sql/event.rs
index b03d8e897..de53b2f6d 100644
--- a/store/src/sql/event.rs
+++ b/store/src/sql/event.rs
@@ -1,4 +1,4 @@
-use std::{collections::BTreeSet, num::TryFromIntError};
+use std::num::TryFromIntError;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -7,8 +7,7 @@ use ceramic_core::{event_id::InvalidEventId, EventId, RangeOpen};
 use cid::Cid;
 use iroh_bitswap::Block;
-use iroh_car::CarReader;
-use multihash_codetable::{Code, Multihash, MultihashDigest};
+use multihash_codetable::Multihash;
 use recon::{
     AssociativeHash, Error as ReconError, HashCount, InsertResult, Key, ReconItem,
     Result as ReconResult, Sha256a,
@@ -19,8 +18,8 @@ use tracing::instrument;
 use crate::{
     sql::{
         rebuild_car, BlockBytes, BlockQuery, BlockRow, CountRow, DeliveredEvent, EventBlockQuery,
-        EventQuery, EventValueRaw, FirstAndLast, OrderKey, ReconHash, ReconQuery, ReconType,
-        SqlBackend, GLOBAL_COUNTER,
+        EventBlockRaw, EventQuery, EventRaw, FirstAndLast, OrderKey, ReconHash, ReconQuery,
+        ReconType, SqlBackend, GLOBAL_COUNTER,
     },
     DbTxSqlite, Error, Result, SqlitePool,
 };
@@ -60,7 +59,7 @@ impl SqliteEventStore {
 
     /// Begin a database transaction.
     pub async fn begin_tx(&self) -> Result<DbTxSqlite<'_>> {
-        self.pool.tx().await
+        Ok(self.pool.writer().begin().await?)
     }
 
     /// Commit the database transaction.
@@ -68,13 +67,6 @@ impl SqliteEventStore {
         Ok(tx.commit().await?)
     }
 
-    async fn insert_item(&self, item: &ReconItem<'_, EventId>) -> Result<(bool, bool)> {
-        let mut tx = self.pool.writer().begin().await?;
-        let (new_key, new_val) = self.insert_item_int(item, &mut tx).await?;
-        tx.commit().await?;
-        Ok((new_key, new_val))
-    }
-
     async fn range_with_values_int(
         &self,
         left_fencepost: &EventId,
         right_fencepost: &EventId,
         offset: usize,
         limit: usize,
     ) -> Result<Box<dyn Iterator<Item = (EventId, Vec<u8>)> + Send + 'static>> {
         let offset = offset.try_into().unwrap_or(i64::MAX);
         let limit: i64 = limit.try_into().unwrap_or(i64::MAX);
-        let all_blocks: Vec<EventValueRaw> = sqlx::query_as(EventQuery::value_blocks_many())
+        let all_blocks: Vec<EventBlockRaw> = sqlx::query_as(EventQuery::value_blocks_many())
             .bind(left_fencepost.as_bytes())
             .bind(right_fencepost.as_bytes())
             .bind(limit)
@@ -92,7 +84,7 @@ impl SqliteEventStore {
             .fetch_all(self.pool.reader())
             .await?;
 
-        let values = EventValueRaw::into_carfiles(all_blocks).await?;
+        let values = EventBlockRaw::into_carfiles(all_blocks).await?;
         Ok(Box::new(values.into_iter()))
     }
 
@@ -127,47 +119,59 @@ impl SqliteEventStore {
         rebuild_car(blocks).await
     }
 
-    /// returns (new_key, new_val) tuple
-    async fn insert_item_int(
-        &self,
-        item: &ReconItem<'_, EventId>,
-        conn: &mut DbTxSqlite<'_>,
-    ) -> Result<(bool, bool)> {
-        // We make sure the key exists as we require it as an FK to add the event_block record.
-        let new_key = self.insert_key_int(item.key, conn).await?;
-
-        if let Some(val) = item.value {
-            // Put each block from the car file. Should we check if value already existed and skip this?
-            // It will no-op but will still try to insert the blocks again
-            let mut reader = CarReader::new(val)
-                .await
-                .map_err(|e| Error::new_app(anyhow!(e)))?;
-            let roots: BTreeSet<Cid> = reader.header().roots().iter().cloned().collect();
-            let mut idx = 0;
-            while let Some((cid, data)) = reader
-                .next_block()
-                .await
-                .map_err(|e| Error::new_app(anyhow!(e)))?
-            {
-                self.insert_event_block_int(
-                    item.key,
-                    idx,
-                    roots.contains(&cid),
-                    cid,
-                    &data.into(),
-                    conn,
-                )
-                .await?;
-                idx += 1;
+    async fn insert_item_int(&self, item: ReconItem<'_, EventId>) -> Result<(bool, bool)> {
+        let new_val = item.value.is_some();
+        let res = self.insert_items_int(&[item]).await?;
+        let new_key = res.keys.first().cloned().unwrap_or(false);
+        Ok((new_key, new_val))
+    }
+
+    /// Insert many items into the store (internal to the store)
+    async fn insert_items_int(&self, items: &[ReconItem<'_, EventId>]) -> Result<InsertResult> {
+        if items.is_empty() {
+            return Ok(InsertResult::new(vec![], 0));
+        }
+        let mut to_add = vec![];
+        for item in items {
+            if let Some(val) = item.value {
+                match EventRaw::try_build(item.key.to_owned(), val).await {
+                    Ok(parsed) => to_add.push(parsed),
+                    Err(error) => {
+                        tracing::warn!(%error, order_key=%item.key, "Error parsing event into carfile");
+                        continue;
+                    }
+                }
+            } else {
+                to_add.push(EventRaw::new(item.key.clone(), vec![]));
+            }
+        }
+        if to_add.is_empty() {
+            return Ok(InsertResult::new(vec![], 0));
+        }
+        let mut new_keys = vec![false; to_add.len()];
+        let mut new_val_cnt = 0;
+        let mut tx = self.pool.writer().begin().await.map_err(Error::from)?;
+
+        for (idx, item) in to_add.into_iter().enumerate() {
+            let new_key = self.insert_key_int(&item.order_key, &mut tx).await?;
+            for block in item.blocks.iter() {
+                self.insert_event_block_int(block, &mut tx).await?;
+                self.mark_ready_to_deliver(&item.order_key, &mut tx).await?;
+            }
+            new_keys[idx] = new_key;
+            if !item.blocks.is_empty() {
+                new_val_cnt += 1;
             }
-            self.mark_ready_to_deliver(item.key, conn).await?;
         }
-        Ok((new_key, item.value.is_some()))
+        tx.commit().await.map_err(Error::from)?;
+        let res = InsertResult::new(new_keys, new_val_cnt);
+
+        Ok(res)
     }
 
     /// Add a block, returns true if the block is new
     pub async fn put_block(&self, hash: &Multihash, blob: &Bytes) -> Result<bool> {
-        let mut tx = self.pool.tx().await?;
+        let mut tx = self.pool.writer().begin().await?;
         let res = self.put_block_tx(hash, blob, &mut tx).await?;
         tx.commit().await?;
         Ok(res)
@@ -177,12 +181,12 @@ impl SqliteEventStore {
     pub async fn put_block_tx(
         &self,
         hash: &Multihash,
-        blob: &Bytes,
+        blob: &[u8],
         conn: &mut DbTxSqlite<'_>,
     ) -> Result<bool> {
         let resp = sqlx::query(BlockQuery::put())
             .bind(hash.to_bytes())
-            .bind(blob.to_vec())
+            .bind(blob)
             .execute(&mut **conn)
             .await;
 
@@ -202,51 +206,19 @@ impl SqliteEventStore {
 
     // store a block in the db.
     async fn insert_event_block_int(
         &self,
-        key: &EventId,
-        idx: i32,
-        root: bool,
-        cid: Cid,
-        blob: &Bytes,
+        ev_block: &EventBlockRaw,
         conn: &mut DbTxSqlite<'_>,
     ) -> Result<()> {
-        let hash = match cid.hash().code() {
-            0x12 => Code::Sha2_256.digest(blob),
-            0x1b => Code::Keccak256.digest(blob),
-            0x11 => return Err(Error::new_app(anyhow!("Sha1 not supported"))),
-            code => {
-                return Err(Error::new_app(anyhow!(
-                    "multihash type {:#x} not Sha2_256, Keccak256",
-                    code,
-                )))
-            }
-        };
-        if cid.hash().to_bytes() != hash.to_bytes() {
-            return Err(Error::new_app(anyhow!(
-                "cid did not match blob {} != {}",
-                hex::encode(cid.hash().to_bytes()),
-                hex::encode(hash.to_bytes())
-            )));
-        }
-
-        let _new = self.put_block_tx(&hash, blob, conn).await?;
+        let _new = self
+            .put_block_tx(ev_block.multihash.inner(), &ev_block.bytes, conn)
+            .await?;
 
-        let code: i64 = cid.codec().try_into().map_err(|e: TryFromIntError| {
-            Error::new_app(anyhow!(e).context(format!(
-                "Invalid codec could not fit into an i64: {}",
-                cid.codec()
-            )))
-        })?;
-        let id = key
-            .cid()
-            .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?
-            .to_bytes();
-        let multihash = hash.to_bytes();
         sqlx::query(EventBlockQuery::upsert())
-            .bind(id)
-            .bind(idx)
-            .bind(root)
-            .bind(multihash)
-            .bind(code)
+            .bind(&ev_block.order_key)
+            .bind(ev_block.idx)
+            .bind(ev_block.root)
+            .bind(ev_block.multihash.to_bytes())
+            .bind(ev_block.codec)
             .execute(&mut **conn)
             .await?;
         Ok(())
@@ -347,8 +319,8 @@ impl recon::Store for SqliteEventStore {
 
     /// Returns true if the key was new. The value is always updated if included
     async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult<bool> {
-        let (new, _new_val) = self.insert_item(item).await?;
-        Ok(new)
+        let (res, _new_val) = self.insert_item_int(item.to_owned()).await?;
+        Ok(res)
     }
 
     /// Insert new keys into the key space.
@@ -357,19 +329,8 @@ impl recon::Store for SqliteEventStore {
         match items.len() {
             0 => Ok(InsertResult::new(vec![], 0)),
             _ => {
-                let mut results = vec![false; items.len()];
-                let mut new_val_cnt = 0;
-                let mut tx = self.pool.writer().begin().await.map_err(Error::from)?;
-
-                for (idx, item) in items.iter().enumerate() {
-                    let (new_key, new_val) = self.insert_item_int(item, &mut tx).await?;
-                    results[idx] = new_key;
-                    if new_val {
-                        new_val_cnt += 1;
-                    }
-                }
-                tx.commit().await.map_err(Error::from)?;
-                Ok(InsertResult::new(results, new_val_cnt))
+                let res = self.insert_items_int(items).await?;
+                Ok(res)
             }
         }
     }
@@ -564,10 +525,16 @@ impl iroh_bitswap::Store for SqliteEventStore {
 /// This guarantees that regardless of entry point (api or recon), the data is stored and retrieved in the same way.
 #[async_trait::async_trait]
 impl ceramic_api::AccessModelStore for SqliteEventStore {
-    async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> anyhow::Result<(bool, bool)> {
-        Ok(self
-            .insert_item(&ReconItem::new(&key, value.as_deref()))
-            .await?)
+    async fn insert_many(
+        &self,
+        items: &[(EventId, Option<Vec<u8>>)],
+    ) -> anyhow::Result<(Vec<bool>, usize)> {
+        let items = items
+            .iter()
+            .map(|(key, value)| ReconItem::new(key, value.as_deref()))
+            .collect::<Vec<ReconItem<'_, EventId>>>();
+        let res = self.insert_items_int(&items).await?;
+        Ok((res.keys, res.value_count))
     }
 
     async fn range_with_values(
diff --git a/store/src/sql/sqlite.rs b/store/src/sql/sqlite.rs
index 1eaebce00..eea3d99fa 100644
--- a/store/src/sql/sqlite.rs
+++ b/store/src/sql/sqlite.rs
@@ -68,12 +68,6 @@ impl SqlitePool {
         &self.writer
     }
 
-    /// Get a writer tranaction. The writer pool has only one connection so this is an exclusive lock.
-    /// Use this method to perform simultaneous writes to the database, calling `commit` when you are done.
-    pub async fn tx(&self) -> Result<DbTxSqlite<'_>> {
-        Ok(self.writer.begin().await?)
-    }
-
     /// Get a reference to the reader database pool. The reader pool has many connections.
     pub fn reader(&self) -> &sqlx::SqlitePool {
         &self.reader
diff --git a/store/src/tests/event.rs b/store/src/tests/event.rs
index df91737be..e5df11b87 100644
--- a/store/src/tests/event.rs
+++ b/store/src/tests/event.rs
@@ -640,7 +640,7 @@ async fn prep_highwater_tests(store: &dyn AccessModelStore) -> (EventId, EventId
         let (_blocks, store_value) = build_car_file(x).await;
         assert_eq!(_blocks.len(), x);
         store
-            .insert(key.to_owned(), Some(store_value))
+            .insert_many(&[(key.to_owned(), Some(store_value))])
             .await
             .unwrap();
     }
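
The heart of this patch is the pattern in start_insert_task in api/src/server.rs: API writers hand an event plus a oneshot reply channel to a single background task over a bounded mpsc channel, and that task flushes its queue either when a short interval ticks or when the queue reaches a small threshold, so the store's write lock is taken once per batch instead of once per event. Below is a minimal, self-contained sketch of that pattern, assuming only the tokio and anyhow crates; the names Item, BATCH_SIZE, and FLUSH_MS are illustrative and do not come from the patch.

// A minimal sketch of the interval-or-threshold batching pattern above.
use std::time::Duration;

use anyhow::Result;
use tokio::sync::{mpsc, oneshot};

struct Item {
    data: u64,
    tx: oneshot::Sender<Result<bool>>, // reply channel back to the caller
}

const BATCH_SIZE: usize = 3; // flush immediately once this many items are queued
const FLUSH_MS: u64 = 10; // otherwise flush on this interval

fn start(mut rx: mpsc::Receiver<Item>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_MS));
        let mut pending = Vec::new();
        loop {
            tokio::select! {
                // The timer keeps a lone write from stalling for long...
                _ = interval.tick(), if !pending.is_empty() => flush(&mut pending).await,
                // ...while new requests are accepted concurrently.
                Some(item) = rx.recv() => pending.push(item),
                // All senders dropped and the queue is drained: exit.
                else => break,
            }
            // Under load, flush as soon as the batch threshold is reached.
            if pending.len() >= BATCH_SIZE {
                flush(&mut pending).await;
            }
        }
    })
}

async fn flush(pending: &mut Vec<Item>) {
    // A real implementation would issue one batched insert here (one lock
    // acquisition for N items) and fan the results back out per item.
    println!("flushing batch of {}", pending.len());
    for item in pending.drain(..) {
        // Ignore callers that timed out and dropped their receiver.
        let _ = item.tx.send(Ok(item.data > 0));
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel::<Item>(1024); // bounded: channel depth provides backpressure
    let task = start(rx);
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Item { data: 7, tx: reply_tx }).await?;
    println!("inserted new: {}", reply_rx.await??);
    drop(tx); // close the channel so the task drains and exits
    task.await?;
    Ok(())
}

Note the design choice the sketch shares with the patch: the oneshot reply decouples enqueueing from completion, which is why the server can bound both the enqueue wait (INSERT_ENQUEUE_TIMEOUT) and the response wait (INSERT_REQUEST_TIMEOUT) independently, and why a caller timing out does not prevent its write from still being applied.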