Skip to content

Commit

Permalink
feat: Modify recon storage tables, trait and sqlite config to improve…
Browse files Browse the repository at this point in the history
… throughput (#243)

* feat: split recon values into their own table

* feat: Remove recon add value operation and combine it with insert

insert and insert_many have been modified to perform better in sqlite.
We grab a transaction and do all our writes, quitting early if we've done anything before.
This has appeared to about 2x throughput from the shared pool and multiple write operations we had before

* feat: adjust sqlite config options

synchronous = normal is typically sufficient in WAL journal mode, and demonstrates a 2x increase in writes in my benchmarks

* fix: only return true from insert when key is updated

* fix: update recon test expectations (and typo)

we need to figure out how to make these deterministic. currently they exact order of exchange depends on the speed at which the other side takes certain actions. speeding up the inserts appears to have caused a resp before a second req in some cases.

---------

Co-authored-by: David Estes <[email protected]>
  • Loading branch information
dav1do and dav1do authored Jan 24, 2024
1 parent bc3ccea commit 1958656
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 249 deletions.
131 changes: 67 additions & 64 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait Recon: Clone + Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

async fn insert(&self, key: Self::Key) -> Result<()>;
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()>;
async fn range(
&self,
start: Self::Key,
Expand All @@ -47,7 +47,6 @@ pub trait Recon: Clone + Send + Sync {
) -> Result<Vec<Self::Key>>;

async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>>;
async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>;
}

#[async_trait]
Expand All @@ -59,8 +58,8 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, key: Self::Key) -> Result<()> {
let _ = recon::Client::insert(self, key).await?;
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
let _ = recon::Client::insert(self, key, value).await?;
Ok(())
}

Expand All @@ -78,9 +77,6 @@ where
async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
recon::Client::value_for_key(self, key).await
}
async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> {
recon::Client::store_value_for_key(self, key, value).await
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -146,13 +142,10 @@ where
let event_id = decode_event_id(&event.event_id)?;
let event_data = decode_event_data(&event.event_data)?;
self.model
.insert(event_id.clone())
.await
.map_err(|err| ApiError(format!("failed to insert key: {err}")))?;
self.model
.store_value_for_key(event_id, &event_data)
.insert(event_id.clone(), Some(event_data))
.await
.map_err(|err| ApiError(format!("failed to insert key: {err}")))?;

Ok(EventsPostResponse::Success)
}

Expand Down Expand Up @@ -240,7 +233,7 @@ where
.with_not_after(0)
.build();
self.interest
.insert(interest)
.insert(interest, None)
.await
.map_err(|err| ApiError(format!("failed to update interest: {err}")))?;

Expand Down Expand Up @@ -297,7 +290,7 @@ mod tests {
struct Context;
mock! {
pub ReconInterestTest {
fn insert(&self, key: Interest) -> Result<()>;
fn insert(&self, key: Interest, value: Option<Vec<u8>>) -> Result<()>;
fn range(
&self,
start: Interest,
Expand All @@ -316,8 +309,8 @@ mod tests {
impl Recon for MockReconInterestTest {
type Key = Interest;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key) -> Result<()> {
self.insert(key)
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
&self,
Expand All @@ -331,14 +324,11 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}
}

mock! {
pub ReconModelTest {
fn insert(&self, key: EventId) -> Result<()>;
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<()>;
fn range(
&self,
start: EventId,
Expand All @@ -356,8 +346,8 @@ mod tests {
impl Recon for MockReconModelTest {
type Key = EventId;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key) -> Result<()> {
self.insert(key)
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
&self,
Expand All @@ -371,9 +361,6 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}
}

#[tokio::test]
Expand All @@ -391,19 +378,23 @@ mod tests {
&Cid::from_str("baejbeicqtpe5si4qvbffs2s7vtbk5ccbsfg6owmpidfj3zeluqz4hlnz6m").unwrap(), // cspell:disable-line
);
let event_id_str = multibase::encode(Base::Base16Lower, event_id.to_bytes());
let event_data = "f".to_string();
let mock_interest = MockReconInterestTest::new();
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_insert()
.with(predicate::eq(event_id))
.with(
predicate::eq(event_id),
predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let resp = server
.events_post(
models::Event {
event_id: event_id_str,
event_data: "f".to_string(),
event_data,
},
&Context,
)
Expand Down Expand Up @@ -461,16 +452,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -524,16 +518,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -587,16 +584,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -650,16 +650,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use jws::{Jws, JwsSignature};
pub use network::Network;
pub use range::RangeOpen;
pub use signer::{JwkSigner, Signer};
pub use sql::SqlitePool;
pub use sql::{DbTx, SqlitePool};
pub use stream_id::{StreamId, StreamIdType};

pub use cid::Cid;
Expand Down
19 changes: 18 additions & 1 deletion core/src/sql.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::{path::Path, str::FromStr};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
Sqlite, Transaction,
};

/// A trivial wrapper around a sqlx Sqlite database transaction
pub type DbTx<'a> = Transaction<'a, Sqlite>;

#[derive(Clone, Debug)]
/// The sqlite pool is split into a writer and a reader pool.
Expand All @@ -19,16 +25,20 @@ impl SqlitePool {
// A few ideas: number of RO connections, synchronize = NORMAL, mmap_size, temp_store = memory
let conn_opts = SqliteConnectOptions::from_str(&db_path)?
.journal_mode(SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.create_if_missing(true)
.optimize_on_close(true, None);

let ro_opts = conn_opts.clone().read_only(true);

let writer = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(1)
.acquire_timeout(std::time::Duration::from_secs(1))
.connect_with(conn_opts)
.await?;
let reader = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(8)
.connect_with(ro_opts)
.await?;
Expand All @@ -37,10 +47,17 @@ impl SqlitePool {
}

/// Get a reference to the writer database pool. The writer pool has only one connection.
/// If you are going to do multiple writes in a row, instead use `tx` and `commit`.
pub fn writer(&self) -> &sqlx::SqlitePool {
&self.writer
}

/// Get a writer tranaction. The writer pool has only one connection so this is an exclusive lock.
/// Use this method to perform simultaneous writes to the database, calling `commit` when you are done.
pub async fn tx(&self) -> anyhow::Result<DbTx> {
Ok(self.writer.begin().await?)
}

/// Get a reference to the reader database pool. The reader pool has many connections.
pub fn reader(&self) -> &sqlx::SqlitePool {
&self.reader
Expand Down
6 changes: 2 additions & 4 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _key: Self::Key) -> Result<()> {
async fn insert(&self, _key: Self::Key, _value: Option<Vec<u8>>) -> Result<()> {
unreachable!()
}

Expand All @@ -1268,9 +1268,7 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}

async fn interests(&self) -> Result<Vec<RangeOpen<Self::Key>>> {
unreachable!()
}
Expand Down
Loading

0 comments on commit 1958656

Please sign in to comment.