Skip to content

Commit

Permalink
feat(pd): support archives for migrate and join
Browse files Browse the repository at this point in the history
Enables opt-in archive generation when performing:

  * pd export
  * pd migrate

The goal is to provide a standardized bottling-up of pd state,
specifically the rocksdb directory. In the context of upgrades,
only the "pd migrate" functionality change is what we care about:
we want the archived dir to contain both rocksdb data and the modified
genesis file.

Accordingly, `pd testnet join` is modified to support an optional
archive URL. If set, the remote tar.gz archive will be downloaded
and extracted, clobbering the cometbft config. A remote bootstrap node
is still contacted, to learn about peers, otherwise the newly created
node wouldn't be able to talk to the network.
  • Loading branch information
conorsch committed Mar 20, 2024
1 parent 407fc94 commit 83ae671
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 38 deletions.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/bin/pd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ decaf377 = { workspace = true, features = ["parallel"],
decaf377-rdsa = { workspace = true }
directories = { workspace = true }
ed25519-consensus = { workspace = true }
flate2 = "1.0.28"
fs_extra = "1.3.0"
futures = { workspace = true }
hex = { workspace = true }
Expand Down Expand Up @@ -91,12 +92,13 @@ rand = { workspace = true }
rand_chacha = { workspace = true }
rand_core = { workspace = true, features = ["getrandom"] }
regex = { workspace = true }
reqwest = { version = "0.11", features = ["json"] }
reqwest = { version = "0.11", features = ["json", "stream"] }
rocksdb = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["hex"] }
sha2 = { workspace = true }
tar = "0.4.40"
tempfile = { workspace = true }
tendermint = { workspace = true }
tendermint-config = { workspace = true }
Expand Down
26 changes: 21 additions & 5 deletions crates/bin/pd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,32 @@ pub enum RootCommand {
/// The home directory of the full node.
#[clap(long, env = "PENUMBRA_PD_HOME", display_order = 100)]
home: PathBuf,
/// The directory that the exported state will be written to.
/// The directory where the exported node state will be written.
#[clap(long, display_order = 200, alias = "export-path")]
export_directory: PathBuf,
/// An optional filepath for a compressed archive containing the exported
/// node state, e.g. ~/pd-backup.tar.gz.
#[clap(long, display_order = 200)]
export_path: PathBuf,
export_archive: Option<PathBuf>,
/// Whether to prune the JMT tree.
#[clap(long, display_order = 300)]
prune: bool,
},
/// Run a migration on the exported storage state of the full node,
/// and create a genesis file.
Migrate {
/// The directory containing exported state to which the upgrade will be applied.
#[clap(long, display_order = 200)]
target_dir: PathBuf,
/// The directory containing exported state, created via `pd export`, to be modified
/// in-place. This should be a pd home directory, with a subdirectory called "rocksdb".
#[clap(long, display_order = 200, alias = "target-dir")]
target_directory: PathBuf,
#[clap(long, display_order = 300)]
/// Timestamp of the genesis file in RFC3339 format. If unset, defaults to the current time,
/// unless the migration logic overrides it.
genesis_start: Option<tendermint::time::Time>,
/// An optional filepath for a compressed archive containing the migrated node state,
/// e.g. ~/pd-state-post-upgrade.tar.gz.
#[clap(long, display_order = 400)]
migrate_archive: Option<PathBuf>,
},
}

Expand Down Expand Up @@ -197,6 +206,13 @@ pub enum TestnetCommand {
default_value = "https://rpc.testnet.penumbra.zone"
)]
node: Url,

/// Optional URL of archived node state in .tar.gz format. The archive will be
/// downloaded and extracted locally, allowing the node to join a network at a block height
/// higher than 0.
#[clap(long)]
archive_url: Option<Url>,

/// Human-readable name to identify node on network
// Default: 'node-#'
#[clap(long, env = "PENUMBRA_PD_TM_MONIKER")]
Expand Down
137 changes: 112 additions & 25 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#![allow(clippy::clone_on_copy)]
#![deny(clippy::unwrap_used)]
#![recursion_limit = "512"]
use flate2::read::GzDecoder;
use std::error::Error;
use std::io::IsTerminal as _;
use std::io::Write;

