Skip to content

Commit

Permalink
chore: update libp2p to 0.52.4
Browse files Browse the repository at this point in the history
Fixes newly deprecated API. Trivial changes were fixed and the rest
allowed.

Additionally the `config` handling left over from iroh/beetle has been
removed.
  • Loading branch information
nathanielc committed Oct 23, 2023
1 parent 078a773 commit ea889a7
Show file tree
Hide file tree
Showing 39 changed files with 711 additions and 2,653 deletions.
1,735 changes: 464 additions & 1,271 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,15 @@ ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" }
ceramic-metadata = { path = "./metadata" }
ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
cid = { version = "0.9", features = ["serde-codec"] }
cid = { version = "0.10", features = ["serde-codec"] }
clap = { version = "4", features = ["derive", "env"] }
clap_mangen = "0.2.2"
config = "0.13.1"
console = { version = "0.15", default-features = false }
console-subscriber = "0.1.7"
criterion = "0.4"
crossterm = "0.25"
ctrlc = "3.2.2"
dag-jose = "0.1"
dag-jose = "0.1.3"
deadqueue = "0.2.3"
derivative = "2.2"
derive_more = "0.99.17"
Expand Down Expand Up @@ -94,19 +93,19 @@ iroh-rpc-types = { path = "./beetle/iroh-rpc-types" }
iroh-util = { path = "./beetle/iroh-util" }
keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
libipld = "0.15"
libp2p = { version = "0.51", default-features = false }
libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"] }
libp2p-tls = { version = "0.1.0", default-features = false } # use explicit version of dep to avoid conflict
libipld = "0.16"
libipld-cbor = "0.16"
libp2p = { version = "0.52.4", default-features = false }
libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] }
lru = "0.10"
mime = "0.3"
mime_classifier = "0.0.1"
mime_guess = "2.0.4"
minicbor = { version = "0.19.1", features = ["alloc", "std"] }
mockall = "0.11.4"
multiaddr = "0.17" # use same version as Iroh
multiaddr = "0.18"
multibase = "0.9"
multihash = "0.17"
multihash = "0.18"
names = { version = "0.14.0", default-features = false }
nix = "0.26"
num_enum = "0.5.7"
Expand All @@ -116,7 +115,7 @@ opentelemetry-otlp = "0.11"
par-stream = { version = "0.10.2", default-features = false }
paste = "1.0.9"
phf = "0.11"
prometheus-client = "0.19"
prometheus-client = "0.21"
proptest = "1"
prost = "0.11"
prost-build = "0.11.1"
Expand Down Expand Up @@ -196,4 +195,3 @@ authors = [
]
license = "Apache-2.0/MIT"
repository = "https://github.com/3box/rust-ceramic"
rust-version = "1.65"
8 changes: 0 additions & 8 deletions beetle/.cargo/config.toml

This file was deleted.

1 change: 0 additions & 1 deletion beetle/iroh-bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
publish = false

