Skip to content

Commit

Permalink
feat: wiring for bandwidth scheduler (#12234)
Browse files Browse the repository at this point in the history
Add the wiring needed for the bandwidth scheduler algorithm.

Changes:
* Add a new `ProtocolFeature` - `BandwidthScheduler`, its protocol
version is set to nightly
* Add a struct that will keep the bandwidth requests generated by the
shards
* Propagate the bandwidth requests through the blockchain - put the
generated bandwidth requests in the shard headers, pass the previous
bandwidth requests to the runtime
* Add a struct that represents the bandwidth scheduler state, it's
stored in the trie and modified on every scheduler invocation.
* Mock implementation of bandwidth scheduler - it takes the previous
bandwidth requests and the state and mocks the scheduler algorithm. It
activates the requests propagation logic and breaks some tests.

### Propagation of bandwidth requests

The flow of bandwidth requests looks as follows:
* A chunk is applied and generates bandwidth requests. They are put in
`ApplyResult` and `ApplyChunkResult`
* The requests are taken from the apply result and put in `ChunkExtra`.
`ChunkExtra` is persisted in the database
* During chunk production, Client fetches `ChunkExtra` of the previous
chunk and puts the bandwidth requests in chunk header
* The produced chunks are included in the block
* The new chunks are applied, their `ApplyState` contains bandwidth
requests taken from all the chunk headers in the block that contains the
applied chunks.
* During the application, bandwidth scheduler looks at the requests
created at the previous height and grants banwidth
* Receipts are sent out
* Then the chunk generates new bandwidth requests
* etc

The flow is very similar to the one for congestion info.

### Scheduler state

Bandwidth scheduler needs to keep some persistent state. In the future
it'll be something like "how much every shard was granted lately", it'll
be used to maintain fairness. For now it's just mock data.
Scheduler state should always be the same on all shards. All shards
start with the same scheduler state, apply the scheduler at the same
heights with the same inputs and always end up with the same scheduler
state.
This means that the bandwidth scheduler also needs to be run for missing
chunks. Luckily that can be easily achieved thanks to existing
`apply_old_chunk` infrastructure (all missing chunk are applied, it
counts as "implicit state transitions").
The `state_root` will now change at every height, even when there are no
receipts to be processed. It breaks some tests which assumed that the
state root wouldn't change.

The pull request is meant to be reviewed commit-by-commit, I tried to
make the commit history nice.
  • Loading branch information
jancionear authored Oct 24, 2024
1 parent b27295b commit 55f21d5
Show file tree
Hide file tree
Showing 69 changed files with 1,108 additions and 156 deletions.
5 changes: 5 additions & 0 deletions chain/chain-primitives/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ pub enum Error {
/// Invalid Congestion Info
#[error("Invalid Congestion Info: {0}")]
InvalidCongestionInfo(String),
/// Invalid bandwidth requests
#[error("Invalid bandwidth requests - chunk extra doesn't match chunk header: {0}")]
InvalidBandwidthRequests(String),
/// Invalid shard id
#[error("Shard id {0} does not exist")]
InvalidShardId(ShardId),
Expand Down Expand Up @@ -315,6 +318,7 @@ impl Error {
| Error::InvalidGasUsed
| Error::InvalidBalanceBurnt
| Error::InvalidCongestionInfo(_)
| Error::InvalidBandwidthRequests(_)
| Error::InvalidShardId(_)
| Error::InvalidStateRequest(_)
| Error::InvalidRandomnessBeaconOutput
Expand Down Expand Up @@ -393,6 +397,7 @@ impl Error {
Error::InvalidGasUsed => "invalid_gas_used",
Error::InvalidBalanceBurnt => "invalid_balance_burnt",
Error::InvalidCongestionInfo(_) => "invalid_congestion_info",
Error::InvalidBandwidthRequests(_) => "invalid_bandwidth_requests",
Error::InvalidShardId(_) => "invalid_shard_id",
Error::InvalidStateRequest(_) => "invalid_state_request",
Error::InvalidRandomnessBeaconOutput => "invalid_randomness_beacon_output",
Expand Down
9 changes: 8 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use near_chain_configs::{MutableConfigValue, MutableValidatorSigner};
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::block::{genesis_chunks, Block, BlockValidityError, Tip};
use near_primitives::block_header::BlockHeader;
use near_primitives::challenge::{
Expand Down Expand Up @@ -621,6 +622,7 @@ impl Chain {
gas_limit,
0,
congestion_info,
BandwidthRequests::default_for_protocol_version(genesis_protocol_version),
)
}

Expand Down Expand Up @@ -3237,7 +3239,12 @@ impl Chain {
};
let congestion_info = block.block_congestion_info();

Ok(ApplyChunkBlockContext::from_header(block_header, gas_price, congestion_info))
Ok(ApplyChunkBlockContext::from_header(
block_header,
gas_price,
congestion_info,
block.block_bandwidth_requests(),
))
}

fn block_catch_up_postprocess(
Expand Down
23 changes: 23 additions & 0 deletions chain/chain/src/chain_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl<'a> ChainUpdate<'a> {
should_save_state_transition_data: bool,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing", height=block.header().height()).entered();
Self::bandwidth_scheduler_state_sanity_check(&apply_results);
for result in apply_results {
self.process_apply_chunk_result(block, result, should_save_state_transition_data)?;
}
Expand Down Expand Up @@ -123,6 +124,7 @@ impl<'a> ChainUpdate<'a> {
gas_limit,
apply_result.total_balance_burnt,
apply_result.congestion_info,
apply_result.bandwidth_requests,
),
);

Expand Down Expand Up @@ -195,6 +197,24 @@ impl<'a> ChainUpdate<'a> {
Ok(())
}

/// Extra sanity check for bandwdith scheduler - the scheduler state should be the same on all shards.
fn bandwidth_scheduler_state_sanity_check(apply_results: &[ShardUpdateResult]) {
let state_hashes: Vec<CryptoHash> = apply_results
.iter()
.map(|r| match r {
ShardUpdateResult::NewChunk(new_res) => {
new_res.apply_result.bandwidth_scheduler_state_hash
}
ShardUpdateResult::OldChunk(old_res) => {
old_res.apply_result.bandwidth_scheduler_state_hash
}
})
.collect();
for hash in &state_hashes {
assert_eq!(*hash, state_hashes[0]);
}
}

/// This is the last step of process_block_single, where we take the preprocess block info
/// apply chunk results and store the results on chain.
#[tracing::instrument(
Expand Down Expand Up @@ -529,6 +549,7 @@ impl<'a> ChainUpdate<'a> {
challenges_result: block_header.challenges_result().clone(),
random_seed: *block_header.random_value(),
congestion_info: block.block_congestion_info(),
bandwidth_requests: block.block_bandwidth_requests(),
},
&receipts,
chunk.transactions(),
Expand Down Expand Up @@ -564,6 +585,7 @@ impl<'a> ChainUpdate<'a> {
gas_limit,
apply_result.total_balance_burnt,
apply_result.congestion_info,
apply_result.bandwidth_requests,
);
self.chain_store_update.save_chunk_extra(block_header.hash(), &shard_uid, chunk_extra);

Expand Down Expand Up @@ -635,6 +657,7 @@ impl<'a> ChainUpdate<'a> {
&block_header,
prev_block_header.next_gas_price(),
block.block_congestion_info(),
block.block_bandwidth_requests(),
),
&[],
&[],
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl NightshadeRuntime {
challenges_result,
random_seed,
congestion_info,
bandwidth_requests,
} = block;
let ApplyChunkShardContext {
shard_id,
Expand Down Expand Up @@ -390,6 +391,7 @@ impl NightshadeRuntime {
is_first_block_with_chunk_of_version,
},
congestion_info,
bandwidth_requests,
};

let instant = Instant::now();
Expand Down Expand Up @@ -467,6 +469,8 @@ impl NightshadeRuntime {
applied_receipts_hash: hash(&borsh::to_vec(receipts).unwrap()),
congestion_info: apply_result.congestion_info,
contract_accesses: apply_result.contract_accesses,
bandwidth_requests: apply_result.bandwidth_requests,
bandwidth_scheduler_state_hash: apply_result.bandwidth_scheduler_state_hash,
contract_deploys: apply_result.contract_deploys,
};

Expand Down
3 changes: 3 additions & 0 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use near_pool::{
};
use near_primitives::action::FunctionCallAction;
use near_primitives::apply::ApplyChunkReason;
use near_primitives::bandwidth_scheduler::BlockBandwidthRequests;
use near_primitives::congestion_info::{BlockCongestionInfo, ExtendedCongestionInfo};
use near_primitives::epoch_block_info::BlockInfo;
use near_primitives::receipt::{ActionReceipt, ReceiptV1};
Expand Down Expand Up @@ -214,6 +215,7 @@ impl TestEnv {
) -> ApplyChunkResult {
// TODO(congestion_control): pass down prev block info and read congestion info from there
// For now, just use default.
// TODO(bandwidth_scheduler) - pass bandwidth requests from prev_block
let prev_block_hash = self.head.last_block_hash;
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash).unwrap();
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id).unwrap();
Expand Down Expand Up @@ -256,6 +258,7 @@ impl TestEnv {
challenges_result,
random_seed: CryptoHash::default(),
congestion_info,
bandwidth_requests: BlockBandwidthRequests::empty(),
},
receipts,
transactions,
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ pub fn apply_result_to_chunk_extra(
chunk.gas_limit(),
apply_result.total_balance_burnt,
apply_result.congestion_info,
apply_result.bandwidth_requests,
)
}

Expand Down
4 changes: 3 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use near_parameters::RuntimeConfig;
use near_pool::types::TransactionGroupIterator;
use near_primitives::account::{AccessKey, Account};
use near_primitives::apply::ApplyChunkReason;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo};
Expand Down Expand Up @@ -1383,7 +1384,8 @@ impl RuntimeAdapter for KeyValueRuntime {
processed_yield_timeouts: vec![],
applied_receipts_hash: hash(&borsh::to_vec(receipts).unwrap()),
congestion_info: Self::get_congestion_info(PROTOCOL_VERSION),
// Since all actions are transfer actions, there is no contracts accessed.
bandwidth_requests: BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION),
bandwidth_scheduler_state_hash: CryptoHash::default(),
contract_accesses: Default::default(),
contract_deploys: Default::default(),
})
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/tests/simple_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn build_chain() {
// cargo insta test --accept -p near-chain --features nightly -- tests::simple_chain::build_chain
let hash = chain.head().unwrap().last_block_hash;
if cfg!(feature = "nightly") {
insta::assert_snapshot!(hash, @"Hc3bWEd7ikHf9BAe2SknvH2jAAakEtBRU1FBu6Udocm3");
insta::assert_snapshot!(hash, @"2V6auEJDpFWUadSMYwAUg68tn9KmoMmHw2JnHizYicwc");
} else {
insta::assert_snapshot!(hash, @"GHZFAFiMdGzAfnWTcS9u9wqFvxMrgFpyEr6Use7jk2Lo");
}
Expand All @@ -51,7 +51,7 @@ fn build_chain() {

let hash = chain.head().unwrap().last_block_hash;
if cfg!(feature = "nightly") {
insta::assert_snapshot!(hash, @"39R6bDFXkPfwdYs4crV3RyCde85ecycqP5DBwdtwyjcJ");
insta::assert_snapshot!(hash, @"8yW4usbwYcRDKmKmDkHTFpzggDZBGu6avKLu1iTf4Lr6");
} else {
insta::assert_snapshot!(hash, @"3Pdm44L71Bk8EokPHF1pxakHojsriNadBdZZSpcoDv9q");
}
Expand Down
10 changes: 10 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub use near_epoch_manager::EpochManagerAdapter;
use near_parameters::RuntimeConfig;
use near_pool::types::TransactionGroupIterator;
use near_primitives::apply::ApplyChunkReason;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::bandwidth_scheduler::BlockBandwidthRequests;
pub use near_primitives::block::{Block, BlockHeader, Tip};
use near_primitives::challenge::{ChallengesResult, PartialState};
use near_primitives::checked_feature;
Expand Down Expand Up @@ -105,6 +107,11 @@ pub struct ApplyChunkResult {
/// should be set to None for chunks before the CongestionControl protocol
/// version and Some otherwise.
pub congestion_info: Option<CongestionInfo>,
/// Requests for bandwidth to send receipts to other shards.
/// Will be None for protocol versions that don't have the BandwidthScheduler feature enabled.
pub bandwidth_requests: Option<BandwidthRequests>,
/// Used only for a sanity check.
pub bandwidth_scheduler_state_hash: CryptoHash,
/// Code-hashes of the contracts accessed (called) while applying the chunk.
pub contract_accesses: BTreeSet<CodeHash>,
/// Code-hashes of the contracts deployed while applying the chunk.
Expand Down Expand Up @@ -303,13 +310,15 @@ pub struct ApplyChunkBlockContext {
pub challenges_result: ChallengesResult,
pub random_seed: CryptoHash,
pub congestion_info: BlockCongestionInfo,
pub bandwidth_requests: BlockBandwidthRequests,
}

impl ApplyChunkBlockContext {
pub fn from_header(
header: &BlockHeader,
gas_price: Balance,
congestion_info: BlockCongestionInfo,
bandwidth_requests: BlockBandwidthRequests,
) -> Self {
Self {
height: header.height(),
Expand All @@ -320,6 +329,7 @@ impl ApplyChunkBlockContext {
challenges_result: header.challenges_result().clone(),
random_seed: *header.random_value(),
congestion_info,
bandwidth_requests,
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions chain/chain/src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use borsh::BorshDeserialize;

use near_crypto::PublicKey;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::block::{Block, BlockHeader};
use near_primitives::challenge::{
BlockDoubleSign, Challenge, ChallengeBody, ChunkProofs, ChunkState, MaybeEncodedShardChunk,
Expand Down Expand Up @@ -178,6 +179,10 @@ pub fn validate_chunk_with_chunk_extra_and_receipts_root(
}

validate_congestion_info(&prev_chunk_extra.congestion_info(), &chunk_header.congestion_info())?;
validate_bandwidth_requests(
prev_chunk_extra.bandwidth_requests(),
chunk_header.bandwidth_requests(),
)?;

Ok(())
}
Expand Down Expand Up @@ -211,6 +216,41 @@ fn validate_congestion_info(
}
}

fn validate_bandwidth_requests(
extra_bandwidth_requests: Option<&BandwidthRequests>,
header_bandwidth_requests: Option<&BandwidthRequests>,
) -> Result<(), Error> {
if extra_bandwidth_requests.is_none()
&& header_bandwidth_requests == Some(&BandwidthRequests::empty())
{
// This corner case happens for the first chunk that has the BandwidthScheduler feature enabled.
// The previous chunk was applied with a protocol version which doesn't have bandwidth scheduler
// enabled and because of that the bandwidth requests in ChunkExtra are None.
// The header was produced in the new protocol version, and the newer version of header always
// has some bandwidth requests, it's not an `Option`. Because of that the header requests are `Some(BandwidthRequests::empty())`.
return Ok(());
}

if extra_bandwidth_requests != header_bandwidth_requests {
fn requests_len(requests_opt: Option<&BandwidthRequests>) -> usize {
match requests_opt {
Some(BandwidthRequests::V1(requests_v1)) => requests_v1.requests.len(),
None => 0,
}
}
let error_info_str = format!(
"chunk extra: (is_some: {}, len: {}) chunk header: (is_some: {}, len: {})",
extra_bandwidth_requests.is_some(),
requests_len(extra_bandwidth_requests),
header_bandwidth_requests.is_some(),
requests_len(header_bandwidth_requests)
);
return Err(Error::InvalidBandwidthRequests(error_info_str));
}

Ok(())
}

/// Validates a double sign challenge.
/// Only valid if ancestors of both blocks are present in the chain.
fn validate_double_sign(
Expand Down
3 changes: 3 additions & 0 deletions chain/chunks/src/shards_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ use near_network::types::{
};
use near_network::types::{NetworkRequests, PeerManagerMessageRequest};
use near_performance_metrics_macros::perf;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::block::Tip;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::errors::EpochError;
Expand Down Expand Up @@ -1994,6 +1995,7 @@ impl ShardsManagerActor {
prev_outgoing_receipts_root: CryptoHash,
tx_root: CryptoHash,
congestion_info: Option<CongestionInfo>,
bandwidth_requests: Option<BandwidthRequests>,
signer: &ValidatorSigner,
rs: &ReedSolomon,
protocol_version: ProtocolVersion,
Expand All @@ -2014,6 +2016,7 @@ impl ShardsManagerActor {
prev_outgoing_receipts,
prev_outgoing_receipts_root,
congestion_info,
bandwidth_requests,
signer,
protocol_version,
)
Expand Down
2 changes: 2 additions & 0 deletions chain/chunks/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use near_epoch_manager::test_utils::setup_epoch_manager_with_block_and_chunk_pro
use near_epoch_manager::EpochManagerHandle;
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::test_utils::MockPeerManagerAdapter;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{self, MerklePath};
Expand Down Expand Up @@ -153,6 +154,7 @@ impl ChunkTestFixture {
receipts_root,
MerkleHash::default(),
congestion_info,
BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION),
&signer,
&rs,
PROTOCOL_VERSION,
Expand Down
8 changes: 5 additions & 3 deletions chain/client/src/chunk_distribution_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,12 @@ mod tests {
time::Clock,
};
use near_primitives::{
bandwidth_scheduler::BandwidthRequests,
congestion_info::CongestionInfo,
hash::hash,
sharding::{
PartialEncodedChunkV2, ShardChunkHeaderInner, ShardChunkHeaderInnerV3,
ShardChunkHeaderV3,
shard_chunk_header_inner::ShardChunkHeaderInnerV4, PartialEncodedChunkV2,
ShardChunkHeaderInner, ShardChunkHeaderV3,
},
validator_signer::EmptyValidatorSigner,
};
Expand Down Expand Up @@ -398,7 +399,7 @@ mod tests {
let mut mock_hashes = MockHashes::new(prev_block_hash);

let signer = EmptyValidatorSigner::default().into();
let header_inner = ShardChunkHeaderInner::V3(ShardChunkHeaderInnerV3 {
let header_inner = ShardChunkHeaderInner::V4(ShardChunkHeaderInnerV4 {
prev_block_hash,
prev_state_root: mock_hashes.next().unwrap(),
prev_outcome_root: mock_hashes.next().unwrap(),
Expand All @@ -413,6 +414,7 @@ mod tests {
tx_root: mock_hashes.next().unwrap(),
prev_validator_proposals: Vec::new(),
congestion_info: CongestionInfo::default(),
bandwidth_requests: BandwidthRequests::empty(),
});
let header = ShardChunkHeaderV3::from_inner(header_inner, &signer);
PartialEncodedChunk::V2(PartialEncodedChunkV2 {
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ impl Client {
outgoing_receipts_root,
tx_root,
congestion_info,
chunk_extra.bandwidth_requests().cloned(),
&*validator_signer,
&mut self.rs_for_chunk_production,
protocol_version,
Expand Down
Loading

0 comments on commit 55f21d5

Please sign in to comment.