Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose basic kademlia config #166

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 67 additions & 4 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod network;
mod pubsub;
mod sql;

use std::{env, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
use std::{env, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration};

use anyhow::{anyhow, Result};
use ceramic_core::{EventId, Interest, PeerId};
Expand Down Expand Up @@ -112,11 +112,11 @@ struct DaemonOpts {
log_format: LogFormat,

/// Specify maximum established outgoing connections.
#[arg(long, default_value_t = 2000, env = "CERAMIC_ONE_MAX_CONNS_OUT")]
#[arg(long, default_value_t = 2_000, env = "CERAMIC_ONE_MAX_CONNS_OUT")]
max_conns_out: u32,

/// Specify maximum established incoming connections.
#[arg(long, default_value_t = 2000, env = "CERAMIC_ONE_MAX_CONNS_IN")]
#[arg(long, default_value_t = 2_000, env = "CERAMIC_ONE_MAX_CONNS_IN")]
max_conns_in: u32,

/// Specify maximum pending outgoing connections.
Expand All @@ -135,10 +135,55 @@ struct DaemonOpts {
/// Specify idle connection timeout in milliseconds.
#[arg(
long,
default_value_t = 30000,
default_value_t = 30_000,
env = "CERAMIC_ONE_IDLE_CONNS_TIMEOUT_MS"
)]
idle_conns_timeout_ms: u64,

/// Sets to how many closest peers a record is replicated.
#[arg(long, default_value_t = NonZeroUsize::new(20).expect("> 0"), env = "CERAMIC_ONE_KADEMLIA_REPLICATION")]
kademlia_replication: NonZeroUsize,

/// Sets the allowed level of parallelism for iterative queries.
#[arg(long, default_value_t = NonZeroUsize::new(16).expect("> 0"), env = "CERAMIC_ONE_KADEMLIA_PARALLELISM")]
kademlia_parallelism: NonZeroUsize,

/// Sets the timeout in seconds for a single query.
///
/// **Note**: A single query usually comprises at least as many requests
/// as the replication factor, i.e. this is not a request timeout.
#[arg(
long,
default_value_t = 60,
env = "CERAMIC_ONE_KADEMLIA_QUERY_TIMEOUT_SECS"
)]
kademlia_query_timeout_secs: u64,

/// Sets the interval in seconds at which provider records for keys provided
/// by the local node are re-published.
///
/// `0` means that stored provider records are never automatically
/// re-published.
///
/// Must be significantly less than the provider record TTL.
#[arg(
long,
default_value_t = 12 * 60 * 60,
env = "CERAMIC_ONE_KADEMLIA_PROVIDER_PUBLICATION_INTERVAL_SECS"
)]
kademlia_provider_publication_interval_secs: u64,

/// Sets the TTL in seconds for provider records.
///
/// `0` means that stored provider records never expire.
///
/// Must be significantly larger than the provider publication interval.
#[arg(
long,
default_value_t = 24 * 60 * 60,
env = "CERAMIC_ONE_KADEMLIA_PROVIDER_RECORD_TTL_SECS"
)]
kademlia_provider_record_ttl_secs: u64,
}

#[derive(ValueEnum, Debug, Clone, Default)]
Expand Down Expand Up @@ -319,6 +364,24 @@ impl Daemon {
.iter()
.map(|addr| addr.parse())
.collect::<Result<Vec<Multiaddr>, multiaddr::Error>>()?,
kademlia_replication_factor: opts.kademlia_replication,
kademlia_parallelism: opts.kademlia_parallelism,
kademlia_query_timeout: Duration::from_secs(opts.kademlia_query_timeout_secs),
kademlia_provider_publication_interval: if opts
.kademlia_provider_publication_interval_secs
== 0
{
None
} else {
Some(Duration::from_secs(
opts.kademlia_provider_publication_interval_secs,
))
},
kademlia_provider_record_ttl: if opts.kademlia_provider_record_ttl_secs == 0 {
None
} else {
Some(Duration::from_secs(opts.kademlia_provider_record_ttl_secs))
},
..Default::default()
};
debug!(?p2p_config, "using p2p config");
Expand Down
10 changes: 6 additions & 4 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ where
};
let store = MemoryStore::with_config(peer_id, mem_store_config);

// TODO: make user configurable
let mut kad_config = kad::Config::default();
kad_config.set_parallelism(16usize.try_into().unwrap());
// TODO: potentially lower (this is per query)
kad_config.set_query_timeout(Duration::from_secs(60));
kad_config.set_replication_factor(config.kademlia_replication_factor);
kad_config.set_parallelism(config.kademlia_parallelism);
kad_config.set_query_timeout(config.kademlia_query_timeout);
kad_config.set_provider_record_ttl(config.kademlia_provider_record_ttl);
kad_config
.set_provider_publication_interval(config.kademlia_provider_publication_interval);

let mut kademlia = kad::Behaviour::with_config(pub_key.to_peer_id(), store, kad_config);
for multiaddr in &config.bootstrap_peers {
Expand Down
40 changes: 40 additions & 0 deletions p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,41 @@ pub struct Libp2pConfig {
/// NOTE: It is generally not safe to trust observed addresses received from arbitrary peers.
/// Only enable this option if its known that all connecting peers can be trusted.
pub trust_observed_addrs: bool,

/// Sets the kademlia replication factor.
///
/// The replication factor determines to how many closest peers
/// a record is replicated.
pub kademlia_replication_factor: NonZeroUsize,

/// Sets the allowed level of parallelism for iterative queries.
///
/// The `α` parameter in the Kademlia paper. The maximum number of peers
/// that an iterative query is allowed to wait for in parallel while
/// iterating towards the closest nodes to a target.
pub kademlia_parallelism: NonZeroUsize,

/// Sets the timeout for a single query.
///
/// > **Note**: A single query usually comprises at least as many requests
/// > as the replication factor, i.e. this is not a request timeout.
pub kademlia_query_timeout: Duration,

/// Sets the interval at which provider records for keys provided
/// by the local node are re-published.
///
/// `None` means that stored provider records are never automatically
/// re-published.
///
/// Must be significantly less than the provider record TTL.
pub kademlia_provider_publication_interval: Option<Duration>,

/// Sets the TTL for provider records.
///
/// `None` means that stored provider records never expire.
///
/// Must be significantly larger than the provider publication interval.
pub kademlia_provider_record_ttl: Option<Duration>,
}

impl Default for Libp2pConfig {
Expand Down Expand Up @@ -97,6 +132,11 @@ impl Default for Libp2pConfig {
dial_concurrency_factor: NonZeroU8::new(8).expect("should not be zero"),
idle_connection_timeout: Duration::from_secs(30),
trust_observed_addrs: false,
kademlia_replication_factor: NonZeroUsize::new(20).expect("should not be zero"),
kademlia_parallelism: NonZeroUsize::new(16).expect("should not be zero"),
kademlia_query_timeout: Duration::from_secs(60),
kademlia_provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
kademlia_provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
}
}
}