Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: alloy bytes conversion #1552

Merged
merged 7 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions ethportal-api/src/types/portal_wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
network::{Network, Subnetwork},
},
utils::bytes::{hex_decode, hex_encode},
RawContentKey,
RawContentKey, RawContentValue,
};

/// The maximum size of a Discv5 packet.
Expand Down Expand Up @@ -494,14 +494,14 @@ impl From<Nodes> for Value {
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
pub struct FindContent {
// TODO: Use some version of H256
pub content_key: Vec<u8>,
pub content_key: RawContentKey,
}

#[derive(Debug, PartialEq, Clone, Encode, Decode)]
#[ssz(enum_behaviour = "union")]
pub enum Content {
ConnectionId(u16),
Content(Vec<u8>),
Content(RawContentValue),
Enrs(Vec<SszEnr>),
}

Expand Down Expand Up @@ -530,7 +530,7 @@ pub struct Offer {
#[derive(Debug, Clone)]
pub struct PopulatedOffer {
/// All the offered content, pairing the keys and values
pub content_items: Vec<(RawContentKey, Vec<u8>)>,
pub content_items: Vec<(RawContentKey, RawContentValue)>,
}

impl From<PopulatedOffer> for Offer {
Expand All @@ -548,7 +548,7 @@ impl From<PopulatedOffer> for Offer {
#[derive(Debug, Clone)]
pub struct PopulatedOfferWithResult {
/// The offered content key & value
pub content_item: (RawContentKey, Vec<u8>),
pub content_item: (RawContentKey, RawContentValue),
/// The channel to send the result of the offer to
pub result_tx: tokio::sync::mpsc::UnboundedSender<OfferTrace>,
}
Expand Down Expand Up @@ -587,7 +587,10 @@ impl From<Accept> for Value {
#[allow(clippy::unwrap_used)]
mod test {
use super::*;
use alloy::primitives::bytes;
use alloy::{
hex::FromHex,
primitives::{bytes, Bytes},
};
use ssz_types::Error::OutOfBounds;
use std::str::FromStr;
use test_log::test;
Expand Down Expand Up @@ -703,7 +706,7 @@ mod test {

#[test]
fn message_encoding_find_content() {
let content_key = hex_decode("0x706f7274616c").unwrap();
let content_key = Bytes::from_hex("0x706f7274616c").unwrap();
let find_content = FindContent { content_key };
let find_content = Message::FindContent(find_content);

Expand Down Expand Up @@ -733,7 +736,7 @@ mod test {

#[test]
fn message_encoding_content_content() {
let content_val = hex_decode("0x7468652063616b652069732061206c6965").unwrap();
let content_val = Bytes::from_hex("0x7468652063616b652069732061206c6965").unwrap();
let content = Content::Content(content_val);
let content = Message::Content(content);

Expand Down
5 changes: 3 additions & 2 deletions portalnet/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use discv5::{
enr::{CombinedKey, Enr as Discv5Enr, NodeId},
ConfigBuilder, Discv5, Event, ListenConfig, RequestError, TalkRequest,
Expand Down Expand Up @@ -314,7 +315,7 @@ impl Discovery {
enr: Enr,
subnetwork: Subnetwork,
request: ProtocolRequest,
) -> Result<Vec<u8>, RequestError> {
) -> Result<Bytes, RequestError> {
// Send empty protocol id if unable to convert it to bytes
let protocol = match self
.network_spec
Expand All @@ -329,7 +330,7 @@ impl Discovery {
};

let response = self.discv5.talk_req(enr, protocol, request).await?;
Ok(response)
Ok(Bytes::from(response))
}
}

Expand Down
11 changes: 6 additions & 5 deletions portalnet/src/find/iterators/findcontent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{

use crossbeam_channel::{unbounded, Receiver, Sender};
use discv5::kbucket::{Distance, Key};
use ethportal_api::RawContentValue;
use tracing::warn;

use super::{
Expand All @@ -40,7 +41,7 @@ use super::{

pub enum FindContentQueryResponse<TNodeId> {
ClosestNodes(Vec<TNodeId>),
Content(Vec<u8>),
Content(RawContentValue),
ConnectionId(u16),
}

Expand All @@ -51,7 +52,7 @@ pub enum FindContentQueryPending<TNodeId> {
NonePending,
/// Content returned, but not yet validated.
PendingContent {
content: Vec<u8>,
content: RawContentValue,
nodes_to_poke: Vec<TNodeId>,
// peer that sent the content
peer: TNodeId,
Expand All @@ -78,14 +79,14 @@ pub enum FindContentQueryResult<TNodeId> {
/// Content proposed by a peer, which has not been validated
#[derive(Debug, Clone)]
enum UnvalidatedContent {
Content(Vec<u8>),
Content(RawContentValue),
Connection(u16),
}

#[derive(Debug, Clone)]
pub struct ValidatedContent<TNodeId> {
/// The body of the content
pub content: Vec<u8>,
pub content: RawContentValue,
/// Was the content transferred via uTP?
pub was_utp_transfer: bool,
/// Which peer sent the content that was validated?
Expand Down Expand Up @@ -584,7 +585,7 @@ mod tests {
// but validation status is unknown
let mut validating: Vec<_> = vec![];

let found_content: Vec<u8> = vec![0xef];
let found_content = RawContentValue::from([0xef]);
let mut content_peer = None;

'finished: loop {
Expand Down
6 changes: 3 additions & 3 deletions portalnet/src/find/query_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ethportal_api::{
portal_wire::{Content, FindContent, FindNodes, Request},
query_trace::QueryTrace,
},
OverlayContentKey,
OverlayContentKey, RawContentValue,
};

/// Information about a query.
Expand All @@ -26,7 +26,7 @@ pub struct QueryInfo<TContentKey> {
// (content_value, utp_transfer, trace)
// Returns an OverlayRequestError if the content wasn't found on the network
pub type RecursiveFindContentResult =
Result<(Vec<u8>, bool, Option<QueryTrace>), OverlayRequestError>;
Result<(RawContentValue, bool, Option<QueryTrace>), OverlayRequestError>;

// Content, utp_transfer
// Content is Content type because the response to a simple find content query
Expand Down Expand Up @@ -73,7 +73,7 @@ impl<TContentKey: OverlayContentKey> QueryInfo<TContentKey> {
Request::FindNodes(FindNodes { distances })
}
QueryType::FindContent { ref target, .. } => Request::FindContent(FindContent {
content_key: target.to_bytes().to_vec(),
content_key: target.to_bytes(),
}),
};

Expand Down
8 changes: 4 additions & 4 deletions portalnet/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ethportal_api::{
portal_wire::{OfferTrace, PopulatedOffer, PopulatedOfferWithResult, Request, Response},
},
utils::bytes::{hex_encode, hex_encode_compact},
OverlayContentKey,
OverlayContentKey, RawContentValue,
};

/// Datatype to store the result of a gossip request.
Expand All @@ -39,7 +39,7 @@ pub struct GossipResult {
/// Propagate gossip in a way that can be used across threads, without &self.
/// Doesn't trace gossip results
pub fn propagate_gossip_cross_thread<TContentKey: OverlayContentKey, TMetric: Metric>(
content: Vec<(TContentKey, Vec<u8>)>,
content: Vec<(TContentKey, RawContentValue)>,
kbuckets: &SharedKBucketsTable,
command_tx: mpsc::UnboundedSender<OverlayCommand<TContentKey>>,
utp_controller: Option<Arc<UtpController>>,
Expand All @@ -62,7 +62,7 @@ pub fn propagate_gossip_cross_thread<TContentKey: OverlayContentKey, TMetric: Me
let mut content_id_to_interested_enrs = kbuckets.batch_interested_enrs::<TMetric>(&content_ids);

// Map from ENRs to content they will gossip
let mut enrs_and_content: HashMap<Enr, Vec<&(TContentKey, Vec<u8>)>> = HashMap::new();
let mut enrs_and_content: HashMap<Enr, Vec<&(TContentKey, RawContentValue)>> = HashMap::new();
for (content_id, content_key_value) in &content {
let interested_enrs = content_id_to_interested_enrs.remove(content_id).unwrap_or_else(|| {
error!("interested_enrs should contain all content ids, even if there are no interested ENRs");
Expand Down Expand Up @@ -149,7 +149,7 @@ pub async fn trace_propagate_gossip_cross_thread<
TMetric: Metric,
>(
content_key: TContentKey,
data: Vec<u8>,
data: RawContentValue,
kbuckets: &SharedKBucketsTable,
command_tx: mpsc::UnboundedSender<OverlayCommand<TContentKey>>,
) -> GossipResult {
Expand Down
19 changes: 11 additions & 8 deletions portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use anyhow::anyhow;
use bytes::Bytes;
use discv5::{
enr::NodeId,
kbucket::{FailureReason, InsertResult, KBucketsTable, NodeStatus},
Expand Down Expand Up @@ -50,7 +51,7 @@ use ethportal_api::{
},
},
utils::bytes::hex_encode,
OverlayContentKey, RawContentKey,
OverlayContentKey, RawContentKey, RawContentValue,
};
use trin_metrics::{overlay::OverlayMetricsReporter, portalnet::PORTALNET_METRICS};
use trin_storage::ContentStore;
Expand Down Expand Up @@ -205,7 +206,7 @@ impl<
}

/// Propagate gossip accepted content via OFFER/ACCEPT, return number of peers propagated
pub fn propagate_gossip(&self, content: Vec<(TContentKey, Vec<u8>)>) -> usize {
pub fn propagate_gossip(&self, content: Vec<(TContentKey, RawContentValue)>) -> usize {
propagate_gossip_cross_thread::<_, TMetric>(
content,
&self.kbuckets,
Expand All @@ -219,7 +220,7 @@ impl<
pub async fn propagate_gossip_trace(
&self,
content_key: TContentKey,
data: Vec<u8>,
data: RawContentValue,
) -> GossipResult {
trace_propagate_gossip_cross_thread::<_, TMetric>(
content_key,
Expand Down Expand Up @@ -382,7 +383,7 @@ impl<
pub async fn send_find_content(
&self,
enr: Enr,
content_key: Vec<u8>,
content_key: RawContentKey,
) -> Result<FindContentResult, OverlayRequestError> {
// Construct the request.
let request = FindContent {
Expand Down Expand Up @@ -417,7 +418,9 @@ impl<
// Init uTP stream if `connection_id` is received
Content::ConnectionId(conn_id) => {
let conn_id = u16::from_be(conn_id);
let content = self.init_find_content_stream(enr, conn_id).await?;
let content = RawContentValue::from(
self.init_find_content_stream(enr, conn_id).await?,
);
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), true)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
Expand Down Expand Up @@ -451,7 +454,7 @@ impl<
&self,
enr: Enr,
conn_id: u16,
) -> Result<Vec<u8>, OverlayRequestError> {
) -> Result<Bytes, OverlayRequestError> {
let cid = utp_rs::cid::ConnectionId {
recv: conn_id,
send: conn_id.wrapping_add(1),
Expand All @@ -471,7 +474,7 @@ impl<
pub async fn send_offer(
&self,
enr: Enr,
content_items: Vec<(RawContentKey, Vec<u8>)>,
content_items: Vec<(RawContentKey, RawContentValue)>,
) -> Result<Accept, OverlayRequestError> {
// Construct the request.
let request = Request::PopulatedOffer(PopulatedOffer { content_items });
Expand All @@ -493,7 +496,7 @@ impl<
&self,
enr: Enr,
content_key: RawContentKey,
content_value: Vec<u8>,
content_value: RawContentValue,
) -> Result<OfferTrace, OverlayRequestError> {
// Construct the request.
let (result_tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
Expand Down
Loading