Skip to content
This repository has been archived by the owner on Jun 8, 2022. It is now read-only.

Commit

Permalink
feat(key snapshot by prover id + sector size) (#4)
Browse files Browse the repository at this point in the history
* feat(key snapshot by prover id + sector size)

* feat(don't be a memory hog)

- don't copy memory from SectorBuilderState to StateSnapshot
- don't use a StateSnapshot type at all
- pass references, not owned (cloned) values to snapshotting stuff
  • Loading branch information
laser authored Jun 20, 2019
1 parent a09e0f0 commit c08e744
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 77 deletions.
5 changes: 4 additions & 1 deletion sector-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{mpsc, Arc, Mutex};

use filecoin_proofs::error::ExpectWithBacktrace;
use filecoin_proofs::post_adapter::*;
use filecoin_proofs::types::SectorClass;
use filecoin_proofs::types::{PaddedBytesAmount, SectorClass};
use slog::*;

use crate::constants::*;
Expand Down Expand Up @@ -74,6 +74,8 @@ impl SectorBuilder {
(tx, workers)
};

let SectorClass(sector_size, _, _) = sector_class;

// Configure main worker.
let main_worker = Scheduler::start_with_metadata(
main_rx,
Expand All @@ -84,6 +86,7 @@ impl SectorBuilder {
last_committed_sector_id,
max_num_staged_sectors,
prover_id,
PaddedBytesAmount::from(sector_size),
);

Ok(SectorBuilder {
Expand Down
5 changes: 2 additions & 3 deletions sector-builder/src/helpers/get_seal_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ pub fn get_seal_status(

#[cfg(test)]
mod tests {
use super::*;

use std::collections::HashMap;

use crate::metadata::{SealedSectorMetadata, StagedSectorMetadata};
use crate::state::{SealedState, SectorBuilderState, StagedState};

use super::*;

fn setup() -> SectorBuilderState {
let mut staged_sectors: HashMap<SectorId, StagedSectorMetadata> = Default::default();
let mut sealed_sectors: HashMap<SectorId, SealedSectorMetadata> = Default::default();
Expand Down Expand Up @@ -61,7 +61,6 @@ mod tests {
);

SectorBuilderState {
prover_id: Default::default(),
staged: StagedState {
sector_id_nonce: 0,
sectors: staged_sectors,
Expand Down
129 changes: 86 additions & 43 deletions sector-builder/src/helpers/snapshots.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
use std::sync::Arc;

use byteorder::{LittleEndian, WriteBytesExt};
use filecoin_proofs::types::PaddedBytesAmount;

use crate::builder::WrappedKeyValueStore;
use crate::error::Result;
use crate::kv_store::KeyValueStore;
use crate::state::*;

pub struct SnapshotKey {
prover_id: [u8; 31],
sector_size: PaddedBytesAmount,
}

impl SnapshotKey {
pub fn new(prover_id: [u8; 31], sector_size: PaddedBytesAmount) -> SnapshotKey {
SnapshotKey {
prover_id,
sector_size,
}
}
}

pub fn load_snapshot<T: KeyValueStore>(
kv_store: &Arc<WrappedKeyValueStore<T>>,
prover_id: &[u8; 31],
) -> Result<Option<StateSnapshot>> {
let result: Option<Vec<u8>> = kv_store.inner().get(prover_id)?;
key: &SnapshotKey,
) -> Result<Option<SectorBuilderState>> {
let result: Option<Vec<u8>> = kv_store.inner().get(&Vec::from(key))?;

if let Some(val) = result {
return serde_cbor::from_slice(&val[..])
Expand All @@ -20,80 +37,106 @@ pub fn load_snapshot<T: KeyValueStore>(
Ok(None)
}

impl From<&SnapshotKey> for Vec<u8> {
fn from(n: &SnapshotKey) -> Self {
// convert the sector size to a byte vector
let mut snapshot_key = vec![];
snapshot_key
.write_u64::<LittleEndian>(u64::from(n.sector_size))
.unwrap();

// concatenate the prover id bytes
snapshot_key.extend_from_slice(&n.prover_id[..]);

snapshot_key
}
}

pub fn persist_snapshot<T: KeyValueStore>(
kv_store: &Arc<WrappedKeyValueStore<T>>,
snapshot: &StateSnapshot,
key: &SnapshotKey,
state: &SectorBuilderState,
) -> Result<()> {
let serialized = serde_cbor::to_vec(snapshot)?;
kv_store.inner().put(&snapshot.prover_id[..], &serialized)?;
let serialized = serde_cbor::to_vec(state)?;
kv_store.inner().put(&Vec::from(key), &serialized)?;
Ok(())
}

pub fn make_snapshot(
prover_id: &[u8; 31],
staged_state: &StagedState,
sealed_state: &SealedState,
) -> StateSnapshot {
StateSnapshot {
prover_id: *prover_id,
staged: StagedState {
sector_id_nonce: staged_state.sector_id_nonce,
sectors: staged_state.sectors.clone(),
},
sealed: SealedState {
sectors: sealed_state.sectors.clone(),
},
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use crate::builder::{SectorId, WrappedKeyValueStore};
use crate::kv_store::SledKvs;
use crate::metadata::StagedSectorMetadata;
use crate::state::{SealedState, StagedState};
use crate::state::StagedState;

use super::*;

#[test]
fn test_alpha() {
fn test_snapshotting() {
let metadata_dir = tempfile::tempdir().unwrap();

let kv_store = Arc::new(WrappedKeyValueStore::new(
SledKvs::initialize(metadata_dir).unwrap(),
));

let prover_id = [0; 31];

let (staged_state, sealed_state) = {
// create a snapshot to persist and load
let snapshot_a = {
let mut m: HashMap<SectorId, StagedSectorMetadata> = HashMap::new();

m.insert(123, Default::default());

let staged_state = Mutex::new(StagedState {
let staged_state = StagedState {
sector_id_nonce: 100,
sectors: m,
});
};

let sealed_state = Default::default();

SectorBuilderState {
staged: staged_state,
sealed: sealed_state,
}
};

// create a second (different) snapshot
let snapshot_b = {
let mut m: HashMap<SectorId, StagedSectorMetadata> = HashMap::new();

m.insert(666, Default::default());

let staged_state = StagedState {
sector_id_nonce: 102,
sectors: m,
};

let sealed_state: Mutex<SealedState> = Default::default();
let sealed_state = Default::default();

(staged_state, sealed_state)
SectorBuilderState {
staged: staged_state,
sealed: sealed_state,
}
};

let to_persist = make_snapshot(
&prover_id,
&staged_state.lock().unwrap(),
&sealed_state.lock().unwrap(),
);
let key_a = SnapshotKey::new([0; 31], PaddedBytesAmount(1024));
let key_b = SnapshotKey::new([0; 31], PaddedBytesAmount(1111));
let key_c = SnapshotKey::new([1; 31], PaddedBytesAmount(1024));

// persist both snapshots
let _ = persist_snapshot(&kv_store, &key_a, &snapshot_a).unwrap();
let _ = persist_snapshot(&kv_store, &key_b, &snapshot_b).unwrap();

let _ = persist_snapshot(&kv_store, &to_persist).unwrap();
// load both snapshots
let loaded_a = load_snapshot(&kv_store, &key_a).unwrap().unwrap();
let loaded_b = load_snapshot(&kv_store, &key_b).unwrap().unwrap();

let loaded = load_snapshot(&kv_store, &prover_id).unwrap().unwrap();
// key corresponds to no snapshot
let lookup_miss = load_snapshot(&kv_store, &key_c).unwrap();

assert_eq!(to_persist, loaded);
assert_eq!(snapshot_a, loaded_a);
assert_eq!(snapshot_b, loaded_b);
assert_eq!(true, lookup_miss.is_none());
}
}
25 changes: 14 additions & 11 deletions sector-builder/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ use std::thread;
use filecoin_proofs::error::ExpectWithBacktrace;
use filecoin_proofs::generate_post;
use filecoin_proofs::post_adapter::*;
use filecoin_proofs::types::UnpaddedBytesAmount;

use crate::builder::{SectorId, WrappedKeyValueStore};
use crate::error::{err_piecenotfound, err_unrecov, Result};
use crate::helpers::{
add_piece, get_seal_status, get_sectors_ready_for_sealing, load_snapshot, make_snapshot,
persist_snapshot,
add_piece, get_seal_status, get_sectors_ready_for_sealing, load_snapshot, persist_snapshot,
SnapshotKey,
};
use crate::kv_store::KeyValueStore;
use crate::metadata::{SealStatus, SealedSectorMetadata, StagedSectorMetadata};
use crate::sealer::SealerInput;
use crate::state::{SectorBuilderState, StagedState};
use crate::store::SectorStore;
use crate::{PaddedBytesAmount, UnpaddedBytesAmount};

const FATAL_NOLOAD: &str = "could not load snapshot";
const FATAL_NORECV: &str = "could not receive task";
Expand Down Expand Up @@ -60,18 +60,18 @@ impl Scheduler {
last_committed_sector_id: SectorId,
max_num_staged_sectors: u8,
prover_id: [u8; 31],
sector_size: PaddedBytesAmount,
) -> Scheduler {
let thread = thread::spawn(move || {
// Build the scheduler's initial state. If available, we
// reconstitute this state from persisted metadata. If not, we
// create it from scratch.
let state = {
let loaded = load_snapshot(&kv_store, &prover_id)
let loaded = load_snapshot(&kv_store, &SnapshotKey::new(prover_id, sector_size))
.expects(FATAL_NOLOAD)
.map(Into::into);

loaded.unwrap_or_else(|| SectorBuilderState {
prover_id,
staged: StagedState {
sector_id_nonce: last_committed_sector_id,
sectors: Default::default(),
Expand All @@ -91,6 +91,8 @@ impl Scheduler {
scheduler_input_tx: scheduler_input_tx.clone(),
max_num_staged_sectors,
max_user_bytes_per_staged_sector,
prover_id,
sector_size,
};

loop {
Expand Down Expand Up @@ -143,6 +145,8 @@ pub struct SectorMetadataManager<T: KeyValueStore, S: SectorStore> {
scheduler_input_tx: mpsc::SyncSender<Request>,
max_num_staged_sectors: u8,
max_user_bytes_per_staged_sector: UnpaddedBytesAmount,
prover_id: [u8; 31],
sector_size: PaddedBytesAmount,
}

impl<T: KeyValueStore, S: SectorStore> SectorMetadataManager<T, S> {
Expand Down Expand Up @@ -329,12 +333,11 @@ impl<T: KeyValueStore, S: SectorStore> SectorMetadataManager<T, S> {

// Create and persist metadata snapshot.
fn checkpoint(&self) -> Result<()> {
let snapshot = make_snapshot(
&self.state.prover_id,
&self.state.staged,
&self.state.sealed,
);
persist_snapshot(&self.kv_store, &snapshot)?;
persist_snapshot(
&self.kv_store,
&SnapshotKey::new(self.prover_id, self.sector_size),
&self.state,
)?;

Ok(())
}
Expand Down
20 changes: 1 addition & 19 deletions sector-builder/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,8 @@ pub struct SealedState {
pub sectors: HashMap<SectorId, SealedSectorMetadata>,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Default, Serialize, Deserialize, Debug, PartialEq)]
pub struct SectorBuilderState {
pub prover_id: [u8; 31],
pub staged: StagedState,
pub sealed: SealedState,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct StateSnapshot {
pub prover_id: [u8; 31],
pub staged: StagedState,
pub sealed: SealedState,
}

impl Into<SectorBuilderState> for StateSnapshot {
fn into(self) -> SectorBuilderState {
SectorBuilderState {
prover_id: self.prover_id,
staged: self.staged,
sealed: self.sealed,
}
}
}

0 comments on commit c08e744

Please sign in to comment.