Skip to content

Commit

Permalink
fix: address PR review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jul 28, 2023
1 parent dde0858 commit 7b4ce6a
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 134 deletions.
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ async-stream = "0.3"
async-trait = "0.1"
asynchronous-codec = "0.6"
bytes = "1.1"
ceramic-api = { path = "./api" }
ceramic-api-server = { path = "./api-server" }
ceramic-core = { path = "./core" }
ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
ceramic-api = { path = "./api" }
ceramic-api-server = { path = "./api-server" }
cid = { version = "0.9", features = ["serde-codec"] }
config = "0.13.1"
criterion = "0.4"
Expand All @@ -44,8 +44,8 @@ libp2p = { version = "0.51", default-features = false }
libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"] }
libp2p-tls = { version = "0.1.0", default-features = false } # use explicit version of dep to avoid conflict
lru = "0.10"
minicbor = { version = "0.19.1", features = ["alloc"] }
multiaddr = "0.17" # use same version as Iroh
minicbor = { version = "0.19.1", features = ["alloc", "std"] }
multiaddr = "0.17" # use same version as Iroh
multibase = "0.9"
multihash = "0.17"
names = { version = "0.14.0", default-features = false }
Expand Down Expand Up @@ -76,7 +76,6 @@ unsigned-varint = "0.7"
void = "1.0"
zeroize = "1.4"


# Uncomment these lines to use a local copy of beetle
#[patch."https://github.com/nathanielc/beetle"]
#iroh-bitswap = { path = "../beetle/iroh-bitswap" }
Expand Down
20 changes: 10 additions & 10 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,20 @@ where
}

