Skip to content

Commit

Permalink
feat: file upload and calculate tag (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
ytqaljn authored Dec 13, 2023
1 parent 4d94ad0 commit b543038
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 160 deletions.
4 changes: 0 additions & 4 deletions pallets/file-bank/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +0,0 @@
// The average number of bytes that a storage node can transmit within each block
pub(super) const TRANSFER_RATE: u128 = 2_089_446;

pub(super) const CALCULATE_RATE: u128 = 838_860;
87 changes: 26 additions & 61 deletions pallets/file-bank/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl<T: Config> Pallet<T> {
let frag_info = FragmentInfo::<T> {
hash: *fragment_hash,
avail: true,
tag: None,
miner: complete_list[index as usize].miner.clone(),
};

Expand Down Expand Up @@ -106,15 +107,8 @@ impl<T: Config> Pallet<T> {
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
let space = Self::cal_file_size(file_info.len() as u128);

let life = space / TRANSFER_RATE + 1;

Self::start_first_task(file_hash.0.to_vec(), file_hash, 1, life as u32)?;

let deal = DealInfo::<T> {
stage: 1,
count: 0,
file_size,
segment_list: file_info.clone(),
user: user_brief,
Expand All @@ -126,45 +120,7 @@ impl<T: Config> Pallet<T> {
Ok(())
}

pub(super) fn start_first_task(task_id: Vec<u8>, deal_hash: Hash, count: u8, life: u32) -> DispatchResult {
let start: u32 = <frame_system::Pallet<T>>::block_number().saturated_into();
let survival_block = start
// temp
// .checked_add(50 * (count as u32)).ok_or(Error::<T>::Overflow)?
.checked_add(50).ok_or(Error::<T>::Overflow)?
.checked_add(life).ok_or(Error::<T>::Overflow)?;

T::FScheduler::schedule_named(
task_id,
DispatchTime::At(survival_block.saturated_into()),
Option::None,
schedule::HARD_DEADLINE,
frame_system::RawOrigin::Root.into(),
Call::deal_timing_task{deal_hash: deal_hash, count: count, life: life}.into(),
).map_err(|_| Error::<T>::Unexpected)?;

Ok(())
}

pub(super) fn start_second_task(task_id: Vec<u8>, deal_hash: Hash, life: u32) -> DispatchResult {
let start: u32 = <frame_system::Pallet<T>>::block_number().saturated_into();
// todo! calculate time
let survival_block = start
.checked_add(life).ok_or(Error::<T>::Overflow)?;

T::FScheduler::schedule_named(
task_id,
DispatchTime::At(survival_block.saturated_into()),
Option::None,
schedule::HARD_DEADLINE,
frame_system::RawOrigin::Root.into(),
Call::calculate_end{deal_hash: deal_hash}.into(),
).map_err(|_| Error::<T>::Unexpected)?;

Ok(())
}

pub(super) fn remove_deal(deal_hash: &Hash) -> DispatchResult {
pub fn remove_deal(deal_hash: &Hash) -> DispatchResult {
let deal_info = <DealMap<T>>::try_get(deal_hash).map_err(|_| Error::<T>::NonExistent)?;
let segment_len = deal_info.segment_list.len() as u128;
let needed_space = Self::cal_file_size(segment_len);
Expand All @@ -185,7 +141,6 @@ impl<T: Config> Pallet<T> {

pub(super) fn delete_user_file(file_hash: &Hash, acc: &AccountOf<T>, file: &FileInfo<T>) -> Result<Weight, DispatchError> {
let mut weight: Weight = Weight::zero();
ensure!(file.stat != FileState::Calculate, Error::<T>::Calculate);

for user_brief in file.owner.iter() {
if &user_brief.user == acc {
Expand Down Expand Up @@ -261,41 +216,51 @@ impl<T: Config> Pallet<T> {
let mut total_fragment_dec = 0;
// Used to record the amount of service space each miner needs to reduce,
// so that the per-miner storage reads and updates are batched into a single pass
let mut miner_list: BTreeMap<AccountOf<T>, Vec<Hash>> = Default::default();
let mut miner_list: BTreeMap<AccountOf<T>, Vec<(Hash, bool)>> = Default::default();
// Traverse every segment
for segment_info in file.segment_list.iter() {
for fragment_info in segment_info.fragment_list.iter() {
// The total number of fragments in a file should never exceed u32
total_fragment_dec += 1;
let tag_avail = match fragment_info.tag {
Some(_) => true,
None => false,
};
if miner_list.contains_key(&fragment_info.miner) {
let temp_list = miner_list.get_mut(&fragment_info.miner).ok_or(Error::<T>::BugInvalid)?;
// The total number of fragments in a file should never exceed u32
temp_list.push(fragment_info.hash);
temp_list.push((fragment_info.hash, tag_avail));
} else {
miner_list.insert(fragment_info.miner.clone(), vec![fragment_info.hash]);
miner_list.insert(fragment_info.miner.clone(), vec![(fragment_info.hash, tag_avail)]);
}
}
}

for (miner, hash_list) in miner_list.iter() {
let count = hash_list.len() as u128;

if T::MinerControl::restoral_target_is_exist(miner) {
T::MinerControl::update_restoral_target(miner, FRAGMENT_SIZE * count)?;
weight = weight.saturating_add(T::DbWeight::get().reads_writes(1, 1));
} else {
let mut count: u128 = 0;
let mut unlock_count: u128 = 0;
let mut binary_list: Vec<Box<[u8; 256]>> = Default::default();
for fragment_hash in hash_list {
let binary_temp = fragment_hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
binary_list.push(binary_temp);
}
if file.stat == FileState::Active {
T::MinerControl::sub_miner_service_space(miner, FRAGMENT_SIZE * count)?;
T::MinerControl::delete_service_bloom(miner, binary_list)?;
}
if file.stat == FileState::Calculate {
T::MinerControl::unlock_space(miner, FRAGMENT_SIZE * count)?;
for (fragment_hash, tag_avail) in hash_list {
if *tag_avail {
let binary_temp = fragment_hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
binary_list.push(binary_temp);
count = count + 1;
} else {
unlock_count = unlock_count + 1;
}
}
T::MinerControl::sub_miner_service_space(miner, FRAGMENT_SIZE * count)?;
T::MinerControl::delete_service_bloom(miner, binary_list)?;
T::MinerControl::unlock_space_direct(miner, FRAGMENT_SIZE * unlock_count)?;
weight = weight.saturating_add(T::DbWeight::get().reads_writes(3, 3));
}
weight = weight.saturating_add(T::DbWeight::get().reads_writes(1, 1));

}

if user_clear {
Expand Down
1 change: 0 additions & 1 deletion pallets/file-bank/src/impls/dealimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ impl<T: Config> DealInfo<T> {
}

pub fn completed_all(&mut self) -> DispatchResult {
self.stage = 2;
for complete_info in self.complete_list.iter() {
<PendingReplacements<T>>::try_mutate(&complete_info.miner, |pending_space| -> DispatchResult {
let replace_space = FRAGMENT_SIZE
Expand Down
11 changes: 3 additions & 8 deletions pallets/file-bank/src/impls/receptionist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<T: Config> Receptionist<T> {
deal_info.segment_list.clone(),
deal_info.complete_list.clone(),
deal_info.user.clone(),
FileState::Calculate,
FileState::Active,
deal_info.file_size,
)?;

Expand All @@ -61,19 +61,14 @@ impl<T: Config> Receptionist<T> {
if let Err(_) = result {
log::info!("transfer report cancel schedule failed: {:?}", deal_hash.clone());
}
// Calculate the maximum time required for storage nodes to tag files
let max_needed_cal_space = (segment_count as u32).checked_mul(FRAGMENT_SIZE as u32).ok_or(Error::<T>::Overflow)?;
let mut life: u32 = (max_needed_cal_space / TRANSFER_RATE as u32).checked_add(11).ok_or(Error::<T>::Overflow)?;
life = (max_needed_cal_space / CALCULATE_RATE as u32)
.checked_add(100).ok_or(Error::<T>::Overflow)?
.checked_add(life).ok_or(Error::<T>::Overflow)?;
Pallet::<T>::start_second_task(deal_hash.0.to_vec(), deal_hash, life)?;

if <Bucket<T>>::contains_key(&deal_info.user.user, &deal_info.user.bucket_name) {
Pallet::<T>::add_file_to_bucket(&deal_info.user.user, &deal_info.user.bucket_name, &deal_hash)?;
} else {
Pallet::<T>::create_bucket_helper(&deal_info.user.user, &deal_info.user.bucket_name, Some(deal_hash))?;
}
Pallet::<T>::add_user_hold_fileslice(&deal_info.user.user, deal_hash.clone(), needed_space)?;
<DealMap<T>>::remove(deal_hash);
Pallet::<T>::deposit_event(Event::<T>::StorageCompleted{ file_hash: deal_hash });
}

Expand Down
114 changes: 35 additions & 79 deletions pallets/file-bank/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
use frame_support::traits::{
FindAuthor, Randomness,
StorageVersion,
schedule::{Anon as ScheduleAnon, DispatchTime, Named as ScheduleNamed},
schedule::{Anon as ScheduleAnon, Named as ScheduleNamed},
};
// use sc_network::Multiaddr;

Expand All @@ -43,7 +43,6 @@ pub use types::*;
mod functions;

mod constants;
use constants::*;

mod impls;
use impls::receptionist::Receptionist;
Expand All @@ -56,7 +55,6 @@ use frame_support::{
dispatch::DispatchResult,
pallet_prelude::*,
weights::Weight,
traits::schedule,
};
use frame_system::pallet_prelude::*;
use scale_info::TypeInfo;
Expand Down Expand Up @@ -193,6 +191,8 @@ pub mod pallet {
RecoveryCompleted { miner: AccountOf<T>, order_id: Hash },

StorageCompleted { file_hash: Hash },

CalculateReport { miner: AccountOf<T>, file_hash: Hash },
}

#[pallet::error]
Expand Down Expand Up @@ -456,48 +456,6 @@ pub mod pallet {

Ok(())
}

/// Reassign Miners for a Storage Deal
///
/// This function is used to reassign miners for an existing storage deal. It is typically called when a deal
/// needs to be modified or if the storage task fails for some miners in the deal. The function allows reassigning
/// miners to ensure the storage deal is fulfilled within the specified parameters.
///
/// Parameters:
/// - `origin`: The origin of the transaction, expected to be a root/administrator account.
/// - `deal_hash`: The unique hash identifier of the storage deal.
/// - `count`: The new count (number of miners) for the storage deal. Must be less than or equal to 20.
/// - `life`: The duration (in blocks) for which the storage deal will remain active.
#[pallet::call_index(1)]
// #[transactional]
#[pallet::weight(
{
let v = Pallet::<T>::get_segment_length_from_deal(&deal_hash);
<T as pallet::Config>::WeightInfo::deal_reassign_miner(v)
})]
pub fn deal_timing_task(
origin: OriginFor<T>,
deal_hash: Hash,
count: u8,
life: u32,
) -> DispatchResult {
let _ = ensure_root(origin)?;

if count < 2 {
if let Err(_e) = <DealMap<T>>::try_mutate(&deal_hash, |deal_opt| -> DispatchResult {
let deal_info = deal_opt.as_mut().ok_or(Error::<T>::NonExistent)?;
deal_info.count = count;
Self::start_first_task(deal_hash.0.to_vec(), deal_hash, count + 1, life)?;
Ok(())
}) {
Self::remove_deal(&deal_hash)?;
}
} else {
Self::remove_deal(&deal_hash)?;
}

Ok(())
}

/// Transfer Ownership of a File
///
Expand Down Expand Up @@ -600,47 +558,40 @@ pub mod pallet {
Ok(())
}

/// Calculate End of a Storage Deal
///
/// This function is used to finalize a storage deal and mark it as successfully completed.
///
/// Parameters:
/// - `origin`: The root origin, which authorizes administrative operations.
/// - `deal_hash`: The unique hash identifier of the storage deal to be finalized.
#[pallet::call_index(4)]
#[transactional]
#[pallet::weight(<T as pallet::Config>::WeightInfo::calculate_end(Pallet::<T>::get_segment_length_from_deal(&deal_hash)))]
pub fn calculate_end(
#[pallet::call_index(1)]
// FIX ME
#[pallet::weight(Weight::zero())]
pub fn calculate_report(
origin: OriginFor<T>,
deal_hash: Hash,
file_hash: Hash,
) -> DispatchResult {
let _ = ensure_root(origin)?;
let sender = ensure_signed(origin)?;

let deal_info = <DealMap<T>>::try_get(&deal_hash).map_err(|_| Error::<T>::NonExistent)?;
let count = deal_info.segment_list.len() as u128;
for complete_info in deal_info.complete_list.iter() {
<File<T>>::try_mutate(&file_hash, |file_info_opt| -> DispatchResult {
let file_info = file_info_opt.as_mut().ok_or(Error::<T>::NonExistent)?;
let now = <frame_system::Pallet<T>>::block_number();
let mut count: u128 = 0;
let mut hash_list: Vec<Box<[u8; 256]>> = Default::default();
for segment in &deal_info.segment_list {
let fragment_hash = segment.fragment_list[(complete_info.index - 1) as usize];
let hash_temp = fragment_hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
hash_list.push(hash_temp);
for segment in file_info.segment_list.iter_mut() {
for fragment in segment.fragment_list.iter_mut() {
if fragment.miner == sender {
fragment.tag = Some(now);
count = count + 1;
let hash_temp = fragment.hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
hash_list.push(hash_temp);
}
}
}
// Accumulate the number of fragments stored by each miner

let unlock_space = FRAGMENT_SIZE.checked_mul(count as u128).ok_or(Error::<T>::Overflow)?;
T::MinerControl::unlock_space_to_service(&complete_info.miner, unlock_space)?;
T::MinerControl::insert_service_bloom(&complete_info.miner, hash_list)?;
}
T::MinerControl::unlock_space_to_service(&sender, unlock_space)?;
T::MinerControl::insert_service_bloom(&sender, hash_list)?;

Self::deposit_event(Event::<T>::CalculateReport{ miner: sender, file_hash: file_hash});

<File<T>>::try_mutate(&deal_hash, |file_opt| -> DispatchResult {
let file = file_opt.as_mut().ok_or(Error::<T>::BugInvalid)?;
file.stat = FileState::Active;
Ok(())
})?;

<DealMap<T>>::remove(&deal_hash);

Self::deposit_event(Event::<T>::CalculateEnd{ file_hash: deal_hash });

Ok(())
}

Expand Down Expand Up @@ -1064,9 +1015,7 @@ pub mod pallet {
if &fragment.hash == &fragment_hash {
if &fragment.miner == &order.origin_miner {
let binary = fragment.hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
T::MinerControl::delete_service_bloom(&fragment.miner, vec![binary.clone()])?;
T::MinerControl::insert_service_bloom(&sender, vec![binary])?;
T::MinerControl::sub_miner_service_space(&fragment.miner, FRAGMENT_SIZE)?;
T::MinerControl::insert_service_bloom(&sender, vec![binary.clone()])?;
T::MinerControl::add_miner_service_space(&sender, FRAGMENT_SIZE)?;

// TODO!
Expand All @@ -1079,6 +1028,13 @@ pub mod pallet {

if T::MinerControl::restoral_target_is_exist(&fragment.miner) {
T::MinerControl::update_restoral_target(&fragment.miner, FRAGMENT_SIZE)?;
} else {
if fragment.tag.is_some() {
T::MinerControl::delete_service_bloom(&fragment.miner, vec![binary])?;
T::MinerControl::sub_miner_service_space(&fragment.miner, FRAGMENT_SIZE)?;
} else {
T::MinerControl::unlock_space_direct(&fragment.miner, FRAGMENT_SIZE)?;
}
}

fragment.avail = true;
Expand Down
3 changes: 1 addition & 2 deletions pallets/file-bank/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub enum FileState {
pub struct DealInfo<T: Config> {
// There are two stages in total:
// the first stage and the second stage, represented by 1 or 2, respectively.
pub(super) stage: u8,
pub(super) count: u8,
pub(super) file_size: u128,
pub(super) segment_list: BoundedVec<SegmentList<T>, T::SegmentCount>,
pub(super) user: UserBrief<T>,
Expand Down Expand Up @@ -79,6 +77,7 @@ pub struct SegmentInfo<T: Config> {
pub struct FragmentInfo<T: Config> {
pub(super) hash: Hash,
pub(super) avail: bool,
pub(super) tag: Option<BlockNumberFor<T>>,
pub(super) miner: AccountOf<T>,
}

Expand Down
2 changes: 1 addition & 1 deletion pallets/sminer/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<T: Config> Pallet<T> {
}
T::StorageHandle::sub_total_idle_space(miner.idle_space + miner.lock_space)?;
T::Currency::unreserve(&miner.staking_account, miner.collaterals);
Self::create_restoral_target(acc, miner.service_space)?;
Self::create_restoral_target(acc, miner.service_space + miner.lock_space)?;
miner.state = Self::str_to_bound(STATE_OFFLINE)?;
let space_proof_info = miner.space_proof_info.clone().ok_or(Error::<T>::NotpositiveState)?;
let encoding = space_proof_info.pois_key.encode();
Expand Down
Loading

0 comments on commit b543038

Please sign in to comment.