fix: changes to improve sync and ingest performance (#324)
* fix: flush sink messages more aggressively

* chore: fix benchmark

* fix: parse event value carfile before taking tx lock (see the sketch below)

* chore: rename api store metrics and remove unnecessary fn

* fix: use send_all instead of send when returning ranges (see the sketch below)

* fix: add task to api server that batch inserts new events

reduces lock contention; SQL requests seem to be about an order of magnitude faster (overall throughput doesn't change dramatically on my machine)

* chore: remove sample code that shouldn't have been committed

* chore: PR review feedback

use constants for timeout values and remove traced_test dependency from api

* chore: PR feedback

fix a select! branch that was only an if statement, and use traced_test
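
The two sketches below illustrate the bullets above whose diffs are not shown on this page; all names (`Store`, `parse_carfile`, `insert_parsed`, `send_ranges`) are illustrative assumptions, not the commit's actual code.

Parsing the event value carfile before taking the tx lock keeps the CPU-bound decode outside the critical section, so the lock is held only for the write itself:

// Hypothetical sketch: `Store`, `parse_carfile`, and `insert_parsed` are illustrative names.
async fn insert_event(store: &Store, key: EventId, carfile: &[u8]) -> anyhow::Result<()> {
    // Decode first, with no lock held.
    let parsed = parse_carfile(carfile)?;
    // Take the transaction lock only for the insert itself.
    let mut tx = store.begin_tx().await?;
    tx.insert_parsed(key, parsed).await?;
    tx.commit().await
}

Using send_all instead of a per-item send amortizes sink flushes over the whole batch of ranges, since send flushes after every item while send_all drives the stream through and flushes once:

// Minimal sketch assuming a futures::Sink of encoded values.
use futures::{stream, SinkExt};

async fn send_ranges<S>(mut sink: S, values: Vec<Vec<u8>>) -> anyhow::Result<()>
where
    S: futures::Sink<Vec<u8>, Error = anyhow::Error> + Unpin,
{
    // One flush for the whole batch instead of one per item.
    let mut items = stream::iter(values.into_iter().map(Ok));
    sink.send_all(&mut items).await?;
    Ok(())
}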
dav1do authored Apr 30, 2024
1 parent dadeea0 commit 6611fb5
Showing 13 changed files with 405 additions and 229 deletions.
136 changes: 126 additions & 10 deletions api/src/server.rs
@@ -2,6 +2,7 @@
#![allow(unused_imports)]

use std::time::Duration;
use std::{future::Future, ops::Range};
use std::{marker::PhantomData, ops::RangeBounds};
use std::{net::SocketAddr, ops::Bound};
@@ -44,6 +45,20 @@ use tikv_jemalloc_ctl::{epoch, stats};

use crate::ResumeToken;

/// When the incoming events queue has at least this many items, we'll store them.
/// This applies when we're getting writes faster than the flush interval.
const EVENT_INSERT_QUEUE_SIZE: usize = 3;
/// How often we should flush the queue of events to the store. This applies when we have fewer than `EVENT_INSERT_QUEUE_SIZE` events,
/// in order to avoid stalling a single write for too long, while still reducing contention when we have a lot of writes.
/// This is quite low, but in my benchmarking a longer interval just slowed ingest down without noticeably changing contention.
const FLUSH_INTERVAL_MS: u64 = 10;
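// Worked example (illustrative, not from the commit): with FLUSH_INTERVAL_MS = 10 and
// EVENT_INSERT_QUEUE_SIZE = 3, a lone write waits at most ~10ms for the next tick, while
// a burst of three or more writes is flushed as soon as the third one is enqueued.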

/// How long are we willing to wait to enqueue an insert to the database service loop before we tell the caller it was full.
const INSERT_ENQUEUE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
/// How long are we willing to wait for the database service to respond to an insert request before we tell the caller it was too slow.
/// Aborting and returning an error doesn't mean that the write won't be processed, only that the caller will get an error indicating it timed out.
const INSERT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

// Helper to build responses consistently, as we can't implement for the api_server::models directly
pub struct BuildResponse {}
impl BuildResponse {
@@ -149,7 +164,8 @@ impl<S: AccessInterestStore> AccessInterestStore for Arc<S> {
#[async_trait]
pub trait AccessModelStore: Send + Sync {
/// Returns a `bool` per item that is true if the key was newly inserted and false if it already existed, along with the number of new values stored.
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
async fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)])
-> Result<(Vec<bool>, usize)>;
async fn range_with_values(
&self,
start: &EventId,
@@ -171,8 +187,11 @@ pub trait AccessModelStore: Send + Sync {

#[async_trait::async_trait]
impl<S: AccessModelStore> AccessModelStore for Arc<S> {
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
self.as_ref().insert(key, value).await
async fn insert_many(
&self,
items: &[(EventId, Option<Vec<u8>>)],
) -> Result<(Vec<bool>, usize)> {
self.as_ref().insert_many(items).await
}

async fn range_with_values(
@@ -202,30 +221,109 @@ impl<S: AccessModelStore> AccessModelStore for Arc<S> {
}
}

struct EventInsert {
id: EventId,
data: Vec<u8>,
tx: tokio::sync::oneshot::Sender<Result<bool>>,
}

struct InsertTask {
_handle: tokio::task::JoinHandle<()>,
tx: tokio::sync::mpsc::Sender<EventInsert>,
}

#[derive(Clone)]
pub struct Server<C, I, M> {
peer_id: PeerId,
network: Network,
interest: I,
model: M,
model: Arc<M>,
// If we ever need to restart this, we'll need a mutex. For now we want to avoid locking the channel,
// so we just keep the handle to gracefully shut down; if the task dies, the server is in a fatal error state.
insert_task: Arc<InsertTask>,
marker: PhantomData<C>,
}

impl<C, I, M> Server<C, I, M>
where
I: AccessInterestStore,
M: AccessModelStore,
M: AccessModelStore + 'static,
{
pub fn new(peer_id: PeerId, network: Network, interest: I, model: M) -> Self {
pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc<M>) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
let event_store = model.clone();

let handle = Self::start_insert_task(event_store, event_rx);
let insert_task = Arc::new(InsertTask {
_handle: handle,
tx,
});
Server {
peer_id,
network,
interest,
model,
insert_task,
marker: PhantomData,
}
}

fn start_insert_task(
event_store: Arc<M>,
mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
let mut events = vec![];
// We could bias towards processing the queue of events over accepting more (a biased
// variant is sketched after this file's diff), but we'll rely on the channel depth for
// backpressure. The goal is to keep the queue close to empty without processing one at
// a time. Once we stop parsing the carfile in the store (i.e. validate before sending
// here, so this is just an insert), we may want to process more at once.
loop {
tokio::select! {
_ = interval.tick(), if !events.is_empty() => {
Self::process_events(&mut events, &event_store).await;
}
Some(req) = event_rx.recv() => {
events.push(req);
}
}
// make sure the events queue doesn't get too deep when we're under heavy load
if events.len() >= EVENT_INSERT_QUEUE_SIZE {
Self::process_events(&mut events, &event_store).await;
}
}
})
}

async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>) {
let mut oneshots = Vec::with_capacity(events.len());
let mut items = Vec::with_capacity(events.len());
events.drain(..).for_each(|req: EventInsert| {
oneshots.push(req.tx);
items.push((req.id, Some(req.data)));
});
tracing::trace!("calling insert many with {} items.", items.len());
match event_store.insert_many(&items).await {
Ok((results, _)) => {
tracing::debug!("insert many returned {} results.", results.len());
for (tx, result) in oneshots.into_iter().zip(results.into_iter()) {
if let Err(e) = tx.send(Ok(result)) {
tracing::warn!("failed to send success response to api listener: {:?}", e);
}
}
}
Err(e) => {
tracing::warn!("failed to insert events: {e}");
for tx in oneshots.into_iter() {
if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
tracing::warn!("failed to send failed response to api listener: {:?}", e);
}
}
}
};
}

pub async fn get_event_feed(
&self,
resume_at: Option<String>,
@@ -298,10 +396,28 @@ where
pub async fn post_events(&self, event: Event) -> Result<EventsPostResponse, ErrorResponse> {
let event_id = decode_event_id(&event.id)?;
let event_data = decode_event_data(&event.data)?;
self.model
.insert(event_id, Some(event_data))
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::time::timeout(
INSERT_ENQUEUE_TIMEOUT,
self.insert_task.tx.send(EventInsert {
id: event_id,
data: event_data,
tx,
}),
)
.map_err(|_| {
ErrorResponse::new("Database service queue is too full to accept requests".to_owned())
})
.await?
.map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?;

let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx)
.await
.map_err(|err| ErrorResponse::new(format!("failed to insert key: {err}")))?;
.map_err(|_| {
ErrorResponse::new("Timeout waiting for database service response".to_owned())
})?
.map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))?
.map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?;

Ok(EventsPostResponse::Success)
}
@@ -432,7 +548,7 @@ impl<C, I, M> Api<C> for Server<C, I, M>
where
C: Send + Sync,
I: AccessInterestStore + Sync,
M: AccessModelStore + Sync,
M: AccessModelStore + Sync + 'static,
{
#[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
async fn liveness_get(
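
The comment in start_insert_task notes that the loop could bias toward draining queued events over accepting new ones. A hypothetical variant using tokio::select!'s biased mode would look like this (illustrative only, not part of the commit):

loop {
    tokio::select! {
        // `biased` polls branches top to bottom instead of randomly, so a
        // pending flush always wins over accepting another event.
        biased;
        _ = interval.tick(), if !events.is_empty() => {
            Self::process_events(&mut events, &event_store).await;
        }
        Some(req) = event_rx.recv() => {
            events.push(req);
        }
    }
}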
35 changes: 19 additions & 16 deletions api/src/tests.rs
@@ -18,6 +18,7 @@ use mockall::{mock, predicate};
use multibase::Base;
use recon::Key;
use std::str::FromStr;
use std::sync::Arc;
use tracing_test::traced_test;

struct Context;
@@ -54,7 +55,7 @@ impl AccessInterestStore for MockReconInterestTest {
}
mock! {
pub ReconModelTest {
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
fn insert_many(&self, items: &[(EventId, Option<Vec<u8>>)]) -> Result<(Vec<bool>, usize)>;
fn range_with_values(
&self,
start: &EventId,
@@ -70,8 +71,11 @@ mock! {
}
#[async_trait]
impl AccessModelStore for MockReconModelTest {
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
self.insert(key, value)
async fn insert_many(
&self,
items: &[(EventId, Option<Vec<u8>>)],
) -> Result<(Vec<bool>, usize)> {
self.insert_many(items)
}
async fn range_with_values(
&self,
@@ -111,15 +115,14 @@ async fn create_event() {
let event_data = "f".to_string();
let mock_interest = MockReconInterestTest::new();
let mut mock_model = MockReconModelTest::new();
let event_data_arg = Some(decode_event_data(event_data.as_str()).unwrap());
let args = vec![(event_id.clone(), event_data_arg)];
mock_model
.expect_insert()
.with(
predicate::eq(event_id),
predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())),
)
.expect_insert_many()
.with(predicate::eq(args))
.times(1)
.returning(|_, _| Ok((true, true)));
let server = Server::new(peer_id, network, mock_interest, mock_model);
.returning(|_| Ok((vec![true], 1)));
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let resp = server
.events_post(
models::Event {
@@ -173,7 +176,7 @@ async fn register_interest_sort_value() {
.times(1)
.returning(|_, _| Ok(true));
let mock_model = MockReconModelTest::new();
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let interest = models::Interest {
sep: "model".to_string(),
sep_value: model.to_owned(),
Expand All @@ -195,7 +198,7 @@ async fn register_interest_sort_value_bad_request() {
// Setup mock expectations
let mock_interest = MockReconInterestTest::new();
let mock_model = MockReconModelTest::new();
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let interest = models::Interest {
sep: "model".to_string(),
sep_value: model.to_owned(),
@@ -246,7 +249,7 @@ async fn register_interest_sort_value_controller() {
.times(1)
.returning(|_, _| Ok(true));
let mock_model = MockReconModelTest::new();
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let resp = server
.interests_sort_key_sort_value_post(
"model".to_string(),
@@ -301,7 +304,7 @@ async fn register_interest_value_controller_stream() {
.times(1)
.returning(|_, _| Ok(true));
let mock_model = MockReconModelTest::new();
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let resp = server
.interests_sort_key_sort_value_post(
"model".to_string(),
@@ -355,7 +358,7 @@ async fn get_events_for_interest_range() {
)
.times(1)
.returning(|s, _, _, _| Ok(vec![(s.clone(), vec![])]));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let resp = server
.events_sort_key_sort_value_get(
"model".to_string(),
@@ -401,7 +404,7 @@ async fn test_events_event_id_get_success() {
.times(1)
.returning(move |_| Ok(Some(event_data.clone())));
let mock_interest = MockReconInterestTest::new();
let server = Server::new(peer_id, network, mock_interest, mock_model);
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_model));
let result = server.events_event_id_get(event_id_str, &Context).await;
let EventsEventIdGetResponse::Success(event) = result.unwrap() else {
panic!("Expected EventsEventIdGetResponse::Success but got another variant");
4 changes: 2 additions & 2 deletions one/src/events.rs
@@ -369,7 +369,7 @@ async fn migrate_from_filesystem(input_ipfs_path: PathBuf, store: SqliteEventSto
);
}

let result = store.put_block_tx(cid.hash(), &blob.into(), &mut tx).await;
let result = store.put_block_tx(cid.hash(), &blob, &mut tx).await;
if result.is_err() {
info!(
"{} err: {} {:?}",
@@ -547,7 +547,7 @@ mod tests {
// Create the CID and store the block.
let hash = Code::Sha2_256.digest(block.as_slice());
block_store
.put_block_tx(&hash, &block.into(), &mut tx)
.put_block_tx(&hash, &block, &mut tx)
.await
.unwrap();
}
8 changes: 6 additions & 2 deletions one/src/lib.rs
@@ -491,8 +491,12 @@ impl Daemon {

// Build HTTP server
let network = network.clone();
let ceramic_server =
ceramic_api::Server::new(peer_id, network, interest_api_store, model_api_store);
let ceramic_server = ceramic_api::Server::new(
peer_id,
network,
interest_api_store,
Arc::new(model_api_store),
);
let ceramic_metrics = MetricsHandle::register(ceramic_api::Metrics::register);
// Wrap server in metrics middleware
let ceramic_server = ceramic_api::MetricsMiddleware::new(ceramic_server, ceramic_metrics);