diff --git a/one/src/lib.rs b/one/src/lib.rs index 2d8e0279a..a9bb4f4a1 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -7,7 +7,7 @@ mod network; mod pubsub; mod sql; -use std::{env, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; +use std::{env, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; use ceramic_core::{EventId, Interest, PeerId}; @@ -112,11 +112,11 @@ struct DaemonOpts { log_format: LogFormat, /// Specify maximum established outgoing connections. - #[arg(long, default_value_t = 2000, env = "CERAMIC_ONE_MAX_CONNS_OUT")] + #[arg(long, default_value_t = 2_000, env = "CERAMIC_ONE_MAX_CONNS_OUT")] max_conns_out: u32, /// Specify maximum established incoming connections. - #[arg(long, default_value_t = 2000, env = "CERAMIC_ONE_MAX_CONNS_IN")] + #[arg(long, default_value_t = 2_000, env = "CERAMIC_ONE_MAX_CONNS_IN")] max_conns_in: u32, /// Specify maximum pending outgoing connections. @@ -135,10 +135,55 @@ struct DaemonOpts { /// Specify idle connection timeout in milliseconds. #[arg( long, - default_value_t = 30000, + default_value_t = 30_000, env = "CERAMIC_ONE_IDLE_CONNS_TIMEOUT_MS" )] idle_conns_timeout_ms: u64, + + /// Sets to how many closest peers a record is replicated. + #[arg(long, default_value_t = NonZeroUsize::new(20).expect("> 0"), env = "CERAMIC_ONE_KADEMLIA_REPLICATION")] + kademlia_replication: NonZeroUsize, + + /// Sets the allowed level of parallelism for iterative queries. + #[arg(long, default_value_t = NonZeroUsize::new(16).expect("> 0"), env = "CERAMIC_ONE_KADEMLIA_PARALLELISM")] + kademlia_parallelism: NonZeroUsize, + + /// Sets the timeout in seconds for a single query. + /// + /// **Note**: A single query usually comprises at least as many requests + /// as the replication factor, i.e. this is not a request timeout. + #[arg( + long, + default_value_t = 60, + env = "CERAMIC_ONE_KADEMLIA_QUERY_TIMEOUT_SECS" + )] + kademlia_query_timeout_secs: u64, + + /// Sets the interval in seconds at which provider records for keys provided + /// by the local node are re-published. + /// + /// `0` means that stored provider records are never automatically + /// re-published. + /// + /// Must be significantly less than the provider record TTL. + #[arg( + long, + default_value_t = 43_200, + env = "CERAMIC_ONE_KADEMLIA_PROVIDER_PUBLICATION_INTERVAL_SECS" + )] + kademlia_provider_publication_interval_secs: u64, + + /// Sets the TTL in seconds for provider records. + /// + /// `0` means that stored provider records never expire. + /// + /// Must be significantly larger than the provider publication interval. + #[arg( + long, + default_value_t = 86_400, + env = "CERAMIC_ONE_KADEMLIA_PROVIDER_RECORD_TTL_SECS" + )] + kademlia_provider_record_ttl_secs: u64, } #[derive(ValueEnum, Debug, Clone, Default)] @@ -319,6 +364,24 @@ impl Daemon { .iter() .map(|addr| addr.parse()) .collect::, multiaddr::Error>>()?, + kademlia_replication_factor: opts.kademlia_replication, + kademlia_parallelism: opts.kademlia_parallelism, + kademlia_query_timeout: Duration::from_secs(opts.kademlia_query_timeout_secs), + kademlia_provider_publication_interval: if opts + .kademlia_provider_publication_interval_secs + == 0 + { + None + } else { + Some(Duration::from_secs( + opts.kademlia_provider_publication_interval_secs, + )) + }, + kademlia_provider_record_ttl: if opts.kademlia_provider_record_ttl_secs == 0 { + None + } else { + Some(Duration::from_secs(opts.kademlia_provider_record_ttl_secs)) + }, ..Default::default() }; debug!(?p2p_config, "using p2p config"); diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index d95996317..664c17035 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -141,11 +141,13 @@ where }; let store = MemoryStore::with_config(peer_id, mem_store_config); - // TODO: make user configurable let mut kad_config = kad::Config::default(); - kad_config.set_parallelism(16usize.try_into().unwrap()); - // TODO: potentially lower (this is per query) - kad_config.set_query_timeout(Duration::from_secs(60)); + kad_config.set_replication_factor(config.kademlia_replication_factor); + kad_config.set_parallelism(config.kademlia_parallelism); + kad_config.set_query_timeout(config.kademlia_query_timeout); + kad_config.set_provider_record_ttl(config.kademlia_provider_record_ttl); + kad_config + .set_provider_publication_interval(config.kademlia_provider_publication_interval); let mut kademlia = kad::Behaviour::with_config(pub_key.to_peer_id(), store, kad_config); for multiaddr in &config.bootstrap_peers { diff --git a/p2p/src/config.rs b/p2p/src/config.rs index cea3afa85..60669987f 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -68,6 +68,41 @@ pub struct Libp2pConfig { /// NOTE: It is generally not safe to trust observed addresses received from arbitrary peers. /// Only enable this option if its known that all connecting peers can be trusted. pub trust_observed_addrs: bool, + + /// Sets the kademlia replication factor. + /// + /// The replication factor determines to how many closest peers + /// a record is replicated. + pub kademlia_replication_factor: NonZeroUsize, + + /// Sets the allowed level of parallelism for iterative queries. + /// + /// The `α` parameter in the Kademlia paper. The maximum number of peers + /// that an iterative query is allowed to wait for in parallel while + /// iterating towards the closest nodes to a target. + pub kademlia_parallelism: NonZeroUsize, + + /// Sets the timeout for a single query. + /// + /// > **Note**: A single query usually comprises at least as many requests + /// > as the replication factor, i.e. this is not a request timeout. + pub kademlia_query_timeout: Duration, + + /// Sets the interval at which provider records for keys provided + /// by the local node are re-published. + /// + /// `None` means that stored provider records are never automatically + /// re-published. + /// + /// Must be significantly less than the provider record TTL. + pub kademlia_provider_publication_interval: Option, + + /// Sets the TTL for provider records. + /// + /// `None` means that stored provider records never expire. + /// + /// Must be significantly larger than the provider publication interval. + pub kademlia_provider_record_ttl: Option, } impl Default for Libp2pConfig { @@ -97,6 +132,11 @@ impl Default for Libp2pConfig { dial_concurrency_factor: NonZeroU8::new(8).expect("should not be zero"), idle_connection_timeout: Duration::from_secs(30), trust_observed_addrs: false, + kademlia_replication_factor: NonZeroUsize::new(20).expect("should not be zero"), + kademlia_parallelism: NonZeroUsize::new(16).expect("should not be zero"), + kademlia_query_timeout: Duration::from_secs(60), + kademlia_provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)), + kademlia_provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), } } }