Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stop synchronizing interests #632

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions one/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use ceramic_sql::sqlite::SqlitePool;
use clap::Args;
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use recon::{FullInterests, Recon, ReconInterestProvider};
use recon::{Recon, ReconInterestProvider};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use std::sync::Arc;
Expand Down Expand Up @@ -530,13 +530,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
// Construct a recon implementation for peers.
let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone());

// Construct a recon implementation for interests.
let recon_interest = Recon::new(
interest_svc.clone(),
FullInterests::default(),
recon_metrics.clone(),
);

// Construct a recon implementation for models.
let recon_model = Recon::new(
model_svc.clone(),
Expand All @@ -545,7 +538,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
recon_metrics,
);

let recons = Some((recon_peer, recon_interest, recon_model));
let recons = Some((recon_peer, recon_model));
let ipfs_metrics =
ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register);
Expand Down
7 changes: 3 additions & 4 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey};
use ceramic_core::{EventId, NodeId, NodeKey, PeerKey};
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService};
use iroh_rpc_client::P2pClient;
Expand Down Expand Up @@ -34,18 +34,17 @@ impl BuilderState for WithP2p {}

/// Configure the p2p service
impl Builder<Init> {
pub async fn with_p2p<P, I, M, S>(
pub async fn with_p2p<P, M, S>(
self,
libp2p_config: Libp2pConfig,
node_key: NodeKey,
peer_svc: impl PeerService + 'static,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
metrics: ceramic_p2p::Metrics,
) -> anyhow::Result<Builder<WithP2p>>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down
15 changes: 7 additions & 8 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use anyhow::Result;
use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig};
use libp2p::{
autonat,
Expand Down Expand Up @@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION"
/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub(crate) struct NodeBehaviour<P, I, M, S>
pub(crate) struct NodeBehaviour<P, M, S>
where
S: iroh_bitswap::Store + Send + Sync,
{
Expand All @@ -56,21 +56,20 @@ where
relay: Toggle<relay::Behaviour>,
relay_client: Toggle<relay::client::Behaviour>,
dcutr: Toggle<dcutr::Behaviour>,
recon: Toggle<recon::libp2p::Behaviour<P, I, M>>,
recon: Toggle<recon::libp2p::Behaviour<P, M>>,
}

impl<P, I, M, S> NodeBehaviour<P, I, M, S>
impl<P, M, S> NodeBehaviour<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
S: iroh_bitswap::Store + Send + Sync,
{
pub async fn new(
local_key: &Keypair,
config: &Libp2pConfig,
relay_client: Option<relay::client::Behaviour>,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
Expand Down Expand Up @@ -186,8 +185,8 @@ where
.with_max_pending_incoming(Some(config.max_conns_pending_in))
.with_max_established_per_peer(Some(config.max_conns_per_peer)),
);
let recon = recons.map(|(peer, interest, model)| {
recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default())
let recon = recons.map(|(peer, model)| {
recon::libp2p::Behaviour::new(peer, model, recon::libp2p::Config::default())
});
Ok(NodeBehaviour {
ping: Ping::default(),
Expand Down
23 changes: 9 additions & 14 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration};

use ahash::AHashMap;
use anyhow::{anyhow, bail, Context, Result};
use ceramic_core::{EventId, Interest, NodeKey, PeerKey};
use ceramic_core::{EventId, NodeKey, PeerKey};
use ceramic_metrics::{libp2p_metrics, Recorder};
use cid::Cid;
use futures_util::stream::StreamExt;
Expand Down Expand Up @@ -62,15 +62,14 @@ pub enum NetworkEvent {
/// Node implements a peer to peer node that participates on the Ceramic network.
///
/// Node provides an external API via RpcMessages.
pub struct Node<P, I, M, S>
pub struct Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
metrics: Metrics,
swarm: Swarm<NodeBehaviour<P, I, M, S>>,
swarm: Swarm<NodeBehaviour<P, M, S>>,
supported_protocols: HashSet<String>,
net_receiver_in: Receiver<RpcMessage>,
dial_queries: AHashMap<PeerId, Vec<OneShotSender<Result<()>>>>,
Expand All @@ -92,10 +91,9 @@ where
active_address_probe: Option<Multiaddr>,
}

impl<P, I, M, S> fmt::Debug for Node<P, I, M, S>
impl<P, M, S> fmt::Debug for Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down Expand Up @@ -128,10 +126,9 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6);
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
const EXPIRY_INTERVAL: Duration = Duration::from_secs(1);

impl<P, I, M, S> Drop for Node<P, I, M, S>
impl<P, M, S> Drop for Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand All @@ -143,12 +140,10 @@ where

// Allow IntoConnectionHandler deprecated associated type.
// We are not using IntoConnectionHandler directly only referencing the type as part of this event signature.
type NodeSwarmEvent<P, I, M, S> =
SwarmEvent<<NodeBehaviour<P, I, M, S> as NetworkBehaviour>::ToSwarm>;
impl<P, I, M, S> Node<P, I, M, S>
type NodeSwarmEvent<P, M, S> = SwarmEvent<<NodeBehaviour<P, M, S> as NetworkBehaviour>::ToSwarm>;
impl<P, M, S> Node<P, M, S>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
S: iroh_bitswap::Store + Send + Sync,
{
Expand All @@ -157,7 +152,7 @@ where
rpc_addr: P2pAddr,
node_key: NodeKey,
peer_svc: impl PeerService + 'static,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
metrics: Metrics,
) -> Result<Self> {
Expand Down Expand Up @@ -494,7 +489,7 @@ where
#[tracing::instrument(skip_all)]
async fn handle_swarm_event(
&mut self,
event: NodeSwarmEvent<P, I, M, S>,
event: NodeSwarmEvent<P, M, S>,
) -> Result<Option<SwarmEventResult>> {
libp2p_metrics().record(&event);
match event {
Expand Down
16 changes: 7 additions & 9 deletions p2p/src/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder};
use libp2p_identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
Expand Down Expand Up @@ -28,17 +28,16 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) {
}
}

pub(crate) async fn build_swarm<P, I, M, S>(
pub(crate) async fn build_swarm<P, M, S>(
config: &Libp2pConfig,
keypair: Keypair,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
) -> Result<Swarm<NodeBehaviour<P, I, M, S>>>
) -> Result<Swarm<NodeBehaviour<P, M, S>>>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
S: iroh_bitswap::Store,
{
Expand Down Expand Up @@ -105,18 +104,17 @@ where
}
}

fn new_behavior<P, I, M, S>(
fn new_behavior<P, M, S>(
config: &Libp2pConfig,
keypair: &Keypair,
relay_client: Option<relay::client::Behaviour>,
recons: Option<(P, I, M)>,
recons: Option<(P, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
) -> Result<NodeBehaviour<P, I, M, S>>
) -> Result<NodeBehaviour<P, M, S>>
where
P: Recon<Key = PeerKey, Hash = Sha256a> + Send,
I: Recon<Key = Interest, Hash = Sha256a> + Send,
M: Recon<Key = EventId, Hash = Sha256a> + Send,
S: iroh_bitswap::Store,
{
Expand Down
7 changes: 1 addition & 6 deletions p2p/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ceramic_event_svc::store::SqlitePool;
use iroh_rpc_client::P2pClient;
use iroh_rpc_types::Addr;
use libp2p::{Multiaddr, PeerId};
use recon::{FullInterests, Recon, ReconInterestProvider};
use recon::{Recon, ReconInterestProvider};
use test_log::test;

use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests};
Expand Down Expand Up @@ -50,11 +50,6 @@ impl TestRunnerBuilder {
Arc::clone(&peer_svc),
Some((
Recon::new(peer_svc, PeerKeyInterests, recon_metrics.clone()),
Recon::new(
Arc::clone(&interest_svc),
FullInterests::default(),
recon_metrics.clone(),
),
Recon::new(
Arc::clone(&event_svc),
ReconInterestProvider::new(node_key.id(), interest_svc),
Expand Down
33 changes: 9 additions & 24 deletions recon/src/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod upgrade;
pub use crate::protocol::Recon;
pub use stream_set::StreamSet;

use ceramic_core::{EventId, Interest, PeerKey};
use ceramic_core::{EventId, PeerKey};
use futures::{future::BoxFuture, FutureExt};
use libp2p::{
core::ConnectedPoint,
Expand All @@ -43,8 +43,6 @@ use crate::{

/// Name of the Recon protocol for synchronizing peers
pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer";
/// Name of the Recon protocol for synchronizing interests
pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest";
/// Name of the Recon protocol for synchronizing models
pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model";

Expand Down Expand Up @@ -76,9 +74,8 @@ impl Default for Config {
/// The Behavior tracks all peers on the network that speak the Recon protocol.
/// It is responsible for starting and stopping syncs with various peers depending on the needs of
/// the application.
pub struct Behaviour<P, I, M> {
pub struct Behaviour<P, M> {
peer: P,
interest: I,
model: M,
config: Config,
peers: BTreeMap<PeerId, PeerInfo>,
Expand All @@ -87,15 +84,13 @@ pub struct Behaviour<P, I, M> {
next_sync: Option<BoxFuture<'static, ()>>,
}

impl<P, I, M> std::fmt::Debug for Behaviour<P, I, M>
impl<P, M> std::fmt::Debug for Behaviour<P, M>
where
P: std::fmt::Debug,
I: std::fmt::Debug,
M: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Behaviour")
.field("interest", &self.interest)
.field("model", &self.model)
.field("config", &self.config)
.field("peers", &self.peers)
Expand Down Expand Up @@ -148,18 +143,16 @@ pub enum PeerStatus {
Stopped,
}

impl<P, I, M> Behaviour<P, I, M> {
impl<P, M> Behaviour<P, M> {
/// Create a new Behavior with the provided Recon implementation.
pub fn new(peer: P, interest: I, model: M, config: Config) -> Self
pub fn new(peer: P, model: M, config: Config) -> Self
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
peer,
interest,
model,
config,
peers: BTreeMap::new(),
Expand All @@ -178,13 +171,12 @@ impl<P, I, M> Behaviour<P, I, M> {
}
}

impl<P, I, M> NetworkBehaviour for Behaviour<P, I, M>
impl<P, M> NetworkBehaviour for Behaviour<P, M>
where
P: Recon<Key = PeerKey, Hash = Sha256a>,
I: Recon<Key = Interest, Hash = Sha256a>,
M: Recon<Key = EventId, Hash = Sha256a>,
{
type ConnectionHandler = Handler<P, I, M>;
type ConnectionHandler = Handler<P, M>;

type ToSwarm = Event;

Expand All @@ -205,13 +197,8 @@ where
next_sync: BTreeMap::from_iter([
// Schedule all stream_sets initially
(StreamSet::Peer, Instant::now()),
// Schedule interests after peers
(
StreamSet::Interest,
Instant::now() + Duration::from_millis(1),
),
// Schedule models after interests
(StreamSet::Model, Instant::now() + Duration::from_millis(2)),
// Schedule models after peers
(StreamSet::Model, Instant::now() + Duration::from_millis(1)),
]),
sync_delay: Default::default(),
});
Expand Down Expand Up @@ -395,7 +382,6 @@ where
connection_id,
handler::State::WaitingInbound,
self.peer.clone(),
self.interest.clone(),
self.model.clone(),
))
}
Expand All @@ -416,7 +402,6 @@ where
stream_set: StreamSet::Peer,
},
self.peer.clone(),
self.interest.clone(),
self.model.clone(),
))
}
Expand Down
Loading
Loading