Skip to content

Commit

Permalink
feat: synchronize value for synchronized ranges (#238)
Browse files Browse the repository at this point in the history
This change updates the Recon protocol to check for any missing values
when a synchronized range of keys is discovered. This way as nodes are
synchronized they ensure they also have all values for their known keys.
  • Loading branch information
nathanielc authored Jan 25, 2024
1 parent 7177c1c commit 393b876
Show file tree
Hide file tree
Showing 13 changed files with 1,012 additions and 401 deletions.
89 changes: 41 additions & 48 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ pub trait Recon: Clone + Send + Sync {
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()>;
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>>;
) -> Result<Vec<(Self::Key, Vec<u8>)>>;

async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>>;
}
Expand All @@ -63,16 +63,18 @@ where
Ok(())
}

async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
Ok(recon::Client::range(self, start, end, offset, limit)
.await?
.collect())
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
Ok(
recon::Client::range_with_values(self, start, end, offset, limit)
.await?
.collect(),
)
}
async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
recon::Client::value_for_key(self, key).await
Expand Down Expand Up @@ -132,13 +134,12 @@ where
Ok(resp)
}

#[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
#[instrument(skip(self, _context, event), fields(event.id = event.event_id, event.data.len = event.event_data.len()), ret(level = Level::DEBUG), err(level = Level::ERROR))]
async fn events_post(
&self,
event: Event,
_context: &C,
) -> Result<EventsPostResponse, ApiError> {
debug!(event_id = event.event_id, "events_post");
let event_id = decode_event_id(&event.event_id)?;
let event_data = decode_event_data(&event.event_data)?;
self.model
Expand Down Expand Up @@ -233,33 +234,24 @@ where
.with_not_after(0)
.build();
self.interest
.insert(interest, None)
// We must store a value for the interest otherwise Recon will try forever to
// synchronize the value.
// In the case of interests an empty value is sufficient.
.insert(interest, Some(vec![]))
.await
.map_err(|err| ApiError(format!("failed to update interest: {err}")))?;

let mut events = Vec::new();
for id in self
let events = self
.model
.range(start, stop, offset, limit)
.range_with_values(start, stop, offset, limit)
.await
.map_err(|err| ApiError(format!("failed to get keys: {err}")))?
.into_iter()
{
let event_data = self
.model
.value_for_key(id.clone())
.await
.map_err(|err| ApiError(format!("failed to get event data: {err}")))?;
events.push(Event {
.map(|(id, data)| Event {
event_id: multibase::encode(multibase::Base::Base16Lower, id.as_bytes()),
event_data: multibase::encode(
multibase::Base::Base64,
// Use the empty bytes for keys with no value.
// This way we are explicit there is no value rather that its just missing.
&event_data.unwrap_or_default(),
),
});
}
event_data: multibase::encode(multibase::Base::Base64, data),
})
.collect();
Ok(SubscribeSortKeySortValueGetResponse::Success(events))
}
}
Expand Down Expand Up @@ -291,13 +283,13 @@ mod tests {
mock! {
pub ReconInterestTest {
fn insert(&self, key: Interest, value: Option<Vec<u8>>) -> Result<()>;
fn range(
fn range_with_values(
&self,
start: Interest,
end: Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>>;
) -> Result<Vec<(Interest, Vec<u8>)>>;
}

impl Clone for ReconInterestTest {
Expand All @@ -312,14 +304,14 @@ mod tests {
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
self.range(start, end, offset, limit)
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
self.range_with_values(start, end, offset, limit)
}
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
Expand All @@ -329,13 +321,13 @@ mod tests {
mock! {
pub ReconModelTest {
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<()>;
fn range(
fn range_with_values(
&self,
start: EventId,
end: EventId,
offset: usize,
limit: usize,
) -> Result<Vec<EventId>>;
) -> Result<Vec<(EventId,Vec<u8>)>>;
}
impl Clone for ReconModelTest {
fn clone(&self) -> Self;
Expand All @@ -349,14 +341,14 @@ mod tests {
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
self.range(start, end, offset, limit)
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
self.range_with_values(start, end, offset, limit)
}
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
Expand Down Expand Up @@ -444,9 +436,10 @@ mod tests {
.unwrap(),
)
.build();
let event_data = b"hello world";
let event = models::Event {
event_id: multibase::encode(multibase::Base::Base16Lower, event_id.as_slice()),
event_data: multibase::encode(multibase::Base::Base64, b""),
event_data: multibase::encode(multibase::Base::Base64, event_data),
};
// Setup mock expectations
let mut mock_interest = MockReconInterestTest::new();
Expand All @@ -461,21 +454,21 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
predicate::eq(0),
predicate::eq(usize::MAX),
)
.times(1)
.returning(move |_, _, _, _| Ok(vec![event_id.clone()]));
.returning(move |_, _, _, _| Ok(vec![(event_id.clone(), event_data.into())]));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let resp = server
.subscribe_sort_key_sort_value_get(
Expand Down Expand Up @@ -527,13 +520,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down Expand Up @@ -593,13 +586,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down Expand Up @@ -659,13 +652,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down
Loading

0 comments on commit 393b876

Please sign in to comment.