hack: experimental chunk execution #12013

Draft · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

13 changes: 12 additions & 1 deletion chain/chain/src/chain.rs
@@ -3542,7 +3542,8 @@ impl Chain {
         }
         let Some(account_id) = me.as_ref() else { return Ok(false) };
         Ok(self.epoch_manager.is_chunk_producer_for_epoch(epoch_id, account_id)?
-            || self.epoch_manager.is_chunk_producer_for_epoch(&next_epoch_id, account_id)?)
+            || self.epoch_manager.is_chunk_producer_for_epoch(&next_epoch_id, account_id)?
+            || true)
     }

     /// Creates jobs which will update shards for the given block and incoming
@@ -3789,15 +3790,25 @@ impl Chain {

         let runtime = self.runtime_adapter.clone();
         let epoch_manager = self.epoch_manager.clone();
+
         Ok(Some((
             shard_id,
             Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
+                let _ = process_shard_update(
+                    parent_span,
+                    runtime.as_ref(),
+                    epoch_manager.as_ref(),
+                    shard_update_reason.clone(),
+                    shard_context.clone(),
+                    near_primitives::apply::ApplyChunkReason::Experiment,
+                );
                 Ok(process_shard_update(
                     parent_span,
                     runtime.as_ref(),
                     epoch_manager.as_ref(),
                     shard_update_reason,
                     shard_context,
+                    near_primitives::apply::ApplyChunkReason::UpdateTrackedShard,
                 )?)
             }),
         )))
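Together with the `|| true` change above (which appears to force the chunk-producer check to pass on every node), each new chunk is now applied twice: a metrics-only pass tagged ApplyChunkReason::Experiment whose result is discarded, then the canonical pass tagged UpdateTrackedShard whose result is returned. A minimal, self-contained sketch of that shadow-execution pattern; `apply` and `Reason` are illustrative stand-ins, not the nearcore API:

    // Shadow execution: run an experimental pass for its side effects
    // (metrics) only, then run and return the real pass.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum Reason {
        Experiment,
        UpdateTrackedShard,
    }

    // Hypothetical stand-in for process_shard_update.
    fn apply(reason: Reason, data: &[u8]) -> Result<usize, String> {
        println!("applied {} bytes as {:?}", data.len(), reason);
        Ok(data.len())
    }

    fn update_shard(data: Vec<u8>) -> Result<usize, String> {
        let _ = apply(Reason::Experiment, &data); // result deliberately dropped
        apply(Reason::UpdateTrackedShard, &data) // this result is the one callers see
    }

    fn main() {
        assert_eq!(update_shard(vec![0u8; 16]), Ok(16));
    }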
1 change: 1 addition & 0 deletions chain/chain/src/types.rs
@@ -263,6 +263,7 @@ impl ChainGenesis {
     }
 }

+#[derive(Clone)]
 pub enum StorageDataSource {
     /// Full state data is present in DB.
     Db,
11 changes: 9 additions & 2 deletions chain/chain/src/update_shard.rs
@@ -58,6 +58,7 @@ pub enum ShardUpdateResult {
 /// State roots of children shards which are ready.
 type ReshardingStateRoots = HashMap<ShardUId, StateRoot>;

+#[derive(Clone)]
 pub struct NewChunkData {
     pub chunk_header: ShardChunkHeader,
     pub transactions: Vec<SignedTransaction>,
@@ -68,13 +69,15 @@ pub struct NewChunkData {
     pub storage_context: StorageContext,
 }

+#[derive(Clone)]
 pub struct OldChunkData {
     pub prev_chunk_extra: ChunkExtra,
     pub resharding_state_roots: Option<ReshardingStateRoots>,
     pub block: ApplyChunkBlockContext,
     pub storage_context: StorageContext,
 }

+#[derive(Clone)]
 pub struct ReshardingData {
     pub resharding_state_roots: ReshardingStateRoots,
     pub state_changes: StateChangesForResharding,
@@ -85,6 +88,7 @@ pub struct ReshardingData {
 /// Reason to update a shard when new block appears on chain.
 /// All types include state roots for children shards in case of resharding.
 #[allow(clippy::large_enum_variant)]
+#[derive(Clone)]
 pub enum ShardUpdateReason {
     /// Block has a new chunk for the shard.
     /// Contains chunk itself and all new incoming receipts to the shard.
@@ -99,6 +103,7 @@ pub enum ShardUpdateReason {
 }

 /// Information about shard to update.
+#[derive(Clone)]
 pub struct ShardContext {
     pub shard_uid: ShardUId,
     /// Whether node cares about shard in this epoch.
@@ -112,6 +117,7 @@ pub struct ShardContext {
 }

 /// Information about storage used for applying txs and receipts.
+#[derive(Clone)]
 pub struct StorageContext {
     /// Data source used for processing shard update.
     pub storage_data_source: StorageDataSource,
@@ -126,18 +132,19 @@ pub fn process_shard_update(
     epoch_manager: &dyn EpochManagerAdapter,
     shard_update_reason: ShardUpdateReason,
     shard_context: ShardContext,
+    apply_chunk_reason: ApplyChunkReason,
 ) -> Result<ShardUpdateResult, Error> {
     Ok(match shard_update_reason {
         ShardUpdateReason::NewChunk(data) => ShardUpdateResult::NewChunk(apply_new_chunk(
-            ApplyChunkReason::UpdateTrackedShard,
+            apply_chunk_reason,
             parent_span,
             data,
             shard_context,
             runtime,
             epoch_manager,
         )?),
         ShardUpdateReason::OldChunk(data) => ShardUpdateResult::OldChunk(apply_old_chunk(
-            ApplyChunkReason::UpdateTrackedShard,
+            apply_chunk_reason,
             parent_span,
             data,
             shard_context,
2 changes: 2 additions & 0 deletions core/primitives-core/src/apply.rs
@@ -11,6 +11,7 @@ pub enum ApplyChunkReason {
     UpdateTrackedShard,
     /// Apply-chunk is invoked to validate the state witness for a shard in the context of stateless validation.
     ValidateChunkStateWitness,
+    Experiment,
 }

 impl ApplyChunkReason {
@@ -19,6 +20,7 @@ impl ApplyChunkReason {
         match self {
             ApplyChunkReason::UpdateTrackedShard => "update_shard",
             ApplyChunkReason::ValidateChunkStateWitness => "validate_chunk",
+            ApplyChunkReason::Experiment => "experiment",
         }
     }
 }
4 changes: 2 additions & 2 deletions core/primitives/src/sandbox.rs
@@ -9,7 +9,7 @@ pub mod state_patch {
     /// object can be non-empty only if `sandbox` feature is enabled. On
     /// non-sandbox build, this struct is ZST and its methods are essentially
     /// short-circuited by treating the type as always empty.
-    #[derive(Default)]
+    #[derive(Default, Clone)]
     pub struct SandboxStatePatch {
         records: Vec<StateRecord>,
     }
@@ -50,7 +50,7 @@
 pub mod state_patch {
     use crate::state_record::StateRecord;

-    #[derive(Default)]
+    #[derive(Default, Clone)]
     pub struct SandboxStatePatch;

     impl SandboxStatePatch {
1 change: 1 addition & 0 deletions runtime/runtime/Cargo.toml
@@ -21,6 +21,7 @@ serde_json.workspace = true
 sha2.workspace = true
 thiserror.workspace = true
 tracing.workspace = true
+zstd.workspace = true

 near-crypto.workspace = true
 near-o11y.workspace = true
44 changes: 37 additions & 7 deletions runtime/runtime/src/lib.rs
@@ -1808,12 +1808,17 @@ impl Runtime {
         let mut validator_proposals = vec![];
         let protocol_version = processing_state.protocol_version;
         let apply_state = &processing_state.apply_state;
+        let apply_reason = apply_state.apply_reason.clone();

         // TODO(#8859): Introduce a dedicated `compute_limit` for the chunk.
         // For now compute limit always matches the gas limit.
         let compute_limit = apply_state.gas_limit.unwrap_or(Gas::max_value());
         let proof_size_limit = if ProtocolFeature::StatelessValidation.enabled(protocol_version) {
-            Some(apply_state.config.witness_config.main_storage_proof_size_soft_limit)
+            if matches!(apply_reason, Some(ApplyChunkReason::Experiment)) {
+                Some(4_000_000)
+            } else {
+                Some(apply_state.config.witness_config.main_storage_proof_size_soft_limit)
+            }
         } else {
             None
         };
@@ -1854,19 +1859,24 @@
         )?;

         let shard_id_str = processing_state.apply_state.shard_id.to_string();
+        let apply_reason = apply_reason.map_or("unknown".to_owned(), |reason| reason.to_string());
         if processing_state.total.compute >= compute_limit {
             metrics::CHUNK_RECEIPTS_LIMITED_BY
-                .with_label_values(&[shard_id_str.as_str(), "compute_limit"])
+                .with_label_values(&[shard_id_str.as_str(), &apply_reason, "compute_limit"])
                 .inc();
         } else if proof_size_limit.is_some_and(|limit| {
             processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
         }) {
             metrics::CHUNK_RECEIPTS_LIMITED_BY
-                .with_label_values(&[shard_id_str.as_str(), "storage_proof_size_limit"])
+                .with_label_values(&[
+                    shard_id_str.as_str(),
+                    &apply_reason,
+                    "storage_proof_size_limit",
+                ])
                 .inc();
         } else {
             metrics::CHUNK_RECEIPTS_LIMITED_BY
-                .with_label_values(&[shard_id_str.as_str(), "unlimited"])
+                .with_label_values(&[shard_id_str.as_str(), &apply_reason, "unlimited"])
                 .inc();
         }
@@ -1931,14 +1941,34 @@ impl Runtime {

         state_update.commit(StateChangeCause::UpdatedDelayedReceipts);
         self.apply_state_patch(&mut state_update, state_patch);
+
+        let apply_reason_label = apply_state.apply_reason.as_ref().unwrap().to_string();
+        let shard_id_str = apply_state.shard_id.to_string();
+
         let chunk_recorded_size_upper_bound =
             state_update.trie.recorded_storage_size_upper_bound() as f64;
-        let shard_id_str = apply_state.shard_id.to_string();
         metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND
-            .with_label_values(&[shard_id_str.as_str()])
+            .with_label_values(&[shard_id_str.as_str(), &apply_reason_label])
             .observe(chunk_recorded_size_upper_bound);
         let (trie, trie_changes, state_changes) = state_update.finalize()?;
+
+        if let Some(partial_storage) = trie.recorded_storage() {
+            let bytes = borsh::to_vec(&partial_storage.nodes).unwrap();
+            metrics::CHUNK_STATE_WITNESS_STORAGE_PROOF_SIZE
+                .with_label_values(&[
+                    &shard_id_str,
+                    &apply_reason_label,
+                ])
+                .observe(bytes.len() as f64);
+            let compressed = zstd::encode_all(bytes.as_slice(), 3).unwrap();
+            metrics::CHUNK_STATE_WITNESS_COMPRESSED_STORAGE_PROOF_SIZE
+                .with_label_values(&[
+                    &shard_id_str,
+                    &apply_reason_label,
+                ])
+                .observe(compressed.len() as f64);
+        }

         if let Some(prefetcher) = &processing_state.prefetcher {
             // Only clear the prefetcher queue after finalize is done because as part of receipt
             // processing we also prefetch account data and access keys that are accessed in
@@ -1969,7 +1999,7 @@
         let state_root = trie_changes.new_root;
         let chunk_recorded_size = trie.recorded_storage_size() as f64;
         metrics::CHUNK_RECORDED_SIZE
-            .with_label_values(&[shard_id_str.as_str()])
+            .with_label_values(&[shard_id_str.as_str(), &apply_reason_label])
             .observe(chunk_recorded_size);
         metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND_RATIO
             .with_label_values(&[shard_id_str.as_str()])
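The new post-finalization block serializes the recorded partial storage (the state-witness storage proof) with borsh and compresses it with zstd at level 3, observing both sizes. A self-contained sketch of the same measurement, assuming the borsh and zstd crates; the `nodes` payload here is fabricated sample data, not real trie nodes:

    // Compare raw vs. zstd-compressed size of a borsh-serialized payload,
    // mirroring the metric computation in the diff above.
    fn main() -> std::io::Result<()> {
        // Stand-in for partial_storage.nodes: 100 pseudo trie nodes.
        let nodes: Vec<Vec<u8>> = (0..100u32).map(|i| i.to_le_bytes().repeat(64)).collect();
        let bytes = borsh::to_vec(&nodes).expect("borsh serialization");
        // Level 3 matches the diff; higher levels trade CPU for ratio.
        let compressed = zstd::encode_all(bytes.as_slice(), 3)?;
        println!(
            "raw: {} bytes, compressed: {} bytes ({:.1}%)",
            bytes.len(),
            compressed.len(),
            100.0 * compressed.len() as f64 / bytes.len() as f64
        );
        Ok(())
    }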
36 changes: 31 additions & 5 deletions runtime/runtime/src/metrics.rs
@@ -333,17 +333,17 @@ pub static CHUNK_RECORDED_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
     try_create_histogram_vec(
         "near_chunk_recorded_size",
         "Total size of storage proof (recorded trie nodes for state witness, post-finalization) for a single chunk",
-        &["shard_id"],
-        Some(buckets_for_chunk_storage_proof_size()),
+        &["shard_id", "apply_reason"],
+        Some(buckets_for_witness_field_size()),
     )
     .unwrap()
 });
 pub static CHUNK_RECORDED_SIZE_UPPER_BOUND: LazyLock<HistogramVec> = LazyLock::new(|| {
     try_create_histogram_vec(
         "near_chunk_recorded_size_upper_bound",
         "Upper bound of storage proof size (recorded trie nodes size + estimated charges, pre-finalization) for a single chunk",
-        &["shard_id"],
-        Some(buckets_for_chunk_storage_proof_size()),
+        &["shard_id", "apply_reason"],
+        Some(buckets_for_witness_field_size()),
     )
     .unwrap()
 });
@@ -437,7 +437,7 @@ pub(crate) static CHUNK_RECEIPTS_LIMITED_BY: LazyLock<IntCounterVec> = LazyLock:
     try_create_int_counter_vec(
         "near_chunk_receipts_limited_by",
         "Number of chunks where the number of processed receipts was limited by a certain factor.",
-        &["shard_id", "limited_by"],
+        &["shard_id", "apply_reason", "limited_by"],
     )
     .unwrap()
 });
@@ -761,3 +761,29 @@ pub fn report_recorded_column_sizes(trie: &Trie, apply_state: &ApplyState) {
         .with_label_values(&[shard_id_str.as_str(), "values"])
         .observe(total_size.values_size as f64);
 }
+
+pub(crate) static CHUNK_STATE_WITNESS_COMPRESSED_STORAGE_PROOF_SIZE: LazyLock<HistogramVec> =
+    LazyLock::new(|| {
+        try_create_histogram_vec(
+            "near_chunk_state_witness_compressed_storage_proof_size",
+            "compressed storage proof size",
+            &["shard_id", "apply_reason"],
+            Some(buckets_for_witness_field_size()),
+        )
+        .unwrap()
+    });
+
+pub(crate) static CHUNK_STATE_WITNESS_STORAGE_PROOF_SIZE: LazyLock<HistogramVec> =
+    LazyLock::new(|| {
+        try_create_histogram_vec(
+            "near_chunk_state_witness_storage_proof_size",
+            "storage proof size",
+            &["shard_id", "apply_reason"],
+            Some(buckets_for_witness_field_size()),
+        )
+        .unwrap()
+    });
+
+fn buckets_for_witness_field_size() -> Vec<f64> {
+    linear_buckets(100_000., 50_000., 200).unwrap()
+}
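For scale: a prometheus-style linear_buckets(start, width, count) yields count evenly spaced boundaries, so these histograms should cover 100_000 up to 100_000 + 50_000 × 199 = 10_050_000 bytes, roughly 100 kB to 10 MB. A quick sketch of the expected boundaries, computed independently of the prometheus helper:

    // Reproduce the boundaries that linear_buckets(100_000., 50_000., 200)
    // is expected to produce.
    fn main() {
        let (start, width, count) = (100_000.0_f64, 50_000.0_f64, 200usize);
        let buckets: Vec<f64> = (0..count).map(|i| start + width * i as f64).collect();
        assert_eq!(buckets.first().copied(), Some(100_000.0));
        assert_eq!(buckets.last().copied(), Some(10_050_000.0));
    }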
1 change: 1 addition & 0 deletions tools/replay-archive/src/cli.rs
@@ -365,6 +365,7 @@ impl ReplayController {
             self.epoch_manager.as_ref(),
             update_reason,
             shard_context,
+            near_primitives::apply::ApplyChunkReason::UpdateTrackedShard
         )?;

         let output = match shard_update_result {