Skip to content

Commit

Permalink
refactor shard management
Browse files Browse the repository at this point in the history
  • Loading branch information
posvyatokum committed Jan 31, 2024
1 parent d229b06 commit 084cf19
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 26 deletions.
6 changes: 3 additions & 3 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,11 @@ impl ViewClientActor {
let bp = epoch_info.get_validator(bp).account_id().clone();
let cps: Vec<AccountId> = (0..num_shards)
.map(|shard_id| {
let cp = epoch_info.sample_chunk_producer(block_height, shard_id);
let cp = epoch_info.sample_chunk_producer(block_height, shard_id)?;
let cp = epoch_info.get_validator(cp).account_id().clone();
cp
Ok(cp)
})
.collect();
.collect::<Result<Vec<AccountId>, near_chain::Error>>()?;
if account_id != bp && !cps.iter().any(|a| *a == account_id) {
if let Some(start) = start_block_of_window {
if block_height == last_block_of_epoch {
Expand Down
6 changes: 3 additions & 3 deletions chain/epoch-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ impl EpochManager {
shard_id: ShardId,
) -> Result<ValidatorStake, EpochError> {
let epoch_info = self.get_epoch_info(epoch_id)?;
let validator_id = Self::chunk_producer_from_info(&epoch_info, height, shard_id);
let validator_id = Self::chunk_producer_from_info(&epoch_info, height, shard_id)?;
Ok(epoch_info.get_validator(validator_id))
}

Expand Down Expand Up @@ -1465,7 +1465,7 @@ impl EpochManager {
epoch_info: &EpochInfo,
height: BlockHeight,
shard_id: ShardId,
) -> ValidatorId {
) -> Result<ValidatorId, EpochError> {
epoch_info.sample_chunk_producer(height, shard_id)
}

Expand Down Expand Up @@ -1776,7 +1776,7 @@ impl EpochManager {
let prev_epoch = prev_info.epoch_id().clone();

let block_info = self.get_block_info(&cur_hash)?;
aggregator.update_tail(&block_info, &epoch_info, prev_height);
aggregator.update_tail(&block_info, &epoch_info, prev_height)?;

if prev_hash == self.epoch_info_aggregator.last_block_hash {
// We’ve reached sync point of the old aggregator. If old
Expand Down
5 changes: 3 additions & 2 deletions chain/epoch-manager/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ fn test_expected_chunks_prev_block_not_produced() {
let prev_block_info = epoch_manager.get_block_info(&prev_block).unwrap();
let prev_height = prev_block_info.height();
let expected_chunk_producer =
EpochManager::chunk_producer_from_info(&epoch_info, prev_height + 1, 0);
EpochManager::chunk_producer_from_info(&epoch_info, prev_height + 1, 0).unwrap();
// test1 does not produce blocks during first epoch
if block_producer == 0 && epoch_id == initial_epoch_id {
expected += 1;
Expand Down Expand Up @@ -1469,7 +1469,8 @@ fn test_chunk_validator_kickout() {
&epoch_info,
height,
shard_id as u64,
);
)
.unwrap();
// test1 skips chunks
if chunk_producer == 0 {
expected += 1;
Expand Down
7 changes: 5 additions & 2 deletions chain/epoch-manager/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use near_primitives::block_header::BlockHeader;
use near_primitives::challenge::SlashedValidator;
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::types::validator_stake::ValidatorStake;
use near_primitives::types::{
Expand Down Expand Up @@ -105,7 +106,7 @@ impl EpochInfoAggregator {
block_info: &BlockInfo,
epoch_info: &EpochInfo,
prev_block_height: BlockHeight,
) {
) -> Result<(), EpochError> {
// Step 1: update block tracer
let block_info_height = block_info.height();
for height in prev_block_height + 1..=block_info_height {
Expand Down Expand Up @@ -133,7 +134,7 @@ impl EpochInfoAggregator {
epoch_info,
prev_block_height + 1,
i as ShardId,
);
)?;
let tracker = self.shard_tracker.entry(i as ShardId).or_insert_with(HashMap::new);
tracker
.entry(chunk_validator_id)
Expand All @@ -157,6 +158,8 @@ impl EpochInfoAggregator {
for proposal in block_info.proposals_iter() {
self.all_proposals.entry(proposal.account_id().clone()).or_insert(proposal);
}

Ok(())
}

/// Merges information from `other` aggregator into `self`.
Expand Down
4 changes: 2 additions & 2 deletions chain/epoch-manager/src/validator_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {

for shard_id in 0..num_shards {
for h in 0..100_000 {
let cp = epoch_info.sample_chunk_producer(h, shard_id);
let cp = epoch_info.sample_chunk_producer(h, shard_id).unwrap();
// Don't read too much into this. The reason the ValidatorId always
// equals the ShardId is because the validators are assigned to shards in order.
assert_eq!(cp, shard_id)
Expand Down Expand Up @@ -608,7 +608,7 @@ mod tests {
for shard_id in 0..num_shards {
let mut counts: [i32; 2] = [0, 0];
for h in 0..100_000 {
let cp = epoch_info.sample_chunk_producer(h, shard_id);
let cp = epoch_info.sample_chunk_producer(h, shard_id).unwrap();
// if ValidatorId is in the second half then it is the lower
// stake validator (because they are sorted by decreasing stake).
let index = if cp >= num_shards { 1 } else { 0 };
Expand Down
39 changes: 32 additions & 7 deletions core/primitives/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ pub mod epoch_info {
use smart_default::SmartDefault;
use std::collections::{BTreeMap, HashMap};

use crate::errors::EpochError;
use crate::types::validator_stake::ValidatorStakeV1;
use crate::{epoch_manager::RngSeed, rand::WeightedIndex};
use near_primitives_core::{
Expand Down Expand Up @@ -892,17 +893,33 @@ pub mod epoch_info {
}
}

pub fn sample_chunk_producer(&self, height: BlockHeight, shard_id: ShardId) -> ValidatorId {
pub fn sample_chunk_producer(
&self,
height: BlockHeight,
shard_id: ShardId,
) -> Result<ValidatorId, EpochError> {
match &self {
Self::V1(v1) => {
let cp_settlement = &v1.chunk_producers_settlement;
let shard_cps = &cp_settlement[shard_id as usize];
shard_cps[(height as u64 % (shard_cps.len() as u64)) as usize]
let shard_id = shard_id as usize;
let shard_cps = cp_settlement.get(shard_id);
let shard_cps = shard_cps
.ok_or(EpochError::new_sharding_error(shard_id, v1.epoch_height))?;
let sample = (height as u64 % (shard_cps.len() as u64)) as usize;
let cp = shard_cps.get(sample);
cp.ok_or(EpochError::new_sampling_error(sample, shard_id, v1.epoch_height))
.copied()
}
Self::V2(v2) => {
let cp_settlement = &v2.chunk_producers_settlement;
let shard_cps = &cp_settlement[shard_id as usize];
shard_cps[(height as u64 % (shard_cps.len() as u64)) as usize]
let shard_id = shard_id as usize;
let shard_cps = cp_settlement.get(shard_id);
let shard_cps = shard_cps
.ok_or(EpochError::new_sharding_error(shard_id, v2.epoch_height))?;
let sample = (height as u64 % (shard_cps.len() as u64)) as usize;
let cp = shard_cps.get(sample);
cp.ok_or(EpochError::new_sampling_error(sample, shard_id, v2.epoch_height))
.copied()
}
Self::V3(v3) => {
let protocol_version = self.protocol_version();
Expand All @@ -924,8 +941,16 @@ pub mod epoch_info {
hash(&buffer).0
};
let shard_id = shard_id as usize;
v3.chunk_producers_settlement[shard_id]
[v3.chunk_producers_sampler[shard_id].sample(seed)]
let sampler = v3.chunk_producers_sampler.get(shard_id);
let sampler =
sampler.ok_or(EpochError::new_sharding_error(shard_id, v3.epoch_height))?;
let sample = sampler.sample(seed);
let settlement = v3.chunk_producers_settlement.get(shard_id);
let settlement = settlement
.ok_or(EpochError::new_sharding_error(shard_id, v3.epoch_height))?;
let cp = settlement.get(sample);
cp.ok_or(EpochError::new_sampling_error(sample, shard_id, v3.epoch_height))
.copied()
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/primitives/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,18 @@ pub enum EpochError {
},
}

impl EpochError {
pub fn new_sharding_error(shard_id: usize, epoch_height: u64) -> Self {
EpochError::ShardingError(format!("{shard_id} is out of bound for epoch {epoch_height}"))
}

pub fn new_sampling_error(sample: usize, shard_id: usize, epoch_height: u64) -> Self {
EpochError::ShardingError(format!(
"Sample {sample} is out of bound for shard {shard_id} in epoch {epoch_height}"
))
}
}

impl std::error::Error for EpochError {}

impl Display for EpochError {
Expand Down
1 change: 0 additions & 1 deletion core/primitives/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub use near_primitives_core::runtime::fees;
pub use near_primitives_core::runtime::*;

pub mod apply_state;
pub mod config;
Expand Down
1 change: 0 additions & 1 deletion core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ where
}
}


impl ColdMigrationStore for Store {
fn iter_prefix_with_callback_for_cold(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ fn meta_tx_fn_call_access_key() {
// Check previous allowance is set as expected
let key =
node.user().get_access_key(&sender, &public_key).expect("failed looking up fn access key");
let AccessKeyPermissionView::FunctionCall { allowance, ..} = key.permission else {
let AccessKeyPermissionView::FunctionCall { allowance, .. } = key.permission else {
panic!("should be function access key")
};
assert_eq!(allowance.unwrap(), INITIAL_ALLOWANCE);
Expand All @@ -338,7 +338,7 @@ fn meta_tx_fn_call_access_key() {
.user()
.get_access_key(&sender, &signer.public_key())
.expect("failed looking up fn access key");
let AccessKeyPermissionView::FunctionCall { allowance, ..} = key.permission else {
let AccessKeyPermissionView::FunctionCall { allowance, .. } = key.permission else {
panic!("should be function access key")
};
assert_eq!(
Expand Down
4 changes: 3 additions & 1 deletion runtime/runtime-params-estimator/src/gas_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ impl GasCost {

/// Like [`std::cmp::Ord::min`] but operates on heterogenous types ([`GasCost`] + [`Gas`]).
pub(crate) fn min_gas(mut self, gas: Gas) -> Self {
let Some(to_add) = gas.checked_sub(self.to_gas()) else { return self; };
let Some(to_add) = gas.checked_sub(self.to_gas()) else {
return self;
};
if let Some(qemu) = &mut self.qemu {
// QEMU gas is split across multiple components (instructions
// and IO). When rounding up to an amount of gas, the assumption
Expand Down
5 changes: 3 additions & 2 deletions tools/state-viewer/src/epoch_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn display_block_and_chunk_producers(
let bp = epoch_info.get_validator(bp).account_id().clone();
let cps: Vec<AccountId> = (0..num_shards)
.map(|shard_id| {
let cp = epoch_info.sample_chunk_producer(block_height, shard_id);
let cp = epoch_info.sample_chunk_producer(block_height, shard_id).unwrap();
let cp = epoch_info.get_validator(cp).account_id().clone();
cp
})
Expand Down Expand Up @@ -259,7 +259,8 @@ fn display_validator_info(
.clone()
.map(|shard_id| (block_height, shard_id))
.filter(|&(block_height, shard_id)| {
epoch_info.sample_chunk_producer(block_height, shard_id) == *validator_id
epoch_info.sample_chunk_producer(block_height, shard_id).unwrap()
== *validator_id
})
.collect::<Vec<(BlockHeight, ShardId)>>()
})
Expand Down

0 comments on commit 084cf19

Please sign in to comment.