use console_subscriber::ConsoleLayer;
use metrics_tracing_context::{MetricsLayer, TracingContextLayer};
Expand All @@ -25,6 +27,7 @@ use rand::Rng;
use rand_core::OsRng;
use tendermint_config::net::Address as TendermintAddress;
use tokio::runtime;
use tokio_stream::StreamExt;
use tower_http::cors::CorsLayer;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;
Expand Down Expand Up @@ -255,6 +258,7 @@ async fn main() -> anyhow::Result<()> {
tn_cmd:
TestnetCommand::Join {
node,
archive_url,
moniker,
external_address,
tendermint_rpc_bind,
Expand Down Expand Up @@ -290,14 +294,61 @@ async fn main() -> anyhow::Result<()> {
// Join the target testnet, looking up network info and writing
// local configs for pd and tendermint.
testnet_join(
output_dir,
output_dir.clone(),
node,
&node_name,
external_address,
tendermint_rpc_bind,
tendermint_p2p_bind,
)
.await?;

// Download and extract archive URL, if set.
if let Some(archive_url) = archive_url {
tracing::info!(%archive_url, "downloading compressed node state");

// Download. Adapted from https://rust-lang-nursery.github.io/rust-cookbook/web/clients/download.html
// Here we inspect HEAD so we can infer filename.
let response = reqwest::get(archive_url).await?;

let fname = response
.url()
.path_segments()
.and_then(|segments| segments.last())
.and_then(|name| if name.is_empty() { None } else { Some(name) })
.unwrap_or("pd-node-state-archive.tar.gz");

let archive_filepath = output_dir.join(fname);
let mut download_opts = std::fs::OpenOptions::new();
download_opts.create_new(true).write(true);
let mut archive_file = download_opts.open(&archive_filepath)?;

// Download via stream, in case file is too large to shove into RAM.
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
archive_file.write_all(&chunk)?;
}
archive_file.flush()?;
tracing::info!("download complete: {}", archive_filepath.display());

// Extract
// Re-open downloaded file for unpacking, for a fresh filehandle.
let mut unpack_opts = std::fs::OpenOptions::new();
unpack_opts.read(true);
let f = unpack_opts.open(&archive_filepath)?;
let tar = GzDecoder::new(f);
let mut archive = tar::Archive::new(tar);
// This dir-path building is duplicated in the config gen code.
let pd_home = output_dir.join("node0").join("pd");
archive
.unpack(&pd_home)
.context("failed to extract tar.gz archive")?;
tracing::info!("archived node state unpacked to {}", pd_home.display());
// Post-extraction, clean up the downloaded tarball.
std::fs::remove_file(archive_filepath)?;
// TODO: use "migrate" tarball, clobber genesis file from migration.
}
}

RootCommand::Testnet {
Expand Down Expand Up @@ -379,44 +430,80 @@ async fn main() -> anyhow::Result<()> {
t.write_configs()?;
}
RootCommand::Export {
mut home,
mut export_path,
home,
export_directory,
export_archive,
prune,
} => {
use fs_extra;

tracing::info!("exporting state to {}", export_path.display());
// Export state as directory.
let src_rocksdb_dir = home.join("rocksdb");
tracing::info!(
"copying node state {} -> {}",
src_rocksdb_dir.display(),
export_directory.display()
);
std::fs::create_dir_all(&export_directory)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
home.push("rocksdb");
let from = [home.as_path()];
tracing::info!(?home, ?export_path, "copying from data dir to export dir",);
std::fs::create_dir_all(&export_path)?;
fs_extra::copy_items(&from, export_path.as_path(), &copy_opts)?;

tracing::info!("done copying");
if !prune {
return Ok(());
fs_extra::copy_items(
&[src_rocksdb_dir.as_path()],
export_directory.as_path(),
&copy_opts,
)?;
tracing::info!("finished copying node state");

let dst_rocksdb_dir = export_directory.join("rocksdb");
// If prune=true, then export-directory is required, because we must munge state prior
// to compressing. So we'll just mandate the presence of the --export-directory arg
// always.
if prune {
tracing::info!("pruning JMT tree");
let export = Storage::load(dst_rocksdb_dir, SUBSTORE_PREFIXES.to_vec()).await?;
let _ = StateDelta::new(export.latest_snapshot());
// TODO:
// - add utilities in `cnidarium` to prune a tree
// - apply the delta to the exported storage
// - apply checks: root hash, size, etc.
todo!()
}

tracing::info!("pruning JMT tree");
export_path.push("rocksdb");
let export = Storage::load(export_path, SUBSTORE_PREFIXES.to_vec()).await?;
let _ = StateDelta::new(export.latest_snapshot());
// TODO:
// - add utilities in `cnidarium` to prune a tree
// - apply the delta to the exported storage
// - apply checks: root hash, size, etc.
todo!()
// Compress to tarball if requested.
if let Some(archive_filepath) = export_archive {
pd::migrate::archive_directory(
dst_rocksdb_dir,
archive_filepath.clone(),
Some("rocksdb".to_owned()),
)?;
tracing::info!("export complete: {}", archive_filepath.display());
} else {
// Provide friendly "OK" message that's still accurate without archiving.
tracing::info!("export complete: {}", export_directory.display());
}
}
RootCommand::Migrate {
target_dir,
target_directory,
genesis_start,
migrate_archive,
} => {
tracing::info!("migrating state from {}", target_dir.display());
tracing::info!("migrating state in {}", target_directory.display());
SimpleMigration
.migrate(target_dir.clone(), genesis_start)
.migrate(target_directory.clone(), genesis_start)
.await
.context("failed to upgrade state")?;
// Compress to tarball if requested.
if let Some(archive_filepath) = migrate_archive {
let rocksdb_dir = target_directory.join("rocksdb");
pd::migrate::archive_directory(
rocksdb_dir,
archive_filepath.clone(),
Some("rocksdb".to_owned()),
)?;
tracing::info!("migration complete: {}", archive_filepath.display());
} else {
// Provide friendly "OK" message that's still accurate without archiving.
tracing::info!("migration complete: {}", target_directory.display());
}
}
}
Ok(())
Expand Down
Loading

0 comments on commit 83ae671

Please sign in to comment.