#[derive(Clone)]
pub struct Server<C, IR, MR> {
pub struct Server<C, I, M> {
peer_id: PeerId,
network: Network,
interest: IR,
model: MR,
interest: I,
model: M,
marker: PhantomData<C>,
}

impl<C, IR, MR> Server<C, IR, MR>
impl<C, I, M> Server<C, I, M>
where
IR: Recon<Key = Interest>,
MR: Recon<Key = EventId>,
I: Recon<Key = Interest>,
M: Recon<Key = EventId>,
{
pub fn new(peer_id: PeerId, network: Network, interest: IR, model: MR) -> Self {
pub fn new(peer_id: PeerId, network: Network, interest: I, model: M) -> Self {
Server {
peer_id,
network,
Expand All @@ -132,11 +132,11 @@ use std::error::Error;
use swagger::ApiError;

#[async_trait]
impl<C, IR, MR> Api<C> for Server<C, IR, MR>
impl<C, I, M> Api<C> for Server<C, I, M>
where
C: Send + Sync,
IR: Recon<Key = Interest> + Sync,
MR: Recon<Key = EventId> + Sync,
I: Recon<Key = Interest> + Sync,
M: Recon<Key = EventId> + Sync,
{
async fn ceramic_events_post(
&self,
Expand Down
123 changes: 63 additions & 60 deletions core/src/interest.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::{anyhow, Result};
use cbor::{CborBytes, Decoder, Encoder};
use anyhow::Result;
use cid::multihash::{Hasher, Sha2_256};
use minicbor::{Decoder, Encoder};
use multibase::Base;
use serde::{Deserialize, Serialize};
use std::{fmt::Display, io::Cursor, str::FromStr};
use std::{fmt::Display, str::FromStr};

pub use libp2p_identity::PeerId;

Expand All @@ -21,42 +21,45 @@ impl Interest {
}

/// Report the sort key
pub fn sort_key(&self) -> Result<String> {
let mut decoder = Decoder::from_bytes(self.0.as_slice());
let sort_key_bytes = decode_bytes(&mut decoder)?;
Ok(String::from_utf8(sort_key_bytes)?)
pub fn sort_key_hash(&self) -> Result<&[u8]> {
let mut decoder = Decoder::new(&self.0);
Ok(decoder.bytes()?)
}

/// Report the PeerId value
pub fn peer_id(&self) -> Result<PeerId> {
let mut decoder = Decoder::from_bytes(self.0.as_slice());
let peer_id_bytes = decode_bytes(&mut decoder)?;
Ok(PeerId::from_bytes(&peer_id_bytes)?)
let mut decoder = Decoder::new(&self.0);
// Skip sort key
decoder.skip()?;
let peer_id_bytes = decoder.bytes()?;
Ok(PeerId::from_bytes(peer_id_bytes)?)
}

/// Report the range value
pub fn range(&self) -> Result<RangeOpen<Vec<u8>>> {
let mut decoder = Decoder::from_bytes(self.0.as_slice());
let mut decoder = Decoder::new(&self.0);
// Skip sort key
decoder.items().next();
decoder.skip()?;
// Skip peer_id
decoder.items().next();
let start = decode_bytes(&mut decoder)?;
let end = decode_bytes(&mut decoder)?;
decoder.skip()?;
let start = decoder.bytes()?.to_vec();
let end = decoder.bytes()?.to_vec();
Ok(RangeOpen { start, end })
}

/// Report the not after value
pub fn not_after(&self) -> Result<u64> {
let mut decoder = Decoder::from_bytes(self.0.as_slice());
let mut decoder = Decoder::new(&self.0);
// Skip sort key
decoder.skip()?;
// Skip peer_id
decoder.items().next();
// Skip start
decoder.items().next();
// Skip end
decoder.items().next();
decoder.skip()?;
// Skip start_key
decoder.skip()?;
// Skip end_key
decoder.skip()?;

decode_u64(&mut decoder)
Ok(decoder.u64()?)
}

/// Return the interest as a slice of bytes.
Expand All @@ -65,29 +68,6 @@ impl Interest {
}
}

fn decode_bytes(decoder: &mut Decoder<Cursor<Vec<u8>>>) -> Result<Vec<u8>> {
if let Some(item) = decoder.items().next() {
let item = item?;
match item {
cbor::Cbor::Bytes(data) => Ok(data.0),
item => Err(anyhow!("expected cbor bytes, found: {:?}", item)),
}
} else {
Err(anyhow!("expected top level cbor value, found nothing"))
}
}
fn decode_u64(decoder: &mut Decoder<Cursor<Vec<u8>>>) -> Result<u64> {
if let Some(item) = decoder.items().next() {
let item = item?;
match item {
cbor::Cbor::Unsigned(data) => Ok(data.into_u64()),
item => Err(anyhow!("expected cbor unsigned integer, found: {:?}", item)),
}
} else {
Err(anyhow!("expected top level cbor value, found nothing"))
}
}

impl Display for Interest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", multibase::encode(Base::Base58Btc, &self.0))
Expand Down Expand Up @@ -142,14 +122,26 @@ impl BuilderState for WithNotAfter {}

impl Builder<Init> {
pub fn with_sort_key(self, sort_key: &str) -> Builder<WithSortKey> {
// A typical interest contains:
//
// - sort_key : 8 bytes
// - peer_id: ~66 bytes
// - start_key: ~72 bytes
// - end_key: ~72 bytes
// - not_after: ~8 bytes
//
// with some cbor overhead that is about 256 bytes
//
// TODO: Emperically measure performance of this size.
const INITIAL_VEC_CAPACITY: usize = 256;
let mut hasher = Sha2_256::default();
hasher.update(sort_key.as_bytes());
// sha256 is 32 bytes safe to unwrap to [u8; 32]
let hash: [u8; 32] = hasher.finalize().try_into().unwrap();
let mut encoder = Encoder::from_memory();
let mut encoder = Encoder::new(Vec::with_capacity(INITIAL_VEC_CAPACITY));
encoder
// Encode last 8 bytes of the sort_key hash
.encode([CborBytes(hash[hash.len() - 8..].to_vec())])
.bytes(&hash[hash.len() - 8..])
.expect("sort_key should cbor encode");
Builder {
state: WithSortKey { encoder },
Expand All @@ -160,8 +152,8 @@ impl Builder<WithSortKey> {
pub fn with_peer_id(mut self, peer_id: &PeerId) -> Builder<WithPeerId> {
self.state
.encoder
.encode([CborBytes(peer_id.to_bytes())])
.expect("peer id should cbor encode");
.bytes(&peer_id.to_bytes())
.expect("peer_id should cbor encode");
Builder {
state: WithPeerId {
encoder: self.state.encoder,
Expand All @@ -174,11 +166,10 @@ impl Builder<WithPeerId> {
let range = range.into();
self.state
.encoder
.encode([
CborBytes(range.start.to_vec()),
CborBytes(range.end.to_vec()),
])
.expect("peer id should cbor encode");
.bytes(range.start)
.expect("start_key should cbor encode")
.bytes(range.end)
.expect("end_key should cbor encode");
Builder {
state: WithRange {
encoder: self.state.encoder,
Expand All @@ -190,8 +181,8 @@ impl Builder<WithRange> {
pub fn with_not_after(mut self, not_after: u64) -> Builder<WithNotAfter> {
self.state
.encoder
.encode([not_after])
.expect("not after should cbor encode");
.u64(not_after)
.expect("not_after should cbor encode");
Builder {
state: WithNotAfter {
encoder: self.state.encoder,
Expand All @@ -201,8 +192,8 @@ impl Builder<WithRange> {
}

impl Builder<WithNotAfter> {
pub fn build(mut self) -> Interest {
Interest(self.state.encoder.as_bytes().to_vec())
pub fn build(self) -> Interest {
Interest(self.state.encoder.into_writer())
}
}

Expand Down Expand Up @@ -247,15 +238,23 @@ mod tests {
}

#[test]
fn range() {
fn accessors() {
let peer_id = PeerId::from_str("1AZtAkWrrQrsXMQuBEcBget2vGAPbdQ2Wn4bESe9QEVypJ").unwrap();
let interest = Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((&[0x00, 0x01, 0x02][..], &[0x00, 0x01, 0x09][..]))
.with_not_after(0)
.with_not_after(123456789)
.build();

expect![[r#"
"0F70D652B6B825E4"
"#]]
.assert_debug_eq(&hex::encode_upper(interest.sort_key_hash().unwrap()));
expect![[r#"
"1AZtAkWrrQrsXMQuBEcBget2vGAPbdQ2Wn4bESe9QEVypJ"
"#]]
.assert_debug_eq(&interest.peer_id().unwrap().to_string());
expect![[r#"
RangeOpen {
start: [
Expand All @@ -271,5 +270,9 @@ mod tests {
}
"#]]
.assert_debug_eq(&interest.range().unwrap());
expect![[r#"
123456789
"#]]
.assert_debug_eq(&interest.not_after().unwrap());
}
}
8 changes: 4 additions & 4 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ impl Builder<Init> {

/// Configure the p2p service
impl Builder<WithStore> {
pub async fn with_p2p<IR, MR>(
pub async fn with_p2p<I, M>(
self,
libp2p_config: Libp2pConfig,
key_store_path: PathBuf,
recons: Option<(IR, MR)>,
recons: Option<(I, M)>,
ceramic_peers_key: &str,
) -> anyhow::Result<Builder<WithP2p>>
where
IR: Recon<Key = Interest, Hash = Sha256a>,
MR: Recon<Key = EventId, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
let addr = Addr::new_mem();

Expand Down
12 changes: 6 additions & 6 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub const AGENT_VERSION: &str = concat!("iroh/", env!("CARGO_PKG_VERSION"));
/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
pub(crate) struct NodeBehaviour<IR, MR> {
pub(crate) struct NodeBehaviour<I, M> {
ping: Ping,
identify: identify::Behaviour,
pub(crate) bitswap: Toggle<Bitswap<BitswapStore>>,
Expand All @@ -52,7 +52,7 @@ pub(crate) struct NodeBehaviour<IR, MR> {
pub(crate) gossipsub: Toggle<gossipsub::Behaviour>,
pub(crate) peer_manager: PeerManager,
limits: connection_limits::Behaviour,
recon: Toggle<recon::libp2p::Behaviour<IR, MR>>,
recon: Toggle<recon::libp2p::Behaviour<I, M>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -88,17 +88,17 @@ impl Store for BitswapStore {
}
}

impl<IR, MR> NodeBehaviour<IR, MR>
impl<I, M> NodeBehaviour<I, M>
where
IR: Recon<Key = Interest, Hash = Sha256a>,
MR: Recon<Key = EventId, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
pub async fn new(
local_key: &Keypair,
config: &Libp2pConfig,
relay_client: Option<relay::client::Behaviour>,
rpc_client: Client,
recons: Option<(IR, MR)>,
recons: Option<(I, M)>,
) -> Result<Self> {
let peer_manager = PeerManager::default();
let pub_key = local_key.public();
Expand Down
Loading

0 comments on commit 7b4ce6a

Please sign in to comment.