Skip to content

Commit

Permalink
Cherrypick state_sync.dump.credentials_file
Browse files Browse the repository at this point in the history
  • Loading branch information
nikurt committed Dec 11, 2023
1 parent 78f0c08 commit b0cfb84
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* The default config now enables TIER1 outbound connections by default. [#9349](https://github.com/near/nearcore/pull/9349)
* State Sync from GCS is available for experimental use. [#9398](https://github.com/near/nearcore/pull/9398)
* Add config option `tx_routing_height_horizon` to configure how many chunk producers are notified about the tx. [#10251](https://github.com/near/nearcore/pull/10251)
* Add config option `state_sync.dump.credentials_file` to specify the credentials file that grants write access to the bucket used for pro-active state part production for State Sync. [#9487](https://github.com/near/nearcore/pull/9487)

## 1.35.0

Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl StateSync {
num_concurrent_requests_during_catchup,
}) => {
let external = match location {
ExternalStorageLocation::S3 { bucket, region } => {
ExternalStorageLocation::S3 { bucket, region, .. } => {
let bucket = create_bucket_readonly(&bucket, &region, timeout);
if let Err(err) = bucket {
panic!("Failed to create an S3 bucket: {}", err);
Expand All @@ -190,7 +190,7 @@ impl StateSync {
ExternalStorageLocation::Filesystem { root_dir } => {
ExternalConnection::Filesystem { root_dir: root_dir.clone() }
}
ExternalStorageLocation::GCS { bucket } => ExternalConnection::GCS {
ExternalStorageLocation::GCS { bucket, .. } => ExternalConnection::GCS {
gcs_client: Arc::new(cloud_storage::Client::default()),
reqwest_client: Arc::new(reqwest::Client::default()),
bucket: bucket.clone(),
Expand Down
3 changes: 3 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ pub struct DumpConfig {
/// Feel free to set to `None`, defaults are sensible.
#[serde(skip_serializing_if = "Option::is_none")]
pub iteration_delay: Option<Duration>,
/// Location of a JSON file with credentials allowing write access to the bucket.
#[serde(skip_serializing_if = "Option::is_none")]
pub credentials_file: Option<PathBuf>,
}

/// Configures how to fetch state parts during state sync.
Expand Down
4 changes: 2 additions & 2 deletions core/store/src/trie/state_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Trie {
let boundaries_read_duration = boundaries_read_timer.stop_and_record();
let recorded_trie = recording_trie.recorded_storage().unwrap();

tracing::info!(
tracing::debug!(
target: "state-parts",
idx,
total,
Expand Down Expand Up @@ -274,7 +274,7 @@ impl Trie {
let state_part_num_nodes = trie_values.len();
let in_memory_created_nodes =
trie_values.iter().filter(|entry| !disk_read_hashes.contains(&hash(*entry))).count();
tracing::info!(
tracing::debug!(
target: "state-parts",
?part_id,
values_ref = value_refs.len(),
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/src/tests/client/state_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn test_state_dump() {
location: Filesystem { root_dir: root_dir.path().to_path_buf() },
restart_dump_for_shards: None,
iteration_delay: Some(Duration::ZERO),
credentials_file: None,
});

let _state_sync_dump_handle = spawn_state_sync_dump(
Expand All @@ -90,7 +91,6 @@ fn test_state_dump() {
shard_tracker.clone(),
runtimes[0].clone(),
Some("test0".parse().unwrap()),
None,
)
.unwrap();

Expand Down Expand Up @@ -204,6 +204,7 @@ fn run_state_sync_with_dumped_parts(
location: Filesystem { root_dir: root_dir.path().to_path_buf() },
restart_dump_for_shards: None,
iteration_delay: Some(Duration::ZERO),
credentials_file: None,
});
let _state_sync_dump_handle = spawn_state_sync_dump(
&config,
Expand All @@ -212,7 +213,6 @@ fn run_state_sync_with_dumped_parts(
shard_tracker.clone(),
runtimes[0].clone(),
Some("test0".parse().unwrap()),
None,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/tests/nearcore/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ fn sync_state_dump() {
location: Filesystem { root_dir: dump_dir.path().to_path_buf() },
restart_dump_for_shards: None,
iteration_delay: Some(Duration::from_millis(100)),
credentials_file: None,
});

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
Expand Down
7 changes: 7 additions & 0 deletions nearcore/src/config_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ impl<'a> ConfigValidator<'a> {
}
}
}

// A configured credentials file must point at an existing regular file.
// `is_file()` already returns false for a missing path, so a separate
// `exists()` check is not needed.
if let Some(credentials_file) = &dump_config.credentials_file {
    if !credentials_file.is_file() {
        let error_message = "'config.state_sync.dump.credentials_file' is provided but the specified file does not exist or is not a file.".to_string();
        self.validation_errors.push_config_semantics_error(error_message);
    }
}
}
match &state_sync.sync {
SyncConfig::Peers => {}
Expand Down
5 changes: 3 additions & 2 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,16 @@ pub fn start_with_config_and_synchronization(
None
};

let credentials_file = config.config.s3_credentials_file;
// Warn only when the legacy `s3_credentials_file` option is actually set:
// the message tells the operator that the value is ignored and which option
// to use instead. (The previous `is_none()` check fired when it was absent.)
if config.config.s3_credentials_file.is_some() {
    tracing::warn!("'s3_credentials_file' option is present but it will be ignored. Use 'state_sync.dump.credentials_file' instead.");
}
let state_sync_dump_handle = spawn_state_sync_dump(
&config.client_config,
chain_genesis,
epoch_manager,
shard_tracker,
runtime,
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
credentials_file.map(|filename| home_dir.join(filename)),
)?;

let hot_store = storage.get_hot_store();
Expand Down
23 changes: 16 additions & 7 deletions nearcore/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot
use near_store::DBCol;
use rand::{thread_rng, Rng};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -32,7 +31,6 @@ pub fn spawn_state_sync_dump(
shard_tracker: ShardTracker,
runtime: Arc<dyn RuntimeAdapter>,
account_id: Option<AccountId>,
credentials_file: Option<PathBuf>,
) -> anyhow::Result<Option<StateSyncDumpHandle>> {
let dump_config = if let Some(dump_config) = client_config.state_sync.dump.clone() {
dump_config
Expand All @@ -45,14 +43,25 @@ pub fn spawn_state_sync_dump(

let external = match dump_config.location {
ExternalStorageLocation::S3 { bucket, region } => ExternalConnection::S3{
bucket: Arc::new(create_bucket_readwrite(&bucket, &region, Duration::from_secs(30), credentials_file).expect(
bucket: Arc::new(create_bucket_readwrite(&bucket, &region, Duration::from_secs(30), dump_config.credentials_file).expect(
"Failed to authenticate connection to S3. Please either provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment, or create a credentials file and link it in config.json as 's3_credentials_file'."))
},
ExternalStorageLocation::Filesystem { root_dir } => ExternalConnection::Filesystem { root_dir },
ExternalStorageLocation::GCS { bucket } => ExternalConnection::GCS {
gcs_client: Arc::new(cloud_storage::Client::default()),
reqwest_client: Arc::new(reqwest::Client::default()),
bucket },
ExternalStorageLocation::GCS { bucket } => {
if let Some(credentials_file) = dump_config.credentials_file {
if let Ok(var) = std::env::var("SERVICE_ACCOUNT") {
tracing::warn!(target: "state_sync_dump", "Environment variable 'SERVICE_ACCOUNT' is set to {var}, but 'credentials_file' in config.json overrides it to '{credentials_file:?}'");
println!("Environment variable 'SERVICE_ACCOUNT' is set to {var}, but 'credentials_file' in config.json overrides it to '{credentials_file:?}'");
}
std::env::set_var("SERVICE_ACCOUNT", &credentials_file);
tracing::info!(target: "state_sync_dump", "Set the environment variable 'SERVICE_ACCOUNT' to '{credentials_file:?}'");
}
ExternalConnection::GCS {
gcs_client: Arc::new(cloud_storage::Client::default()),
reqwest_client: Arc::new(reqwest::Client::default()),
bucket
}
},
};

// Determine how many threads to start.
Expand Down
15 changes: 12 additions & 3 deletions tools/state-viewer/src/state_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub(crate) enum StatePartsSubCommand {
/// Dump part ids up to this part (exclusive).
#[clap(long)]
part_to: Option<u64>,
/// Location of a credentials file granting write access to the bucket.
#[clap(long)]
credentials_file: Option<PathBuf>,
/// Select an epoch to work on.
#[clap(subcommand)]
epoch_selection: EpochSelection,
Expand Down Expand Up @@ -119,8 +122,6 @@ impl StatePartsSubCommand {
let chain_id = &near_config.genesis.config.chain_id;
let sys = actix::System::new();
sys.block_on(async move {
let credentials_file =
near_config.config.s3_credentials_file.clone().map(|file| home_dir.join(file));
match self {
StatePartsSubCommand::Load {
action,
Expand Down Expand Up @@ -151,7 +152,12 @@ impl StatePartsSubCommand {
)
.await
}
StatePartsSubCommand::Dump { part_from, part_to, epoch_selection } => {
StatePartsSubCommand::Dump {
part_from,
part_to,
epoch_selection,
credentials_file,
} => {
let external = create_external_connection(
root_dir,
s3_bucket,
Expand Down Expand Up @@ -209,6 +215,9 @@ fn create_external_connection(
.expect("Failed to create an S3 bucket");
ExternalConnection::S3 { bucket: Arc::new(bucket) }
} else if let Some(bucket) = gcs_bucket {
if let Some(credentials_file) = credentials_file {
std::env::set_var("SERVICE_ACCOUNT", &credentials_file);
}
ExternalConnection::GCS {
gcs_client: Arc::new(cloud_storage::Client::default()),
reqwest_client: Arc::new(reqwest::Client::default()),
Expand Down

0 comments on commit b0cfb84

Please sign in to comment.