[build-dependencies]
Expand Down
55 changes: 27 additions & 28 deletions beetle/iroh-bitswap/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,10 @@ use futures::{
stream::{BoxStream, SelectAll},
};
use iroh_metrics::{bitswap::BitswapMetrics, core::MRecorder, inc};
use libp2p::swarm::handler::FullyNegotiatedInbound;
use libp2p::swarm::{
handler::{DialUpgradeError, FullyNegotiatedOutbound},
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use libp2p::{
core::{
muxing::SubstreamBox,
upgrade::{NegotiationError, UpgradeError},
Negotiated,
},
swarm::handler::FullyNegotiatedInbound,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
};
use smallvec::SmallVec;
use tokio::sync::oneshot;
Expand All @@ -49,8 +41,8 @@ pub enum BitswapHandlerError {
#[error("negotiation timeout")]
NegotiationTimeout,
/// Protocol negotiation failed.
#[error("negotatiation protocol error {0}")]
NegotiationProtocolError(#[from] NegotiationError),
#[error("no protocol could be agreed upon")]
NegotiationProtocolError,
/// IO error.
#[error("io {0}")]
Io(#[from] std::io::Error),
Expand Down Expand Up @@ -119,7 +111,7 @@ pub struct BitswapHandler {
idle_timeout: Duration,

/// Collection of errors from attempting an upgrade.
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<BitswapHandlerError>>,
upgrade_errors: VecDeque<StreamUpgradeError<BitswapHandlerError>>,

/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,
Expand Down Expand Up @@ -149,6 +141,8 @@ impl Debug for BitswapHandler {

impl BitswapHandler {
/// Builds a new [`BitswapHandler`].
// TODO(nathanielc): Remove uses of KeepAlive::Until
#[allow(deprecated)]
pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self {
Self {
listen_protocol: SubstreamProtocol::new(protocol_config, ()),
Expand All @@ -165,8 +159,8 @@ impl BitswapHandler {
}

impl ConnectionHandler for BitswapHandler {
type InEvent = BitswapHandlerIn;
type OutEvent = HandlerEvent;
type FromBehaviour = BitswapHandlerIn;
type ToBehaviour = HandlerEvent;
type Error = BitswapHandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
Expand All @@ -181,6 +175,8 @@ impl ConnectionHandler for BitswapHandler {
self.keep_alive
}

// TODO(nathanielc): Remove uses of KeepAlive::Until
#[allow(deprecated)]
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BitswapConnectionHandlerEvent> {
inc!(BitswapMetrics::HandlerPollCount);
if !self.events.is_empty() {
Expand All @@ -193,13 +189,12 @@ impl ConnectionHandler for BitswapHandler {
if let Some(error) = self.upgrade_errors.pop_front() {
inc!(BitswapMetrics::HandlerConnUpgradeErrors);
let reported_error = match error {
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
BitswapHandlerError::NegotiationTimeout
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
BitswapHandlerError::NegotiationProtocolError(negotiation_error)
StreamUpgradeError::Timeout => BitswapHandlerError::NegotiationTimeout,
StreamUpgradeError::Apply(e) => e,
StreamUpgradeError::NegotiationFailed => {
BitswapHandlerError::NegotiationProtocolError
}
StreamUpgradeError::Io(e) => e.into(),
};

// Close the connection
Expand All @@ -221,7 +216,7 @@ impl ConnectionHandler for BitswapHandler {
}

if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
if let ConnectionHandlerEvent::Custom(HandlerEvent::Message { .. }) = event {
if let ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Message { .. }) = event {
// Update keep alive as we have received a message
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout);
}
Expand Down Expand Up @@ -275,11 +270,15 @@ impl ConnectionHandler for BitswapHandler {
self.upgrade_errors.push_back(error);
}

libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_) => {}
libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_)
| libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(_)
| libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}

fn on_behaviour_event(&mut self, event: Self::InEvent) {
// TODO(nathanielc): Remove uses of KeepAlive::Until
#[allow(deprecated)]
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
BitswapHandlerIn::Message(m, response) => {
self.send_queue.push_back((m, response));
Expand All @@ -299,14 +298,14 @@ impl ConnectionHandler for BitswapHandler {
}

fn inbound_substream(
mut substream: Framed<Negotiated<SubstreamBox>, BitswapCodec>,
mut substream: Framed<libp2p::Stream, BitswapCodec>,
) -> impl Stream<Item = BitswapConnectionHandlerEvent> {
async_stream::stream! {
while let Some(message) = substream.next().await {
match message {
Ok((message, protocol)) => {
// reset keep alive idle timeout
yield ConnectionHandlerEvent::Custom(HandlerEvent::Message { message, protocol });
yield ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Message { message, protocol });
}
Err(error) => match error {
BitswapHandlerError::MaxTransmissionSize => {
Expand Down Expand Up @@ -335,14 +334,14 @@ fn inbound_substream(
}

fn outbound_substream(
mut substream: Framed<Negotiated<SubstreamBox>, BitswapCodec>,
mut substream: Framed<libp2p::Stream, BitswapCodec>,
(message, response): (BitswapMessage, BitswapMessageResponse),
) -> impl Stream<Item = BitswapConnectionHandlerEvent> {
async_stream::stream! {
if let Err(error) = substream.feed(message).await {
warn!("failed to write item: {:?}", error);
response.send(Err(network::SendError::Other(error.to_string()))).ok();
yield ConnectionHandlerEvent::Custom(
yield ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::FailedToSendMessage { error }
);
} else {
Expand Down
39 changes: 26 additions & 13 deletions beetle/iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use cid::Cid;
use handler::{BitswapHandler, HandlerEvent};
use iroh_metrics::record;
use iroh_metrics::{bitswap::BitswapMetrics, core::MRecorder, inc};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::ConnectionId;
use libp2p::swarm::{
CloseConnection, DialError, NetworkBehaviour, NotifyHandler, PollParameters, ToSwarm,
};
use libp2p::{swarm::dial_opts::DialOpts, StreamProtocol};
use libp2p::{Multiaddr, PeerId};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -246,7 +246,7 @@ impl<S: Store> Bitswap<S> {
}

/// Called on identify events from swarm, informing us about available protocols of this peer.
pub fn on_identify(&self, peer: &PeerId, protocols: &[String]) {
pub fn on_identify(&self, peer: &PeerId, protocols: &[StreamProtocol]) {
if let Some(PeerState::Connected(conn_id)) = self.get_peer_state(peer) {
let mut protocols: Vec<ProtocolId> =
protocols.iter().filter_map(ProtocolId::try_from).collect();
Expand Down Expand Up @@ -401,16 +401,7 @@ pub enum BitswapEvent {

impl<S: Store> NetworkBehaviour for Bitswap<S> {
type ConnectionHandler = BitswapHandler;
type OutEvent = BitswapEvent;

fn new_handler(&mut self) -> Self::ConnectionHandler {
let protocol_config = self.protocol_config.clone();
BitswapHandler::new(protocol_config, self.idle_timeout)
}

fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Default::default()
}
type ToSwarm = BitswapEvent;

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
match event {
Expand Down Expand Up @@ -482,7 +473,7 @@ impl<S: Store> NetworkBehaviour for Bitswap<S> {
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::OutEvent, libp2p::swarm::THandlerInEvent<Self>>> {
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
inc!(BitswapMetrics::ToSwarmPollTick);
// limit work
for _ in 0..50 {
Expand Down Expand Up @@ -582,6 +573,28 @@ impl<S: Store> NetworkBehaviour for Bitswap<S> {

Poll::Pending
}

fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> std::result::Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
let protocol_config = self.protocol_config.clone();
Ok(BitswapHandler::new(protocol_config, self.idle_timeout))
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> std::result::Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
let protocol_config = self.protocol_config.clone();
Ok(BitswapHandler::new(protocol_config, self.idle_timeout))
}
}

#[cfg(test)]
Expand Down
24 changes: 12 additions & 12 deletions beetle/iroh-bitswap/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use asynchronous_codec::{Decoder, Encoder, Framed};
use bytes::{Bytes, BytesMut};
use futures::future;
use futures::io::{AsyncRead, AsyncWrite};
use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use prost::Message;
use unsigned_varint::codec;

Expand All @@ -22,27 +22,27 @@ pub enum ProtocolId {
Bitswap120 = 3,
}

impl ProtocolName for ProtocolId {
fn protocol_name(&self) -> &[u8] {
impl AsRef<str> for ProtocolId {
fn as_ref(&self) -> &str {
match self {
ProtocolId::Legacy => b"/ipfs/bitswap",
ProtocolId::Bitswap100 => b"/ipfs/bitswap/1.0.0",
ProtocolId::Bitswap110 => b"/ipfs/bitswap/1.1.0",
ProtocolId::Bitswap120 => b"/ipfs/bitswap/1.2.0",
ProtocolId::Legacy => "/ipfs/bitswap",
ProtocolId::Bitswap100 => "/ipfs/bitswap/1.0.0",
ProtocolId::Bitswap110 => "/ipfs/bitswap/1.1.0",
ProtocolId::Bitswap120 => "/ipfs/bitswap/1.2.0",
}
}
}

impl ProtocolId {
pub fn try_from(value: impl AsRef<[u8]>) -> Option<Self> {
pub fn try_from(value: impl AsRef<str>) -> Option<Self> {
let value = value.as_ref();
if value == ProtocolId::Legacy.protocol_name() {
if value == ProtocolId::Legacy.as_ref() {
Some(ProtocolId::Legacy)
} else if value == ProtocolId::Bitswap100.protocol_name() {
} else if value == ProtocolId::Bitswap100.as_ref() {
Some(ProtocolId::Bitswap100)
} else if value == ProtocolId::Bitswap110.protocol_name() {
} else if value == ProtocolId::Bitswap110.as_ref() {
Some(ProtocolId::Bitswap110)
} else if value == ProtocolId::Bitswap120.protocol_name() {
} else if value == ProtocolId::Bitswap120.as_ref() {
Some(ProtocolId::Bitswap120)
} else {
None
Expand Down
13 changes: 9 additions & 4 deletions beetle/iroh-car/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
publish = false

[dependencies]
cid.workspace = true
futures.workspace = true
integer-encoding = { workspace = true, features = ["tokio_async"] }
ipld = { package = "libipld", version = "0.15" }
ipld-cbor = { package = "libipld-cbor", version = "0.15" }
libipld.workspace = true
libipld-cbor.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }

[dev-dependencies]
multihash.workspace = true
tokio = { workspace = true, features = ["macros", "sync", "rt", "fs", "io-util"] }
tokio = { workspace = true, features = [
"macros",
"sync",
"rt",
"fs",
"io-util",
] }

[features]
2 changes: 1 addition & 1 deletion beetle/iroh-car/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum Error {
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
#[error("Cbor encoding error: {0}")]
Cbor(#[from] ipld::error::Error),
Cbor(#[from] libipld::error::Error),
#[error("ld read too large {0}")]
LdReadTooLarge(usize),
}
Expand Down
Loading

0 comments on commit ea889a7

Please sign in to comment.